diff options
author | mokhotskii <mokhotskii@ydb.tech> | 2022-07-26 18:06:03 +0300 |
---|---|---|
committer | mokhotskii <mokhotskii@ydb.tech> | 2022-07-26 18:06:03 +0300 |
commit | 00d6612568801a010ef5cc7514e4f86cb5c8ca78 (patch) | |
tree | a722dd31e262d301ebc86d5fc56c0f0e0d125f53 | |
parent | 90041d9f27af525ba61f4a23456beb6c02c0d194 (diff) | |
download | ydb-00d6612568801a010ef5cc7514e4f86cb5c8ca78.tar.gz |
Refactor PQTabletPrepare function in pq tests
Refactor PQTabletPrepare function in pq tests
Replace parameters with a struct to set only particular ones
-rw-r--r-- | ydb/core/persqueue/pq_ut.cpp | 186 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_ut.h | 58 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_ut_slow.cpp | 87 |
3 files changed, 166 insertions, 165 deletions
diff --git a/ydb/core/persqueue/pq_ut.cpp b/ydb/core/persqueue/pq_ut.cpp index 76a28d0d9e..e54c24a75b 100644 --- a/ydb/core/persqueue/pq_ut.cpp +++ b/ydb/core/persqueue/pq_ut.cpp @@ -149,7 +149,7 @@ Y_UNIT_TEST(TestUserInfoCompatibility) { TString client = "test"; tc.Runtime->SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG); - PQTabletPrepare(20000000, 100_MB, 0, {{client, false}}, tc, 4, 6_MB, true, 0, 0, 1); + PQTabletPrepare({.partitions=4, .specVersion=1,}, {{client, false}}, tc); TVector<std::pair<ui64, TString>> data; data.push_back({1, "s"}); @@ -195,7 +195,7 @@ Y_UNIT_TEST(TestReadRuleVersions) { activeZone = false; TString client = "test"; - PQTabletPrepare(20000000, 100_MB, 0, {{client, false}, {"another-user", false}}, tc, 3); + PQTabletPrepare({.partitions=3}, {{client, false}, {"another-user", false}}, tc); tc.Runtime->SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG); @@ -248,7 +248,7 @@ Y_UNIT_TEST(TestReadRuleVersions) { UNIT_ASSERT(result->Record.GetReadRangeResult(0).GetPair().size() == 7); } - PQTabletPrepare(20000000, 100_MB, 0, {}, tc, 3); + PQTabletPrepare({.partitions=3}, {}, tc); CmdGetOffset(0, client, 0, tc); CmdGetOffset(1, client, 0, tc); @@ -487,8 +487,7 @@ Y_UNIT_TEST(TestCheckACL) { } -void CheckLabeledCountersResponse(ui32 count, TTestContext& tc, TVector<TString> mustHave = {}) -{ +void CheckLabeledCountersResponse(ui32 count, TTestContext& tc, TVector<TString> mustHave = {}) { IActor* actor = CreateClusterLabeledCountersAggregatorActor(tc.Edge, TTabletTypes::PersQueue); tc.Runtime->Register(actor); @@ -520,7 +519,7 @@ Y_UNIT_TEST(TestSwitchOffImportantFlag) { tc.Prepare(dispatchName, setup, activeZone); activeZone = false; tc.Runtime->SetScheduledLimit(600); - PQTabletPrepare(20000000, 100_MB, 0, {}, tc); + PQTabletPrepare({}, {}, tc); { TDispatchOptions options; @@ -530,7 +529,7 @@ Y_UNIT_TEST(TestSwitchOffImportantFlag) { CheckLabeledCountersResponse(8, tc); //only topic counters - PQTabletPrepare(20000000, 100_MB, 0, {{"user", true}}, tc); + PQTabletPrepare({}, {{"user", true}}, tc); { TDispatchOptions options; @@ -541,7 +540,7 @@ Y_UNIT_TEST(TestSwitchOffImportantFlag) { CheckLabeledCountersResponse(8, tc, {NKikimr::JoinPath({"user/1", TOPIC_NAME})}); //topic counters + important - PQTabletPrepare(20000000, 100_MB, 0, {}, tc); + PQTabletPrepare({}, {}, tc); { TDispatchOptions options; @@ -564,7 +563,7 @@ Y_UNIT_TEST(TestSwitchOffImportantFlag) { }; CheckLabeledCountersResponse(8, tc, MakeTopics({"user/0"})); //topic counters + not important - PQTabletPrepare(20000000, 100_MB, 0, {{"user", true}, {"user2", true}}, tc); + PQTabletPrepare({}, {{"user", true}, {"user2", true}}, tc); { TDispatchOptions options; @@ -580,7 +579,7 @@ Y_UNIT_TEST(TestSwitchOffImportantFlag) { CheckLabeledCountersResponse(11, tc, MakeTopics({"user/1", "user2/1"})); //topic counters + not important - PQTabletPrepare(20000000, 100_MB, 0, {{"user", true}, {"user2", false}}, tc); + PQTabletPrepare({}, {{"user", true}, {"user2", false}}, tc); { TDispatchOptions options; @@ -597,7 +596,7 @@ Y_UNIT_TEST(TestSwitchOffImportantFlag) { CheckLabeledCountersResponse(12, tc, MakeTopics({"user/1", "user2/0"})); - PQTabletPrepare(20000000, 100_MB, 0, {{"user", true}}, tc); + PQTabletPrepare({}, {{"user", true}}, tc); { TDispatchOptions options; @@ -628,7 +627,7 @@ Y_UNIT_TEST(TestSeveralOwners) { activeZone = false; tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob + PQTabletPrepare({}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob TVector<std::pair<ui64, TString>> data; @@ -660,7 +659,7 @@ Y_UNIT_TEST(TestWaitInOwners) { activeZone = false; tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob + PQTabletPrepare({}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob TVector<std::pair<ui64, TString>> data; @@ -756,7 +755,7 @@ Y_UNIT_TEST(TestReserveBytes) { activeZone = false; tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob + PQTabletPrepare({}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob TVector<std::pair<ui64, TString>> data; @@ -766,12 +765,12 @@ Y_UNIT_TEST(TestReserveBytes) { data.push_back({2, s.substr(pp)}); auto p = CmdSetOwner(0, tc); - CmdReserveBytes(0, tc, p.first, 0, 20000000, p.second); - CmdReserveBytes(0, tc, p.first, 1, 20000000, p.second, false, true); + CmdReserveBytes(0, tc, p.first, 0, 20'000'000, p.second); + CmdReserveBytes(0, tc, p.first, 1, 20'000'000, p.second, false, true); - CmdReserveBytes(0, tc, p.first, 2, 40000000, p.second); + CmdReserveBytes(0, tc, p.first, 2, 40'000'000, p.second); - CmdReserveBytes(0, tc, p.first, 3, 80000000, p.second, true); + CmdReserveBytes(0, tc, p.first, 3, 80'000'000, p.second, true); TString cookie = p.first; @@ -797,9 +796,9 @@ Y_UNIT_TEST(TestReserveBytes) { CmdWrite(0, "sourceid3", data, tc, false, {}, false, cookie, 6); - CmdReserveBytes(0, tc, p.first, 7, 80000000, p.second); + CmdReserveBytes(0, tc, p.first, 7, 80'000'000, p.second); p = CmdSetOwner(0, tc); - CmdReserveBytes(0, tc, p.first, 0, 80000000, p.second); + CmdReserveBytes(0, tc, p.first, 0, 80'000'000, p.second); }); } @@ -817,7 +816,7 @@ Y_UNIT_TEST(TestMessageNo) { activeZone = false; tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob + PQTabletPrepare({}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob TVector<std::pair<ui64, TString>> data; @@ -869,7 +868,8 @@ Y_UNIT_TEST(TestPartitionedBlobFails) { activeZone = false; tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 200_MB, 0, {{"user1", true}}, tc); //one important client, never delete + // One important client, never delete + PQTabletPrepare({.maxSizeInPartition=200_MB}, {{"user1", true}}, tc); TString ss{50_MB, '_'}; char k = 0; @@ -1029,7 +1029,7 @@ Y_UNIT_TEST(TestAlreadyWritten) { activeZone = false; tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob + PQTabletPrepare({}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob activeZone = true; TVector<std::pair<ui64, TString>> data; @@ -1056,7 +1056,7 @@ Y_UNIT_TEST(TestAlreadyWrittenWithoutDeduplication) { activeZone = false; tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob + PQTabletPrepare({}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob TVector<std::pair<ui64, TString>> data; activeZone = true; @@ -1081,8 +1081,8 @@ Y_UNIT_TEST(TestWritePQCompact) { activeZone = false; tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {}, tc, 2, 8_MB - 512_KB); - //no important clients, lifetimeseconds=0 - delete all right now, except last datablob + // No important clients <-> lifetimeseconds=0 - delete all right now, but last datablob + PQTabletPrepare({.lowWatermark=(8_MB - 512_KB)}, {}, tc); TVector<std::pair<ui64, TString>> data; @@ -1138,7 +1138,7 @@ Y_UNIT_TEST(TestWritePQBigMessage) { activeZone = false; tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 1000_MB, 0, {{"user1", true}}, tc, 2, 8_MB - 512_KB); //nothing dropped + PQTabletPrepare({.lowWatermark=(8_MB - 512_KB)}, {{"user1", true}}, tc); //nothing dropped //no important clients, lifetimeseconds=0 - delete all right now, except last datablob TVector<std::pair<ui64, TString>> data; @@ -1190,7 +1190,7 @@ Y_UNIT_TEST(TestWritePQ) { tc.Prepare(dispatchName, setup, activeZone); tc.Runtime->SetScheduledLimit(100); - PQTabletPrepare(20000000, 100_MB, 0, {{"user", true}}, tc); //important client, lifetimeseconds=0 - never delete + PQTabletPrepare({}, {{"user", true}}, tc); //important client, lifetimeseconds=0 - never delete TVector<std::pair<ui64, TString>> data, data1, data2; activeZone = PlainOrSoSlow(true, false); @@ -1230,7 +1230,7 @@ Y_UNIT_TEST(TestWritePQ) { CmdWrite(0,"sourceid5", data, tc); CmdWrite(0,"sourceid6", data1, tc); CmdWrite(0,"sourceid7", data, tc); - data.back().first = 4296000000lu; + data.back().first = 4'296'000'000lu; CmdWrite(0,"sourceid8", data, tc); PQGetPartInfo(100, 113, tc); @@ -1281,7 +1281,7 @@ Y_UNIT_TEST(TestSourceIdDropByUserWrites) { tc.Prepare(dispatchName, setup, activeZone); tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //no important client, lifetimeseconds=0 - delete right now + PQTabletPrepare({}, {}, tc); //no important client, lifetimeseconds=0 - delete right now TVector<std::pair<ui64, TString>> data; activeZone = true; @@ -1318,7 +1318,7 @@ Y_UNIT_TEST(TestSourceIdDropBySourceIdCount) { tc.Prepare(dispatchName, setup, activeZone); tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {}, tc, 2, 6_MB, true, 0, 3); //no important client, lifetimeseconds=0 - delete right now + PQTabletPrepare({.sidMaxCount=3}, {}, tc); //no important client, lifetimeseconds=0 - delete right now TVector<std::pair<ui64, TString>> data; activeZone = true; @@ -1361,16 +1361,16 @@ Y_UNIT_TEST(TestWriteOffsetWithBigMessage) { tc.Prepare(dispatchName, setup, activeZone); tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {{{"user", true}}}, tc, 3); //important client, lifetimeseconds=0 - never delete + PQTabletPrepare({.partitions=3}, {{{"user", true}}}, tc); //important client, lifetimeseconds=0 - never delete activeZone = false; TVector<std::pair<ui64, TString>> data; data.push_back({1, TString{10_MB, 'a'}}); - CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 80000); + CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 80'000); data.front().first = 2; - CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 160000); + CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 160'000); data.clear(); data.push_back({1, TString{100_KB, 'a'}}); @@ -1378,11 +1378,11 @@ Y_UNIT_TEST(TestWriteOffsetWithBigMessage) { data.push_back(data.front()); data.back().first = i + 2; } - CmdWrite(0, "sourceIdx", data, tc, false, {}, false, "", -1, 80000); - PQGetPartInfo(80000, 80101, tc); + CmdWrite(0, "sourceIdx", data, tc, false, {}, false, "", -1, 80'000); + PQGetPartInfo(80'000, 80'101, tc); data.resize(70); CmdWrite(2, "sourceId1", data, tc, false, {}, false, "", -1, 0); - CmdWrite(2, "sourceId2", data, tc, false, {}, false, "", -1, 80000); + CmdWrite(2, "sourceId2", data, tc, false, {}, false, "", -1, 80'000); }); } @@ -1397,17 +1397,17 @@ Y_UNIT_TEST(TestWriteSplit) { activeZone = false; tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {{"user1", true}}, tc); //never delete + PQTabletPrepare({}, {{"user1", true}}, tc); //never delete const ui32 size = PlainOrSoSlow(2_MB, 1_MB); TVector<std::pair<ui64, TString>> data; data.push_back({1, TString{size, 'b'}}); data.push_back({2, TString{size, 'a'}}); activeZone = PlainOrSoSlow(true, false); - CmdWrite(0, "sourceIdx", data, tc, false, {}, false, "", -1, 40000); + CmdWrite(0, "sourceIdx", data, tc, false, {}, false, "", -1, 40'000); RestartTablet(tc); activeZone = false; - PQGetPartInfo(40000, 40002, tc); + PQGetPartInfo(40'000, 40'002, tc); }); } @@ -1421,7 +1421,7 @@ Y_UNIT_TEST(TestLowWatermark) { tc.Prepare(dispatchName, setup, activeZone); tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {}, tc, 2, 2_MB); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob + PQTabletPrepare({.lowWatermark=2_MB}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob TVector<std::pair<ui64, TString>> data; @@ -1433,12 +1433,12 @@ Y_UNIT_TEST(TestLowWatermark) { data.push_back({3, ss.substr(pp)}); CmdWrite(0,"sourceid0", data, tc, false, {}, true); - PQTabletPrepare(20000000, 100_MB, 0, {}, tc, 2, 6_MB); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob + PQTabletPrepare({}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob CmdWrite(0,"sourceid1", data, tc, false, {}, false); //first are compacted PQGetPartInfo(0, 6, tc); CmdWrite(0,"sourceid2", data, tc, false, {}, false); //3 and 6 are compacted PQGetPartInfo(3, 9, tc); - PQTabletPrepare(20000000, 100_MB, 0, {}, tc, 2, 3_MB); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob + PQTabletPrepare({.lowWatermark=3_MB}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob CmdWrite(0,"sourceid3", data, tc, false, {}, false); //3, 6 and 3 are compacted data.resize(1); CmdWrite(0,"sourceid4", data, tc, false, {}, false); //3, 6 and 3 are compacted @@ -1459,7 +1459,7 @@ Y_UNIT_TEST(TestWriteToFullPartition) { tc.Runtime->SetScheduledLimit(100); - PQTabletPrepare(11, 100_MB, 0, {}, tc); + PQTabletPrepare({.maxCountInPartition=11}, {}, tc); TVector<std::pair<ui64, TString>> data; activeZone = PlainOrSoSlow(true, false); @@ -1470,13 +1470,13 @@ Y_UNIT_TEST(TestWriteToFullPartition) { data.push_back({i + 1, s.substr(pp)}); } CmdWrite(0, "sourceid0", data, tc, false, {}, true); //now 1 blob - PQTabletPrepare(10, 100_MB, 0, {}, tc); + PQTabletPrepare({.maxCountInPartition=10}, {}, tc); PQGetPartInfo(0, 10, tc); data.resize(1); CmdWrite(0, "sourceid1", data, tc, true); - PQTabletPrepare(12, 100_MB, 0, {}, tc); + PQTabletPrepare({.maxCountInPartition=12}, {}, tc); CmdWrite(0, "sourceid1", data, tc); - PQTabletPrepare(12, 100, 0, {}, tc); + PQTabletPrepare({.maxCountInPartition=12, .maxSizeInPartition=100}, {}, tc); CmdWrite(0, "sourceid1", data, tc, true); }); } @@ -1502,13 +1502,14 @@ Y_UNIT_TEST(TestTimeRetention) { for (ui32 i = 0; i < 10; ++i) { data.push_back({i + 1, s.substr(pp)}); } - PQTabletPrepare(1000, 100_MB, TDuration::Seconds(1000).Seconds(), {}, tc, 2, 100); + PQTabletPrepare({.maxCountInPartition=1000, .deleteTime=TDuration::Seconds(1000).Seconds(), + .lowWatermark=100}, {}, tc); CmdWrite(0, "sourceid0", data, tc, false, {}, true); CmdWrite(0, "sourceid1", data, tc, false); CmdWrite(0, "sourceid2", data, tc, false); PQGetPartInfo(0, 30, tc); - PQTabletPrepare(1000, 100_MB, TDuration::Seconds(0).Seconds(), {}, tc, 2, 100); + PQTabletPrepare({.maxCountInPartition=1000, .deleteTime=0, .lowWatermark=100}, {}, tc); CmdWrite(0, "sourceid3", data, tc, false); CmdWrite(0, "sourceid4", data, tc, false); CmdWrite(0, "sourceid5", data, tc, false); @@ -1537,13 +1538,13 @@ Y_UNIT_TEST(TestStorageRetention) { for (ui32 i = 0; i < 10; ++i) { data.push_back({i + 1, s.substr(pp)}); } - PQTabletPrepare(1000, 100_MB, 0, {}, tc, 2, 100, true, 0, 0, 0, 1_MB); + PQTabletPrepare({.maxCountInPartition=1000, .lowWatermark=100, .storageLimitBytes=1_MB}, {}, tc); CmdWrite(0, "sourceid0", data, tc, false, {}, true); //now 1 blob CmdWrite(0, "sourceid1", data, tc, false); CmdWrite(0, "sourceid2", data, tc, false); PQGetPartInfo(0, 30, tc); - PQTabletPrepare(1000, 100_MB, 0, {}, tc, 2, 50, true, 0, 0, 0, 160); + PQTabletPrepare({.maxCountInPartition=1000, .lowWatermark=50, .storageLimitBytes=160}, {}, tc); CmdWrite(0, "sourceid3", data, tc, false); CmdWrite(0, "sourceid4", data, tc, false); PQGetPartInfo(40, 50, tc); @@ -1562,7 +1563,7 @@ Y_UNIT_TEST(TestPQPartialRead) { tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc); //important client - never delete + PQTabletPrepare({}, {{"aaa", true}}, tc); //important client - never delete activeZone = false; TVector<std::pair<ui64, TString>> data; @@ -1595,7 +1596,7 @@ Y_UNIT_TEST(TestPQRead) { tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc); //important client - never delete + PQTabletPrepare({}, {{"aaa", true}}, tc); //important client - never delete activeZone = false; TVector<std::pair<ui64, TString>> data; @@ -1647,7 +1648,7 @@ Y_UNIT_TEST(TestPQSmallRead) { tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc); //important client - never delete + PQTabletPrepare({}, {{"aaa", true}}, tc); //important client - never delete activeZone = false; TVector<std::pair<ui64, TString>> data; @@ -1689,7 +1690,7 @@ Y_UNIT_TEST(TestPQReadAhead) { tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc); //important client - never delete + PQTabletPrepare({}, {{"aaa", true}}, tc); //important client - never delete TVector<std::pair<ui64, TString>> data; @@ -1735,7 +1736,7 @@ Y_UNIT_TEST(TestOwnership) { tc.Runtime->SetScheduledLimit(50); - PQTabletPrepare(10, 100_MB, 0, {}, tc); + PQTabletPrepare({.maxCountInPartition=10}, {}, tc); TString cookie, cookie2; cookie = CmdSetOwner(0, tc).first; @@ -1755,7 +1756,7 @@ Y_UNIT_TEST(TestSetClientOffset) { tc.Prepare(dispatchName, setup, activeZone); tc.Runtime->SetScheduledLimit(50); - PQTabletPrepare(10, 100_MB, 0, {{"user1", false}}, tc); + PQTabletPrepare({.maxCountInPartition=10}, {{"user1", false}}, tc); activeZone = true; @@ -1785,7 +1786,7 @@ Y_UNIT_TEST(TestReadSessions) { tc.Prepare(dispatchName, setup, activeZone); tc.Runtime->SetScheduledLimit(50); - PQTabletPrepare(10, 100_MB, 0, {{"user1", false}}, tc); + PQTabletPrepare({.maxCountInPartition=10}, {{"user1", false}}, tc); activeZone = true; @@ -1827,7 +1828,7 @@ Y_UNIT_TEST(TestGetTimestamps) { tc.Runtime->UpdateCurrentTime(TInstant::Zero() + TDuration::Days(2)); activeZone = false; - PQTabletPrepare(10, 100_MB, 0, {{"user1", false}}, tc); + PQTabletPrepare({.maxCountInPartition=10}, {{"user1", false}}, tc); TVector<std::pair<ui64, TString>> data; data.push_back({1, TString(1_KB, 'a')}); @@ -1893,9 +1894,12 @@ Y_UNIT_TEST(TestChangeConfig) { data.push_back({i + 1, ss}); } - PQTabletPrepare(100, 100_MB, 86400 * 2, {{"aaa", true}}, tc, 5); + PQTabletPrepare({.maxCountInPartition=100, .deleteTime=TDuration::Days(2).Seconds(), .partitions=5}, + {{"aaa", true}}, tc); CmdWrite(0, "sourceid0", data, tc, false, {}, true); //now 1 blob - PQTabletPrepare(5, 1_MB, 86400, {{"bbb", true}, {"ccc", true}}, tc, 10); + + PQTabletPrepare({.maxCountInPartition=5, .maxSizeInPartition=1_MB, + .deleteTime=TDuration::Days(1).Seconds(), .partitions=10}, {{"bbb", true}, {"ccc", true}}, tc); data.pop_back(); //to be sure that after write partition will no be full CmdWrite(0, "sourceid1", data, tc, true); //partition is full CmdWrite(1, "sourceid2", data, tc); @@ -1928,7 +1932,8 @@ Y_UNIT_TEST(TestReadSubscription) { data.push_back({i + 1, ss}); } - PQTabletPrepare(100, 100_MB, 86400 * 2, {{"user1", true}}, tc, 5); + PQTabletPrepare({.maxCountInPartition=100, .deleteTime=TDuration::Days(2).Seconds(), .partitions=5}, + {{"user1", true}}, tc); CmdWrite(0, "sourceid0", data, tc, false, {}, true); TAutoPtr<IEventHandle> handle; @@ -1942,7 +1947,7 @@ Y_UNIT_TEST(TestReadSubscription) { read->SetOffset(5); read->SetClientId("user1"); read->SetCount(5); - read->SetBytes(1000000); + read->SetBytes(1'000'000); read->SetTimeoutMs(5000); tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); @@ -1961,7 +1966,7 @@ Y_UNIT_TEST(TestReadSubscription) { read->SetOffset(5); read->SetClientId("user1"); read->SetCount(3); - read->SetBytes(1000000); + read->SetBytes(1'000'000); read->SetTimeoutMs(5000); tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); //got read @@ -1982,7 +1987,7 @@ Y_UNIT_TEST(TestReadSubscription) { read->SetOffset(10); read->SetClientId("user1"); read->SetCount(55); - read->SetBytes(1000000); + read->SetBytes(1'000'000); read->SetTimeoutMs(5000); tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); //got read @@ -2012,7 +2017,7 @@ Y_UNIT_TEST(TestPQCacheSizeManagement) { tc.Runtime->SetScheduledLimit(200); activeZone = false; - PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc); //important client - never delete + PQTabletPrepare({}, {{"aaa", true}}, tc); //important client - never delete TVector<std::pair<ui64, TString>> data; @@ -2070,7 +2075,7 @@ Y_UNIT_TEST(TestMaxTimeLagRewind) { tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc); + PQTabletPrepare({}, {{"aaa", true}}, tc); activeZone = false; @@ -2082,17 +2087,21 @@ Y_UNIT_TEST(TestMaxTimeLagRewind) { CmdWrite(0, "sourceid0", data, tc, false, {}, i == 0); tc.Runtime->UpdateCurrentTime(tc.Runtime->GetCurrentTime() + TDuration::Minutes(1)); } - auto ts = tc.Runtime->GetCurrentTime(); + const auto ts = tc.Runtime->GetCurrentTime(); CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {0}); - CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 3 * 60 * 1000); - CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, 3 * 60 * 1000); + CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, TDuration::Minutes(3).MilliSeconds()); + CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, TDuration::Minutes(3).MilliSeconds()); CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 1000); - CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 0, ts.MilliSeconds() - 3 * 60 * 1000); - CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, 0, ts.MilliSeconds() - 3 * 60 * 1000); - CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 0, ts.MilliSeconds() - 1000); + CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 0, + (ts - TDuration::Minutes(3)).MilliSeconds()); + CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, 0, + (ts - TDuration::Minutes(3)).MilliSeconds()); + CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 0, + (ts - TDuration::Seconds(1)).MilliSeconds()); - PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc, 2, 6_MB, true, ts.MilliSeconds() - 1000); + PQTabletPrepare({.readFromTimestampsMs=(ts - TDuration::Seconds(1)).MilliSeconds()}, + {{"aaa", true}}, tc); CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {34}); }); @@ -2108,31 +2117,31 @@ Y_UNIT_TEST(TestWriteTimeStampEstimate) { tc.Runtime->SetDispatchTimeout(TDuration::Seconds(1)); tc.Runtime->SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG); - PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc); + PQTabletPrepare({}, {{"aaa", true}}, tc); - tc.Runtime->UpdateCurrentTime(TInstant::MilliSeconds(1000000)); + tc.Runtime->UpdateCurrentTime(TInstant::MilliSeconds(1'000'000)); TVector<std::pair<ui64, TString>> data{{1,"abacaba"}}; CmdWrite(0, "sourceid0", data, tc); - CmdGetOffset(0, "user1", 0, tc, -1, 1000000); + CmdGetOffset(0, "user1", 0, tc, -1, 1'000'000); - PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc, 2, 6_MB, false); + PQTabletPrepare({.localDC=false}, {{"aaa", true}}, tc); RestartTablet(tc); CmdGetOffset(0, "user1", 0, tc, -1, 0); - tc.Runtime->UpdateCurrentTime(TInstant::MilliSeconds(2000000)); + tc.Runtime->UpdateCurrentTime(TInstant::MilliSeconds(2'000'000)); data.front().first = 2; CmdWrite(0, "sourceid0", data, tc); - CmdGetOffset(0, "user1", 0, tc, -1, 2000000); + CmdGetOffset(0, "user1", 0, tc, -1, 2'000'000); - CmdUpdateWriteTimestamp(0, 3000000, tc); + CmdUpdateWriteTimestamp(0, 3'000'000, tc); - CmdGetOffset(0, "user1", 0, tc, -1, 3000000); + CmdGetOffset(0, "user1", 0, tc, -1, 3'000'000); } @@ -2147,9 +2156,9 @@ Y_UNIT_TEST(TestWriteTimeLag) { tc.Runtime->SetDispatchTimeout(TDuration::Seconds(1)); tc.Runtime->SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG); - PQTabletPrepare(20000000, 1_TB, 0, {{"aaa", false}}, tc); + PQTabletPrepare({.maxSizeInPartition=1_TB}, {{"aaa", false}}, tc); - TVector<std::pair<ui64, TString>> data{{1,TString(1024*1024, 'a')}}; + TVector<std::pair<ui64, TString>> data{{1,TString(1_MB, 'a')}}; for (ui32 i = 0; i < 20; ++i) { CmdWrite(0, TStringBuilder() << "sourceid" << i, data, tc); } @@ -2157,9 +2166,10 @@ Y_UNIT_TEST(TestWriteTimeLag) { // After restart all caches are empty. RestartTablet(tc); - PQTabletPrepare(20000000, 1_TB, 0, {{"aaa", false}, {"important", true}, {"another", true}}, tc); - PQTabletPrepare(20000000, 1_TB, 0, {{"aaa", false}, {"another1", true}, {"important", true}}, tc); - PQTabletPrepare(20000000, 1_TB, 0, {{"aaa", false}, {"another1", true}, {"important", true}, {"another", false}}, tc); + PQTabletPrepare({.maxSizeInPartition=1_TB}, {{"aaa", false}, {"important", true}, {"another", true}}, tc); + PQTabletPrepare({.maxSizeInPartition=1_TB}, {{"aaa", false}, {"another1", true}, {"important", true}}, tc); + PQTabletPrepare({.maxSizeInPartition=1_TB}, + {{"aaa", false}, {"another1", true}, {"important", true}, {"another", false}}, tc); CmdGetOffset(0, "important", 12, tc, -1, 0); @@ -2204,7 +2214,7 @@ Y_UNIT_TEST(TestTabletRestoreEventsOrder) { // Scenario 2: expect EvTabletActive only after partitions init complete CheckEventSequence(tc, /*scenario=*/[&tc]() { - PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc, /*partitions=*/2); + PQTabletPrepare({}, {{"aaa", true}}, tc); ForwardToTablet(*tc.Runtime, tc.TabletId, tc.Edge, new TEvents::TEvPoisonPill()); }, /*expectedEvents=*/{ TEvTablet::TEvRestored::EventType, diff --git a/ydb/core/persqueue/pq_ut.h b/ydb/core/persqueue/pq_ut.h index 3226115a67..de8425f855 100644 --- a/ydb/core/persqueue/pq_ut.h +++ b/ydb/core/persqueue/pq_ut.h @@ -221,24 +221,24 @@ struct TFinalizer { // SINGLE COMMAND TEST FUNCTIONS //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -void PQTabletPrepare( - ui32 mcip, - ui64 msip, - ui32 deleteTime, - const TVector<std::pair<TString, bool>>& users, - TTestContext& tc, - int partitions = 2, - ui32 lw = 6_MB, - bool localDC = true, - ui64 ts = 0, - ui64 sidMaxCount = 0, - ui32 specVersion = 0, - i32 storageLimitBytes = 0 - ) { +struct TTabletPreparationParameters { + ui32 maxCountInPartition{20'000'000}; + ui64 maxSizeInPartition{100_MB}; + ui32 deleteTime{0}; // Delete instantly + ui32 partitions{2}; + ui32 lowWatermark{6_MB}; + bool localDC{true}; + ui64 readFromTimestampsMs{0}; + ui64 sidMaxCount{0}; + ui32 specVersion{0}; + i32 storageLimitBytes{0}; +}; +void PQTabletPrepare(const TTabletPreparationParameters& parameters, + const TVector<std::pair<TString, bool>>& users, TTestContext& tc) { TAutoPtr<IEventHandle> handle; static int version = 0; - if (specVersion) { - version = specVersion; + if (parameters.specVersion) { + version = parameters.specVersion; } else { ++version; } @@ -247,7 +247,7 @@ void PQTabletPrepare( tc.Runtime->ResetScheduledCount(); THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig()); - for (i32 i = 0; i < partitions; ++i) { + for (ui32 i = 0; i < parameters.partitions; ++i) { request->Record.MutableTabletConfig()->AddPartitionIds(i); } request->Record.MutableTabletConfig()->SetCacheSize(10_MB); @@ -262,22 +262,22 @@ void PQTabletPrepare( } tabletConfig->SetTopic("topic"); tabletConfig->SetVersion(version); - tabletConfig->SetLocalDC(localDC); + tabletConfig->SetLocalDC(parameters.localDC); tabletConfig->AddReadRules("user"); - tabletConfig->AddReadFromTimestampsMs(ts); + tabletConfig->AddReadFromTimestampsMs(parameters.readFromTimestampsMs); auto config = tabletConfig->MutablePartitionConfig(); - config->SetMaxCountInPartition(mcip); - config->SetMaxSizeInPartition(msip); - if (storageLimitBytes > 0) { - config->SetStorageLimitBytes(storageLimitBytes); + config->SetMaxCountInPartition(parameters.maxCountInPartition); + config->SetMaxSizeInPartition(parameters.maxSizeInPartition); + if (parameters.storageLimitBytes > 0) { + config->SetStorageLimitBytes(parameters.storageLimitBytes); } else { - config->SetLifetimeSeconds(deleteTime); + config->SetLifetimeSeconds(parameters.deleteTime); } - config->SetSourceIdLifetimeSeconds(1*60*60); - if (sidMaxCount > 0) - config->SetSourceIdMaxCounts(sidMaxCount); - config->SetMaxWriteInflightSize(90000000); - config->SetLowWatermark(lw); + config->SetSourceIdLifetimeSeconds(TDuration::Hours(1).Seconds()); + if (parameters.sidMaxCount > 0) + config->SetSourceIdMaxCounts(parameters.sidMaxCount); + config->SetMaxWriteInflightSize(90'000'000); + config->SetLowWatermark(parameters.lowWatermark); for (auto& u : users) { if (u.second) diff --git a/ydb/core/persqueue/pq_ut_slow.cpp b/ydb/core/persqueue/pq_ut_slow.cpp index 37e90f3533..d3d3678b19 100644 --- a/ydb/core/persqueue/pq_ut_slow.cpp +++ b/ydb/core/persqueue/pq_ut_slow.cpp @@ -1,32 +1,28 @@ #include "pq_ut.h" -#include <ydb/core/testlib/basics/runtime.h> -#include <ydb/core/tablet_flat/tablet_flat_executed.h> -#include <ydb/core/tx/schemeshard/schemeshard.h> -#include <ydb/public/lib/base/msgbus.h> +#include <library/cpp/testing/unittest/registar.h> + +#include <util/system/sanitizers.h> +#include <util/system/valgrind.h> + +#include <ydb/core/engine/minikql/flat_local_tx_factory.h> #include <ydb/core/keyvalue/keyvalue_events.h> -#include <ydb/core/tablet/tablet_counters_aggregator.h> -#include <ydb/core/persqueue/partition.h> #include <ydb/core/persqueue/events/global.h> -#include <ydb/core/engine/minikql/flat_local_tx_factory.h> +#include <ydb/core/persqueue/partition.h> #include <ydb/core/security/ticket_parser.h> - +#include <ydb/core/tablet/tablet_counters_aggregator.h> +#include <ydb/core/tablet_flat/tablet_flat_executed.h> +#include <ydb/core/testlib/basics/runtime.h> #include <ydb/core/testlib/fake_scheme_shard.h> #include <ydb/core/testlib/tablet_helpers.h> +#include <ydb/core/tx/schemeshard/schemeshard.h> -#include <library/cpp/testing/unittest/registar.h> +#include <ydb/public/lib/base/msgbus.h> -#include <util/system/sanitizers.h> -#include <util/system/valgrind.h> namespace NKikimr { -Y_UNIT_TEST_SUITE(TPQTestSlow) { - - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// SINGLE COMMAND TEST FUNCTIONS -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +Y_UNIT_TEST_SUITE(TPQTestSlow) { Y_UNIT_TEST(TestWriteVeryBigMessage) { TTestContext tc; @@ -38,16 +34,16 @@ Y_UNIT_TEST(TestWriteVeryBigMessage) { tc.Runtime->SetScheduledLimit(200); activeZone = false; - PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //always delete + PQTabletPrepare({}, {}, tc); //always delete TVector<std::pair<ui64, TString>> data; data.push_back({1, TString{10, 'b'}}); - CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 40000); + CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 40'000); data.clear(); const ui32 size = PlainOrSoSlow(40_MB, 1_MB); const ui32 so = PlainOrSoSlow(1, 0); data.push_back({2, TString{size, 'a'}}); - CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 80000); + CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 80'000); CmdWrite(0, "sourceIdx", data, tc, false, {}, false, "", -1, 0); activeZone = true; PQGetPartInfo(so, 1, tc); @@ -56,7 +52,6 @@ Y_UNIT_TEST(TestWriteVeryBigMessage) { }); } - Y_UNIT_TEST(TestOnDiskStoredSourceIds) { TTestContext tc; RunTestWithReboots(tc.TabletIds, [&]() { @@ -66,7 +61,8 @@ Y_UNIT_TEST(TestOnDiskStoredSourceIds) { tc.Prepare(dispatchName, setup, activeZone); tc.Runtime->SetScheduledLimit(200); - PQTabletPrepare(20000000, 100_MB, 0, {}, tc, 2, 6_MB, true, 0, 3); //no important client, lifetimeseconds=0 - delete right now + // No important client, lifetimeseconds=0 - delete right now + PQTabletPrepare({.sidMaxCount=3}, {}, tc); TVector<TString> writtenSourceIds; @@ -89,32 +85,34 @@ Y_UNIT_TEST(TestOnDiskStoredSourceIds) { req->SetOwnerCookie(cookie); req->SetMessageNo(0); req->SetCmdWriteOffset(offset); - for (ui32 i=0; i < NUM_SOURCEIDS; ++i) { - auto write = req->AddCmdWrite(); - write->SetSourceId(TStringBuilder() << "sourceid_" << i); - write->SetSeqNo(0); - write->SetData(ss); + for (ui32 i = 0; i < NUM_SOURCEIDS; ++i) { + auto write = req->AddCmdWrite(); + write->SetSourceId(TStringBuilder() << "sourceid_" << i); + write->SetSeqNo(0); + write->SetData(ss); } tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); TAutoPtr<IEventHandle> handle; TEvPersQueue::TEvResponse *result; result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, [](const TEvPersQueue::TEvResponse& ev){ - if (!ev.Record.HasPartitionResponse() || !ev.Record.GetPartitionResponse().HasCmdReadResult()) - return true; - return false; - }); //there could be outgoing reads in TestReadSubscription test + if (!ev.Record.HasPartitionResponse() || + !ev.Record.GetPartitionResponse().HasCmdReadResult()) { + return true; + } + return false; + }); //there could be outgoing reads in TestReadSubscription test UNIT_ASSERT(result); UNIT_ASSERT(result->Record.HasStatus()); if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) { - tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress - retriesLeft = 2; - continue; + tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress + retriesLeft = 2; + continue; } if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRITE_ERROR_BAD_OFFSET) { - ++offset; - continue; + ++offset; + continue; } Cout << "Error code is " << static_cast<int>(result->Record.GetErrorCode()) << Endl; @@ -122,13 +120,9 @@ Y_UNIT_TEST(TestOnDiskStoredSourceIds) { UNIT_ASSERT(result->Record.GetPartitionResponse().CmdWriteResultSize() == NUM_SOURCEIDS); for (ui32 i = 0; i < NUM_SOURCEIDS; ++i) { - UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasAlreadyWritten()); - UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasOffset()); + UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasAlreadyWritten()); + UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasOffset()); } - // for (ui32 i = 0; i < NUM_SOURCEIDS; ++i) { - // auto res = result->Record.GetPartitionResponse().GetCmdWriteResult(i); - // UNIT_ASSERT(!result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetAlreadyWritten()); - // } retriesLeft = 0; } catch (NActors::TSchedulingLimitReachedException) { @@ -136,11 +130,11 @@ Y_UNIT_TEST(TestOnDiskStoredSourceIds) { retriesLeft = 3; } } - for (ui64 i=100; i < 104; ++i) { + for (ui64 i = 100; i < 104; ++i) { CmdWrite(0, TStringBuilder() << "sourceid_" << i, data, tc, false, {}, false, "", -1, offset + i); Cout << TInstant::Now() << " written sourceid_" << i << Endl; } - for (ui64 i=100; i < 104; ++i) { + for (ui64 i = 100; i < 104; ++i) { writtenSourceIds.push_back(TStringBuilder() << "sourceid_" << i); } Cout << TInstant::Now() << " now check list of sourceIds" << Endl; @@ -155,8 +149,5 @@ Y_UNIT_TEST(TestOnDiskStoredSourceIds) { }); } - - - } // TKeyValueTest -} // NKikimr +} // namespace NKikimr |