aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2024-10-18 22:52:25 +0500
committerGitHub <noreply@github.com>2024-10-18 22:52:25 +0500
commit7b9578cf0d27b671e203cf4f36551b593927c25c (patch)
treef9de10414eb4a0d18ae46f1f17c57a35caa7c124
parent2a74bac2d2d3bccb4e10120f1ead805640ec9dd0 (diff)
downloadydb-7b9578cf0d27b671e203cf4f36551b593927c25c.tar.gz
More test for autopartitionin of topics (#10613)
-rw-r--r--ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp89
1 files changed, 58 insertions, 31 deletions
diff --git a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
index 20f8b71ba5..6c68876bf6 100644
--- a/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
+++ b/ydb/core/persqueue/ut/ut_with_sdk/autoscaling_ut.cpp
@@ -929,9 +929,63 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}
+ ui64 GetBalancerTabletId(TTopicSdkTestSetup& setup, const TString& topicPath) {
+ auto pathDescr = setup.GetServer().AnnoyingClient->Ls(topicPath)->Record.GetPathDescription().GetSelf();
+ auto balancerTabletId = pathDescr.GetBalancerTabletID();
+ Cerr << ">>>>> BalancerTabletID=" << balancerTabletId << Endl << Flush;
+ UNIT_ASSERT(balancerTabletId);
+ return balancerTabletId;
+ }
+
+ void SplitPartition(TTopicSdkTestSetup& setup, const TString& topicPath, ui32 partitionId) {
+ auto balancerTabletId = GetBalancerTabletId(setup, topicPath);
+ auto edge = setup.GetRuntime().AllocateEdgeActor();
+ setup.GetRuntime().SendToPipe(balancerTabletId, edge, new TEvPQ::TEvPartitionScaleStatusChanged(partitionId, NKikimrPQ::EScaleStatus::NEED_SPLIT));
+ }
+
+ void AssertPartitionCount(TTopicSdkTestSetup& setup, const TString& topicPath, size_t expectedCount) {
+ auto client = setup.MakeClient();
+ auto describe = client.DescribeTopic(topicPath).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitions().size(), expectedCount);
+ }
+
+ void WaitAndAssertPartitionCount(TTopicSdkTestSetup& setup, const TString& topicPath, size_t expectedCount) {
+ auto client = setup.MakeClient();
+ size_t partitionCount = 0;
+ for (size_t i = 0; i < 10; ++i) {
+ Sleep(TDuration::Seconds(1));
+ auto describe = client.DescribeTopic(topicPath).GetValueSync();
+ partitionCount = describe.GetTopicDescription().GetPartitions().size();
+ if (partitionCount == expectedCount) {
+ break;
+ }
+ }
+ UNIT_ASSERT_VALUES_EQUAL(partitionCount, expectedCount);
+ }
+
+ Y_UNIT_TEST(WithDir_PartitionSplit_AutosplitByLoad) {
+ TTopicSdkTestSetup setup = CreateSetup();
+ auto tableClient = setup.MakeTableClient();
+ auto session = tableClient.CreateSession().GetValueSync().GetSession();
+
+ setup.GetServer().AnnoyingClient->MkDir("/Root", "dir");
+
+ ExecuteQuery(session, R"(
+ --!syntax_v1
+ CREATE TOPIC `/Root/dir/origin`
+ WITH (
+ AUTO_PARTITIONING_STRATEGY = 'SCALE_UP',
+ MAX_ACTIVE_PARTITIONS = 50
+ );
+ )");
+
+ AssertPartitionCount(setup, "/Root/dir/origin", 1);
+ SplitPartition(setup, "/Root/dir/origin", 0);
+ WaitAndAssertPartitionCount(setup, "/Root/dir/origin", 3);
+ }
+
Y_UNIT_TEST(CDC_PartitionSplit_AutosplitByLoad) {
TTopicSdkTestSetup setup = CreateSetup();
- auto client = setup.MakeClient();
auto tableClient = setup.MakeTableClient();
auto session = tableClient.CreateSession().GetValueSync().GetSession();
@@ -954,36 +1008,9 @@ Y_UNIT_TEST_SUITE(TopicAutoscaling) {
);
)");
- {
- auto describe = client.DescribeTopic("/Root/origin/feed").GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(describe.GetTopicDescription().GetPartitions().size(), 1);
- }
-
- ui64 balancerTabletId;
- {
- auto pathDescr = setup.GetServer().AnnoyingClient->Ls("/Root/origin/feed/streamImpl")->Record.GetPathDescription().GetSelf();
- balancerTabletId = pathDescr.GetBalancerTabletID();
- Cerr << ">>>>> BalancerTabletID=" << balancerTabletId << Endl << Flush;
- UNIT_ASSERT(balancerTabletId);
- }
-
- {
- const auto edge = setup.GetRuntime().AllocateEdgeActor();
- setup.GetRuntime().SendToPipe(balancerTabletId, edge, new TEvPQ::TEvPartitionScaleStatusChanged(0, NKikimrPQ::EScaleStatus::NEED_SPLIT));
- }
-
- {
- size_t partitionCount = 0;
- for (size_t i = 0; i < 10; ++i) {
- Sleep(TDuration::Seconds(1));
- auto describe = client.DescribeTopic("/Root/origin/feed").GetValueSync();
- partitionCount = describe.GetTopicDescription().GetPartitions().size();
- if (partitionCount == 3) {
- break;
- }
- }
- UNIT_ASSERT_VALUES_EQUAL(partitionCount, 3);
- }
+ AssertPartitionCount(setup, "/Root/origin/feed", 1);
+ SplitPartition(setup, "/Root/origin/feed/streamImpl", 0);
+ WaitAndAssertPartitionCount(setup, "/Root/origin/feed", 3);
}
Y_UNIT_TEST(MidOfRange) {