aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormokhotskii <mokhotskii@ydb.tech>2022-07-26 18:06:03 +0300
committermokhotskii <mokhotskii@ydb.tech>2022-07-26 18:06:03 +0300
commit00d6612568801a010ef5cc7514e4f86cb5c8ca78 (patch)
treea722dd31e262d301ebc86d5fc56c0f0e0d125f53
parent90041d9f27af525ba61f4a23456beb6c02c0d194 (diff)
downloadydb-00d6612568801a010ef5cc7514e4f86cb5c8ca78.tar.gz
Refactor PQTabletPrepare function in pq tests
Refactor PQTabletPrepare function in pq tests Replace parameters with a struct to set only particular ones
-rw-r--r--ydb/core/persqueue/pq_ut.cpp186
-rw-r--r--ydb/core/persqueue/pq_ut.h58
-rw-r--r--ydb/core/persqueue/pq_ut_slow.cpp87
3 files changed, 166 insertions, 165 deletions
diff --git a/ydb/core/persqueue/pq_ut.cpp b/ydb/core/persqueue/pq_ut.cpp
index 76a28d0d9e..e54c24a75b 100644
--- a/ydb/core/persqueue/pq_ut.cpp
+++ b/ydb/core/persqueue/pq_ut.cpp
@@ -149,7 +149,7 @@ Y_UNIT_TEST(TestUserInfoCompatibility) {
TString client = "test";
tc.Runtime->SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG);
- PQTabletPrepare(20000000, 100_MB, 0, {{client, false}}, tc, 4, 6_MB, true, 0, 0, 1);
+ PQTabletPrepare({.partitions=4, .specVersion=1,}, {{client, false}}, tc);
TVector<std::pair<ui64, TString>> data;
data.push_back({1, "s"});
@@ -195,7 +195,7 @@ Y_UNIT_TEST(TestReadRuleVersions) {
activeZone = false;
TString client = "test";
- PQTabletPrepare(20000000, 100_MB, 0, {{client, false}, {"another-user", false}}, tc, 3);
+ PQTabletPrepare({.partitions=3}, {{client, false}, {"another-user", false}}, tc);
tc.Runtime->SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG);
@@ -248,7 +248,7 @@ Y_UNIT_TEST(TestReadRuleVersions) {
UNIT_ASSERT(result->Record.GetReadRangeResult(0).GetPair().size() == 7);
}
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc, 3);
+ PQTabletPrepare({.partitions=3}, {}, tc);
CmdGetOffset(0, client, 0, tc);
CmdGetOffset(1, client, 0, tc);
@@ -487,8 +487,7 @@ Y_UNIT_TEST(TestCheckACL) {
}
-void CheckLabeledCountersResponse(ui32 count, TTestContext& tc, TVector<TString> mustHave = {})
-{
+void CheckLabeledCountersResponse(ui32 count, TTestContext& tc, TVector<TString> mustHave = {}) {
IActor* actor = CreateClusterLabeledCountersAggregatorActor(tc.Edge, TTabletTypes::PersQueue);
tc.Runtime->Register(actor);
@@ -520,7 +519,7 @@ Y_UNIT_TEST(TestSwitchOffImportantFlag) {
tc.Prepare(dispatchName, setup, activeZone);
activeZone = false;
tc.Runtime->SetScheduledLimit(600);
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc);
+ PQTabletPrepare({}, {}, tc);
{
TDispatchOptions options;
@@ -530,7 +529,7 @@ Y_UNIT_TEST(TestSwitchOffImportantFlag) {
CheckLabeledCountersResponse(8, tc); //only topic counters
- PQTabletPrepare(20000000, 100_MB, 0, {{"user", true}}, tc);
+ PQTabletPrepare({}, {{"user", true}}, tc);
{
TDispatchOptions options;
@@ -541,7 +540,7 @@ Y_UNIT_TEST(TestSwitchOffImportantFlag) {
CheckLabeledCountersResponse(8, tc, {NKikimr::JoinPath({"user/1", TOPIC_NAME})}); //topic counters + important
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc);
+ PQTabletPrepare({}, {}, tc);
{
TDispatchOptions options;
@@ -564,7 +563,7 @@ Y_UNIT_TEST(TestSwitchOffImportantFlag) {
};
CheckLabeledCountersResponse(8, tc, MakeTopics({"user/0"})); //topic counters + not important
- PQTabletPrepare(20000000, 100_MB, 0, {{"user", true}, {"user2", true}}, tc);
+ PQTabletPrepare({}, {{"user", true}, {"user2", true}}, tc);
{
TDispatchOptions options;
@@ -580,7 +579,7 @@ Y_UNIT_TEST(TestSwitchOffImportantFlag) {
CheckLabeledCountersResponse(11, tc, MakeTopics({"user/1", "user2/1"})); //topic counters + not important
- PQTabletPrepare(20000000, 100_MB, 0, {{"user", true}, {"user2", false}}, tc);
+ PQTabletPrepare({}, {{"user", true}, {"user2", false}}, tc);
{
TDispatchOptions options;
@@ -597,7 +596,7 @@ Y_UNIT_TEST(TestSwitchOffImportantFlag) {
CheckLabeledCountersResponse(12, tc, MakeTopics({"user/1", "user2/0"}));
- PQTabletPrepare(20000000, 100_MB, 0, {{"user", true}}, tc);
+ PQTabletPrepare({}, {{"user", true}}, tc);
{
TDispatchOptions options;
@@ -628,7 +627,7 @@ Y_UNIT_TEST(TestSeveralOwners) {
activeZone = false;
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
+ PQTabletPrepare({}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
TVector<std::pair<ui64, TString>> data;
@@ -660,7 +659,7 @@ Y_UNIT_TEST(TestWaitInOwners) {
activeZone = false;
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
+ PQTabletPrepare({}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
TVector<std::pair<ui64, TString>> data;
@@ -756,7 +755,7 @@ Y_UNIT_TEST(TestReserveBytes) {
activeZone = false;
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
+ PQTabletPrepare({}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
TVector<std::pair<ui64, TString>> data;
@@ -766,12 +765,12 @@ Y_UNIT_TEST(TestReserveBytes) {
data.push_back({2, s.substr(pp)});
auto p = CmdSetOwner(0, tc);
- CmdReserveBytes(0, tc, p.first, 0, 20000000, p.second);
- CmdReserveBytes(0, tc, p.first, 1, 20000000, p.second, false, true);
+ CmdReserveBytes(0, tc, p.first, 0, 20'000'000, p.second);
+ CmdReserveBytes(0, tc, p.first, 1, 20'000'000, p.second, false, true);
- CmdReserveBytes(0, tc, p.first, 2, 40000000, p.second);
+ CmdReserveBytes(0, tc, p.first, 2, 40'000'000, p.second);
- CmdReserveBytes(0, tc, p.first, 3, 80000000, p.second, true);
+ CmdReserveBytes(0, tc, p.first, 3, 80'000'000, p.second, true);
TString cookie = p.first;
@@ -797,9 +796,9 @@ Y_UNIT_TEST(TestReserveBytes) {
CmdWrite(0, "sourceid3", data, tc, false, {}, false, cookie, 6);
- CmdReserveBytes(0, tc, p.first, 7, 80000000, p.second);
+ CmdReserveBytes(0, tc, p.first, 7, 80'000'000, p.second);
p = CmdSetOwner(0, tc);
- CmdReserveBytes(0, tc, p.first, 0, 80000000, p.second);
+ CmdReserveBytes(0, tc, p.first, 0, 80'000'000, p.second);
});
}
@@ -817,7 +816,7 @@ Y_UNIT_TEST(TestMessageNo) {
activeZone = false;
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
+ PQTabletPrepare({}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
TVector<std::pair<ui64, TString>> data;
@@ -869,7 +868,8 @@ Y_UNIT_TEST(TestPartitionedBlobFails) {
activeZone = false;
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 200_MB, 0, {{"user1", true}}, tc); //one important client, never delete
+ // One important client, never delete
+ PQTabletPrepare({.maxSizeInPartition=200_MB}, {{"user1", true}}, tc);
TString ss{50_MB, '_'};
char k = 0;
@@ -1029,7 +1029,7 @@ Y_UNIT_TEST(TestAlreadyWritten) {
activeZone = false;
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
+ PQTabletPrepare({}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
activeZone = true;
TVector<std::pair<ui64, TString>> data;
@@ -1056,7 +1056,7 @@ Y_UNIT_TEST(TestAlreadyWrittenWithoutDeduplication) {
activeZone = false;
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
+ PQTabletPrepare({}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
TVector<std::pair<ui64, TString>> data;
activeZone = true;
@@ -1081,8 +1081,8 @@ Y_UNIT_TEST(TestWritePQCompact) {
activeZone = false;
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc, 2, 8_MB - 512_KB);
- //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
+ // No important clients <-> lifetimeseconds=0 - delete all right now, but last datablob
+ PQTabletPrepare({.lowWatermark=(8_MB - 512_KB)}, {}, tc);
TVector<std::pair<ui64, TString>> data;
@@ -1138,7 +1138,7 @@ Y_UNIT_TEST(TestWritePQBigMessage) {
activeZone = false;
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 1000_MB, 0, {{"user1", true}}, tc, 2, 8_MB - 512_KB); //nothing dropped
+ PQTabletPrepare({.lowWatermark=(8_MB - 512_KB)}, {{"user1", true}}, tc); //nothing dropped
//no important clients, lifetimeseconds=0 - delete all right now, except last datablob
TVector<std::pair<ui64, TString>> data;
@@ -1190,7 +1190,7 @@ Y_UNIT_TEST(TestWritePQ) {
tc.Prepare(dispatchName, setup, activeZone);
tc.Runtime->SetScheduledLimit(100);
- PQTabletPrepare(20000000, 100_MB, 0, {{"user", true}}, tc); //important client, lifetimeseconds=0 - never delete
+ PQTabletPrepare({}, {{"user", true}}, tc); //important client, lifetimeseconds=0 - never delete
TVector<std::pair<ui64, TString>> data, data1, data2;
activeZone = PlainOrSoSlow(true, false);
@@ -1230,7 +1230,7 @@ Y_UNIT_TEST(TestWritePQ) {
CmdWrite(0,"sourceid5", data, tc);
CmdWrite(0,"sourceid6", data1, tc);
CmdWrite(0,"sourceid7", data, tc);
- data.back().first = 4296000000lu;
+ data.back().first = 4'296'000'000lu;
CmdWrite(0,"sourceid8", data, tc);
PQGetPartInfo(100, 113, tc);
@@ -1281,7 +1281,7 @@ Y_UNIT_TEST(TestSourceIdDropByUserWrites) {
tc.Prepare(dispatchName, setup, activeZone);
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //no important client, lifetimeseconds=0 - delete right now
+ PQTabletPrepare({}, {}, tc); //no important client, lifetimeseconds=0 - delete right now
TVector<std::pair<ui64, TString>> data;
activeZone = true;
@@ -1318,7 +1318,7 @@ Y_UNIT_TEST(TestSourceIdDropBySourceIdCount) {
tc.Prepare(dispatchName, setup, activeZone);
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc, 2, 6_MB, true, 0, 3); //no important client, lifetimeseconds=0 - delete right now
+ PQTabletPrepare({.sidMaxCount=3}, {}, tc); //no important client, lifetimeseconds=0 - delete right now
TVector<std::pair<ui64, TString>> data;
activeZone = true;
@@ -1361,16 +1361,16 @@ Y_UNIT_TEST(TestWriteOffsetWithBigMessage) {
tc.Prepare(dispatchName, setup, activeZone);
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {{{"user", true}}}, tc, 3); //important client, lifetimeseconds=0 - never delete
+ PQTabletPrepare({.partitions=3}, {{{"user", true}}}, tc); //important client, lifetimeseconds=0 - never delete
activeZone = false;
TVector<std::pair<ui64, TString>> data;
data.push_back({1, TString{10_MB, 'a'}});
- CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 80000);
+ CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 80'000);
data.front().first = 2;
- CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 160000);
+ CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 160'000);
data.clear();
data.push_back({1, TString{100_KB, 'a'}});
@@ -1378,11 +1378,11 @@ Y_UNIT_TEST(TestWriteOffsetWithBigMessage) {
data.push_back(data.front());
data.back().first = i + 2;
}
- CmdWrite(0, "sourceIdx", data, tc, false, {}, false, "", -1, 80000);
- PQGetPartInfo(80000, 80101, tc);
+ CmdWrite(0, "sourceIdx", data, tc, false, {}, false, "", -1, 80'000);
+ PQGetPartInfo(80'000, 80'101, tc);
data.resize(70);
CmdWrite(2, "sourceId1", data, tc, false, {}, false, "", -1, 0);
- CmdWrite(2, "sourceId2", data, tc, false, {}, false, "", -1, 80000);
+ CmdWrite(2, "sourceId2", data, tc, false, {}, false, "", -1, 80'000);
});
}
@@ -1397,17 +1397,17 @@ Y_UNIT_TEST(TestWriteSplit) {
activeZone = false;
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {{"user1", true}}, tc); //never delete
+ PQTabletPrepare({}, {{"user1", true}}, tc); //never delete
const ui32 size = PlainOrSoSlow(2_MB, 1_MB);
TVector<std::pair<ui64, TString>> data;
data.push_back({1, TString{size, 'b'}});
data.push_back({2, TString{size, 'a'}});
activeZone = PlainOrSoSlow(true, false);
- CmdWrite(0, "sourceIdx", data, tc, false, {}, false, "", -1, 40000);
+ CmdWrite(0, "sourceIdx", data, tc, false, {}, false, "", -1, 40'000);
RestartTablet(tc);
activeZone = false;
- PQGetPartInfo(40000, 40002, tc);
+ PQGetPartInfo(40'000, 40'002, tc);
});
}
@@ -1421,7 +1421,7 @@ Y_UNIT_TEST(TestLowWatermark) {
tc.Prepare(dispatchName, setup, activeZone);
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc, 2, 2_MB); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
+ PQTabletPrepare({.lowWatermark=2_MB}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
TVector<std::pair<ui64, TString>> data;
@@ -1433,12 +1433,12 @@ Y_UNIT_TEST(TestLowWatermark) {
data.push_back({3, ss.substr(pp)});
CmdWrite(0,"sourceid0", data, tc, false, {}, true);
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc, 2, 6_MB); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
+ PQTabletPrepare({}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
CmdWrite(0,"sourceid1", data, tc, false, {}, false); //first are compacted
PQGetPartInfo(0, 6, tc);
CmdWrite(0,"sourceid2", data, tc, false, {}, false); //3 and 6 are compacted
PQGetPartInfo(3, 9, tc);
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc, 2, 3_MB); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
+ PQTabletPrepare({.lowWatermark=3_MB}, {}, tc); //no important clients, lifetimeseconds=0 - delete all right now, except last datablob
CmdWrite(0,"sourceid3", data, tc, false, {}, false); //3, 6 and 3 are compacted
data.resize(1);
CmdWrite(0,"sourceid4", data, tc, false, {}, false); //3, 6 and 3 are compacted
@@ -1459,7 +1459,7 @@ Y_UNIT_TEST(TestWriteToFullPartition) {
tc.Runtime->SetScheduledLimit(100);
- PQTabletPrepare(11, 100_MB, 0, {}, tc);
+ PQTabletPrepare({.maxCountInPartition=11}, {}, tc);
TVector<std::pair<ui64, TString>> data;
activeZone = PlainOrSoSlow(true, false);
@@ -1470,13 +1470,13 @@ Y_UNIT_TEST(TestWriteToFullPartition) {
data.push_back({i + 1, s.substr(pp)});
}
CmdWrite(0, "sourceid0", data, tc, false, {}, true); //now 1 blob
- PQTabletPrepare(10, 100_MB, 0, {}, tc);
+ PQTabletPrepare({.maxCountInPartition=10}, {}, tc);
PQGetPartInfo(0, 10, tc);
data.resize(1);
CmdWrite(0, "sourceid1", data, tc, true);
- PQTabletPrepare(12, 100_MB, 0, {}, tc);
+ PQTabletPrepare({.maxCountInPartition=12}, {}, tc);
CmdWrite(0, "sourceid1", data, tc);
- PQTabletPrepare(12, 100, 0, {}, tc);
+ PQTabletPrepare({.maxCountInPartition=12, .maxSizeInPartition=100}, {}, tc);
CmdWrite(0, "sourceid1", data, tc, true);
});
}
@@ -1502,13 +1502,14 @@ Y_UNIT_TEST(TestTimeRetention) {
for (ui32 i = 0; i < 10; ++i) {
data.push_back({i + 1, s.substr(pp)});
}
- PQTabletPrepare(1000, 100_MB, TDuration::Seconds(1000).Seconds(), {}, tc, 2, 100);
+ PQTabletPrepare({.maxCountInPartition=1000, .deleteTime=TDuration::Seconds(1000).Seconds(),
+ .lowWatermark=100}, {}, tc);
CmdWrite(0, "sourceid0", data, tc, false, {}, true);
CmdWrite(0, "sourceid1", data, tc, false);
CmdWrite(0, "sourceid2", data, tc, false);
PQGetPartInfo(0, 30, tc);
- PQTabletPrepare(1000, 100_MB, TDuration::Seconds(0).Seconds(), {}, tc, 2, 100);
+ PQTabletPrepare({.maxCountInPartition=1000, .deleteTime=0, .lowWatermark=100}, {}, tc);
CmdWrite(0, "sourceid3", data, tc, false);
CmdWrite(0, "sourceid4", data, tc, false);
CmdWrite(0, "sourceid5", data, tc, false);
@@ -1537,13 +1538,13 @@ Y_UNIT_TEST(TestStorageRetention) {
for (ui32 i = 0; i < 10; ++i) {
data.push_back({i + 1, s.substr(pp)});
}
- PQTabletPrepare(1000, 100_MB, 0, {}, tc, 2, 100, true, 0, 0, 0, 1_MB);
+ PQTabletPrepare({.maxCountInPartition=1000, .lowWatermark=100, .storageLimitBytes=1_MB}, {}, tc);
CmdWrite(0, "sourceid0", data, tc, false, {}, true); //now 1 blob
CmdWrite(0, "sourceid1", data, tc, false);
CmdWrite(0, "sourceid2", data, tc, false);
PQGetPartInfo(0, 30, tc);
- PQTabletPrepare(1000, 100_MB, 0, {}, tc, 2, 50, true, 0, 0, 0, 160);
+ PQTabletPrepare({.maxCountInPartition=1000, .lowWatermark=50, .storageLimitBytes=160}, {}, tc);
CmdWrite(0, "sourceid3", data, tc, false);
CmdWrite(0, "sourceid4", data, tc, false);
PQGetPartInfo(40, 50, tc);
@@ -1562,7 +1563,7 @@ Y_UNIT_TEST(TestPQPartialRead) {
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc); //important client - never delete
+ PQTabletPrepare({}, {{"aaa", true}}, tc); //important client - never delete
activeZone = false;
TVector<std::pair<ui64, TString>> data;
@@ -1595,7 +1596,7 @@ Y_UNIT_TEST(TestPQRead) {
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc); //important client - never delete
+ PQTabletPrepare({}, {{"aaa", true}}, tc); //important client - never delete
activeZone = false;
TVector<std::pair<ui64, TString>> data;
@@ -1647,7 +1648,7 @@ Y_UNIT_TEST(TestPQSmallRead) {
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc); //important client - never delete
+ PQTabletPrepare({}, {{"aaa", true}}, tc); //important client - never delete
activeZone = false;
TVector<std::pair<ui64, TString>> data;
@@ -1689,7 +1690,7 @@ Y_UNIT_TEST(TestPQReadAhead) {
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc); //important client - never delete
+ PQTabletPrepare({}, {{"aaa", true}}, tc); //important client - never delete
TVector<std::pair<ui64, TString>> data;
@@ -1735,7 +1736,7 @@ Y_UNIT_TEST(TestOwnership) {
tc.Runtime->SetScheduledLimit(50);
- PQTabletPrepare(10, 100_MB, 0, {}, tc);
+ PQTabletPrepare({.maxCountInPartition=10}, {}, tc);
TString cookie, cookie2;
cookie = CmdSetOwner(0, tc).first;
@@ -1755,7 +1756,7 @@ Y_UNIT_TEST(TestSetClientOffset) {
tc.Prepare(dispatchName, setup, activeZone);
tc.Runtime->SetScheduledLimit(50);
- PQTabletPrepare(10, 100_MB, 0, {{"user1", false}}, tc);
+ PQTabletPrepare({.maxCountInPartition=10}, {{"user1", false}}, tc);
activeZone = true;
@@ -1785,7 +1786,7 @@ Y_UNIT_TEST(TestReadSessions) {
tc.Prepare(dispatchName, setup, activeZone);
tc.Runtime->SetScheduledLimit(50);
- PQTabletPrepare(10, 100_MB, 0, {{"user1", false}}, tc);
+ PQTabletPrepare({.maxCountInPartition=10}, {{"user1", false}}, tc);
activeZone = true;
@@ -1827,7 +1828,7 @@ Y_UNIT_TEST(TestGetTimestamps) {
tc.Runtime->UpdateCurrentTime(TInstant::Zero() + TDuration::Days(2));
activeZone = false;
- PQTabletPrepare(10, 100_MB, 0, {{"user1", false}}, tc);
+ PQTabletPrepare({.maxCountInPartition=10}, {{"user1", false}}, tc);
TVector<std::pair<ui64, TString>> data;
data.push_back({1, TString(1_KB, 'a')});
@@ -1893,9 +1894,12 @@ Y_UNIT_TEST(TestChangeConfig) {
data.push_back({i + 1, ss});
}
- PQTabletPrepare(100, 100_MB, 86400 * 2, {{"aaa", true}}, tc, 5);
+ PQTabletPrepare({.maxCountInPartition=100, .deleteTime=TDuration::Days(2).Seconds(), .partitions=5},
+ {{"aaa", true}}, tc);
CmdWrite(0, "sourceid0", data, tc, false, {}, true); //now 1 blob
- PQTabletPrepare(5, 1_MB, 86400, {{"bbb", true}, {"ccc", true}}, tc, 10);
+
+ PQTabletPrepare({.maxCountInPartition=5, .maxSizeInPartition=1_MB,
+ .deleteTime=TDuration::Days(1).Seconds(), .partitions=10}, {{"bbb", true}, {"ccc", true}}, tc);
data.pop_back(); //to be sure that after write partition will no be full
CmdWrite(0, "sourceid1", data, tc, true); //partition is full
CmdWrite(1, "sourceid2", data, tc);
@@ -1928,7 +1932,8 @@ Y_UNIT_TEST(TestReadSubscription) {
data.push_back({i + 1, ss});
}
- PQTabletPrepare(100, 100_MB, 86400 * 2, {{"user1", true}}, tc, 5);
+ PQTabletPrepare({.maxCountInPartition=100, .deleteTime=TDuration::Days(2).Seconds(), .partitions=5},
+ {{"user1", true}}, tc);
CmdWrite(0, "sourceid0", data, tc, false, {}, true);
TAutoPtr<IEventHandle> handle;
@@ -1942,7 +1947,7 @@ Y_UNIT_TEST(TestReadSubscription) {
read->SetOffset(5);
read->SetClientId("user1");
read->SetCount(5);
- read->SetBytes(1000000);
+ read->SetBytes(1'000'000);
read->SetTimeoutMs(5000);
tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
@@ -1961,7 +1966,7 @@ Y_UNIT_TEST(TestReadSubscription) {
read->SetOffset(5);
read->SetClientId("user1");
read->SetCount(3);
- read->SetBytes(1000000);
+ read->SetBytes(1'000'000);
read->SetTimeoutMs(5000);
tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); //got read
@@ -1982,7 +1987,7 @@ Y_UNIT_TEST(TestReadSubscription) {
read->SetOffset(10);
read->SetClientId("user1");
read->SetCount(55);
- read->SetBytes(1000000);
+ read->SetBytes(1'000'000);
read->SetTimeoutMs(5000);
tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); //got read
@@ -2012,7 +2017,7 @@ Y_UNIT_TEST(TestPQCacheSizeManagement) {
tc.Runtime->SetScheduledLimit(200);
activeZone = false;
- PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc); //important client - never delete
+ PQTabletPrepare({}, {{"aaa", true}}, tc); //important client - never delete
TVector<std::pair<ui64, TString>> data;
@@ -2070,7 +2075,7 @@ Y_UNIT_TEST(TestMaxTimeLagRewind) {
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc);
+ PQTabletPrepare({}, {{"aaa", true}}, tc);
activeZone = false;
@@ -2082,17 +2087,21 @@ Y_UNIT_TEST(TestMaxTimeLagRewind) {
CmdWrite(0, "sourceid0", data, tc, false, {}, i == 0);
tc.Runtime->UpdateCurrentTime(tc.Runtime->GetCurrentTime() + TDuration::Minutes(1));
}
- auto ts = tc.Runtime->GetCurrentTime();
+ const auto ts = tc.Runtime->GetCurrentTime();
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {0});
- CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 3 * 60 * 1000);
- CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, 3 * 60 * 1000);
+ CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, TDuration::Minutes(3).MilliSeconds());
+ CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, TDuration::Minutes(3).MilliSeconds());
CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 1000);
- CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 0, ts.MilliSeconds() - 3 * 60 * 1000);
- CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, 0, ts.MilliSeconds() - 3 * 60 * 1000);
- CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 0, ts.MilliSeconds() - 1000);
+ CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 0,
+ (ts - TDuration::Minutes(3)).MilliSeconds());
+ CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, 0,
+ (ts - TDuration::Minutes(3)).MilliSeconds());
+ CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 0,
+ (ts - TDuration::Seconds(1)).MilliSeconds());
- PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc, 2, 6_MB, true, ts.MilliSeconds() - 1000);
+ PQTabletPrepare({.readFromTimestampsMs=(ts - TDuration::Seconds(1)).MilliSeconds()},
+ {{"aaa", true}}, tc);
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {34});
});
@@ -2108,31 +2117,31 @@ Y_UNIT_TEST(TestWriteTimeStampEstimate) {
tc.Runtime->SetDispatchTimeout(TDuration::Seconds(1));
tc.Runtime->SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG);
- PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc);
+ PQTabletPrepare({}, {{"aaa", true}}, tc);
- tc.Runtime->UpdateCurrentTime(TInstant::MilliSeconds(1000000));
+ tc.Runtime->UpdateCurrentTime(TInstant::MilliSeconds(1'000'000));
TVector<std::pair<ui64, TString>> data{{1,"abacaba"}};
CmdWrite(0, "sourceid0", data, tc);
- CmdGetOffset(0, "user1", 0, tc, -1, 1000000);
+ CmdGetOffset(0, "user1", 0, tc, -1, 1'000'000);
- PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc, 2, 6_MB, false);
+ PQTabletPrepare({.localDC=false}, {{"aaa", true}}, tc);
RestartTablet(tc);
CmdGetOffset(0, "user1", 0, tc, -1, 0);
- tc.Runtime->UpdateCurrentTime(TInstant::MilliSeconds(2000000));
+ tc.Runtime->UpdateCurrentTime(TInstant::MilliSeconds(2'000'000));
data.front().first = 2;
CmdWrite(0, "sourceid0", data, tc);
- CmdGetOffset(0, "user1", 0, tc, -1, 2000000);
+ CmdGetOffset(0, "user1", 0, tc, -1, 2'000'000);
- CmdUpdateWriteTimestamp(0, 3000000, tc);
+ CmdUpdateWriteTimestamp(0, 3'000'000, tc);
- CmdGetOffset(0, "user1", 0, tc, -1, 3000000);
+ CmdGetOffset(0, "user1", 0, tc, -1, 3'000'000);
}
@@ -2147,9 +2156,9 @@ Y_UNIT_TEST(TestWriteTimeLag) {
tc.Runtime->SetDispatchTimeout(TDuration::Seconds(1));
tc.Runtime->SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG);
- PQTabletPrepare(20000000, 1_TB, 0, {{"aaa", false}}, tc);
+ PQTabletPrepare({.maxSizeInPartition=1_TB}, {{"aaa", false}}, tc);
- TVector<std::pair<ui64, TString>> data{{1,TString(1024*1024, 'a')}};
+ TVector<std::pair<ui64, TString>> data{{1,TString(1_MB, 'a')}};
for (ui32 i = 0; i < 20; ++i) {
CmdWrite(0, TStringBuilder() << "sourceid" << i, data, tc);
}
@@ -2157,9 +2166,10 @@ Y_UNIT_TEST(TestWriteTimeLag) {
// After restart all caches are empty.
RestartTablet(tc);
- PQTabletPrepare(20000000, 1_TB, 0, {{"aaa", false}, {"important", true}, {"another", true}}, tc);
- PQTabletPrepare(20000000, 1_TB, 0, {{"aaa", false}, {"another1", true}, {"important", true}}, tc);
- PQTabletPrepare(20000000, 1_TB, 0, {{"aaa", false}, {"another1", true}, {"important", true}, {"another", false}}, tc);
+ PQTabletPrepare({.maxSizeInPartition=1_TB}, {{"aaa", false}, {"important", true}, {"another", true}}, tc);
+ PQTabletPrepare({.maxSizeInPartition=1_TB}, {{"aaa", false}, {"another1", true}, {"important", true}}, tc);
+ PQTabletPrepare({.maxSizeInPartition=1_TB},
+ {{"aaa", false}, {"another1", true}, {"important", true}, {"another", false}}, tc);
CmdGetOffset(0, "important", 12, tc, -1, 0);
@@ -2204,7 +2214,7 @@ Y_UNIT_TEST(TestTabletRestoreEventsOrder) {
// Scenario 2: expect EvTabletActive only after partitions init complete
CheckEventSequence(tc, /*scenario=*/[&tc]() {
- PQTabletPrepare(20000000, 100_MB, 0, {{"aaa", true}}, tc, /*partitions=*/2);
+ PQTabletPrepare({}, {{"aaa", true}}, tc);
ForwardToTablet(*tc.Runtime, tc.TabletId, tc.Edge, new TEvents::TEvPoisonPill());
}, /*expectedEvents=*/{
TEvTablet::TEvRestored::EventType,
diff --git a/ydb/core/persqueue/pq_ut.h b/ydb/core/persqueue/pq_ut.h
index 3226115a67..de8425f855 100644
--- a/ydb/core/persqueue/pq_ut.h
+++ b/ydb/core/persqueue/pq_ut.h
@@ -221,24 +221,24 @@ struct TFinalizer {
// SINGLE COMMAND TEST FUNCTIONS
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-void PQTabletPrepare(
- ui32 mcip,
- ui64 msip,
- ui32 deleteTime,
- const TVector<std::pair<TString, bool>>& users,
- TTestContext& tc,
- int partitions = 2,
- ui32 lw = 6_MB,
- bool localDC = true,
- ui64 ts = 0,
- ui64 sidMaxCount = 0,
- ui32 specVersion = 0,
- i32 storageLimitBytes = 0
- ) {
+struct TTabletPreparationParameters {
+ ui32 maxCountInPartition{20'000'000};
+ ui64 maxSizeInPartition{100_MB};
+ ui32 deleteTime{0}; // Delete instantly
+ ui32 partitions{2};
+ ui32 lowWatermark{6_MB};
+ bool localDC{true};
+ ui64 readFromTimestampsMs{0};
+ ui64 sidMaxCount{0};
+ ui32 specVersion{0};
+ i32 storageLimitBytes{0};
+};
+void PQTabletPrepare(const TTabletPreparationParameters& parameters,
+ const TVector<std::pair<TString, bool>>& users, TTestContext& tc) {
TAutoPtr<IEventHandle> handle;
static int version = 0;
- if (specVersion) {
- version = specVersion;
+ if (parameters.specVersion) {
+ version = parameters.specVersion;
} else {
++version;
}
@@ -247,7 +247,7 @@ void PQTabletPrepare(
tc.Runtime->ResetScheduledCount();
THolder<TEvPersQueue::TEvUpdateConfig> request(new TEvPersQueue::TEvUpdateConfig());
- for (i32 i = 0; i < partitions; ++i) {
+ for (ui32 i = 0; i < parameters.partitions; ++i) {
request->Record.MutableTabletConfig()->AddPartitionIds(i);
}
request->Record.MutableTabletConfig()->SetCacheSize(10_MB);
@@ -262,22 +262,22 @@ void PQTabletPrepare(
}
tabletConfig->SetTopic("topic");
tabletConfig->SetVersion(version);
- tabletConfig->SetLocalDC(localDC);
+ tabletConfig->SetLocalDC(parameters.localDC);
tabletConfig->AddReadRules("user");
- tabletConfig->AddReadFromTimestampsMs(ts);
+ tabletConfig->AddReadFromTimestampsMs(parameters.readFromTimestampsMs);
auto config = tabletConfig->MutablePartitionConfig();
- config->SetMaxCountInPartition(mcip);
- config->SetMaxSizeInPartition(msip);
- if (storageLimitBytes > 0) {
- config->SetStorageLimitBytes(storageLimitBytes);
+ config->SetMaxCountInPartition(parameters.maxCountInPartition);
+ config->SetMaxSizeInPartition(parameters.maxSizeInPartition);
+ if (parameters.storageLimitBytes > 0) {
+ config->SetStorageLimitBytes(parameters.storageLimitBytes);
} else {
- config->SetLifetimeSeconds(deleteTime);
+ config->SetLifetimeSeconds(parameters.deleteTime);
}
- config->SetSourceIdLifetimeSeconds(1*60*60);
- if (sidMaxCount > 0)
- config->SetSourceIdMaxCounts(sidMaxCount);
- config->SetMaxWriteInflightSize(90000000);
- config->SetLowWatermark(lw);
+ config->SetSourceIdLifetimeSeconds(TDuration::Hours(1).Seconds());
+ if (parameters.sidMaxCount > 0)
+ config->SetSourceIdMaxCounts(parameters.sidMaxCount);
+ config->SetMaxWriteInflightSize(90'000'000);
+ config->SetLowWatermark(parameters.lowWatermark);
for (auto& u : users) {
if (u.second)
diff --git a/ydb/core/persqueue/pq_ut_slow.cpp b/ydb/core/persqueue/pq_ut_slow.cpp
index 37e90f3533..d3d3678b19 100644
--- a/ydb/core/persqueue/pq_ut_slow.cpp
+++ b/ydb/core/persqueue/pq_ut_slow.cpp
@@ -1,32 +1,28 @@
#include "pq_ut.h"
-#include <ydb/core/testlib/basics/runtime.h>
-#include <ydb/core/tablet_flat/tablet_flat_executed.h>
-#include <ydb/core/tx/schemeshard/schemeshard.h>
-#include <ydb/public/lib/base/msgbus.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/system/sanitizers.h>
+#include <util/system/valgrind.h>
+
+#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
#include <ydb/core/keyvalue/keyvalue_events.h>
-#include <ydb/core/tablet/tablet_counters_aggregator.h>
-#include <ydb/core/persqueue/partition.h>
#include <ydb/core/persqueue/events/global.h>
-#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
+#include <ydb/core/persqueue/partition.h>
#include <ydb/core/security/ticket_parser.h>
-
+#include <ydb/core/tablet/tablet_counters_aggregator.h>
+#include <ydb/core/tablet_flat/tablet_flat_executed.h>
+#include <ydb/core/testlib/basics/runtime.h>
#include <ydb/core/testlib/fake_scheme_shard.h>
#include <ydb/core/testlib/tablet_helpers.h>
+#include <ydb/core/tx/schemeshard/schemeshard.h>
-#include <library/cpp/testing/unittest/registar.h>
+#include <ydb/public/lib/base/msgbus.h>
-#include <util/system/sanitizers.h>
-#include <util/system/valgrind.h>
namespace NKikimr {
-Y_UNIT_TEST_SUITE(TPQTestSlow) {
-
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// SINGLE COMMAND TEST FUNCTIONS
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+Y_UNIT_TEST_SUITE(TPQTestSlow) {
Y_UNIT_TEST(TestWriteVeryBigMessage) {
TTestContext tc;
@@ -38,16 +34,16 @@ Y_UNIT_TEST(TestWriteVeryBigMessage) {
tc.Runtime->SetScheduledLimit(200);
activeZone = false;
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc); //always delete
+ PQTabletPrepare({}, {}, tc); //always delete
TVector<std::pair<ui64, TString>> data;
data.push_back({1, TString{10, 'b'}});
- CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 40000);
+ CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 40'000);
data.clear();
const ui32 size = PlainOrSoSlow(40_MB, 1_MB);
const ui32 so = PlainOrSoSlow(1, 0);
data.push_back({2, TString{size, 'a'}});
- CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 80000);
+ CmdWrite(1, "sourceIdx", data, tc, false, {}, false, "", -1, 80'000);
CmdWrite(0, "sourceIdx", data, tc, false, {}, false, "", -1, 0);
activeZone = true;
PQGetPartInfo(so, 1, tc);
@@ -56,7 +52,6 @@ Y_UNIT_TEST(TestWriteVeryBigMessage) {
});
}
-
Y_UNIT_TEST(TestOnDiskStoredSourceIds) {
TTestContext tc;
RunTestWithReboots(tc.TabletIds, [&]() {
@@ -66,7 +61,8 @@ Y_UNIT_TEST(TestOnDiskStoredSourceIds) {
tc.Prepare(dispatchName, setup, activeZone);
tc.Runtime->SetScheduledLimit(200);
- PQTabletPrepare(20000000, 100_MB, 0, {}, tc, 2, 6_MB, true, 0, 3); //no important client, lifetimeseconds=0 - delete right now
+ // No important client, lifetimeseconds=0 - delete right now
+ PQTabletPrepare({.sidMaxCount=3}, {}, tc);
TVector<TString> writtenSourceIds;
@@ -89,32 +85,34 @@ Y_UNIT_TEST(TestOnDiskStoredSourceIds) {
req->SetOwnerCookie(cookie);
req->SetMessageNo(0);
req->SetCmdWriteOffset(offset);
- for (ui32 i=0; i < NUM_SOURCEIDS; ++i) {
- auto write = req->AddCmdWrite();
- write->SetSourceId(TStringBuilder() << "sourceid_" << i);
- write->SetSeqNo(0);
- write->SetData(ss);
+ for (ui32 i = 0; i < NUM_SOURCEIDS; ++i) {
+ auto write = req->AddCmdWrite();
+ write->SetSourceId(TStringBuilder() << "sourceid_" << i);
+ write->SetSeqNo(0);
+ write->SetData(ss);
}
tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
TAutoPtr<IEventHandle> handle;
TEvPersQueue::TEvResponse *result;
result = tc.Runtime->GrabEdgeEventIf<TEvPersQueue::TEvResponse>(handle, [](const TEvPersQueue::TEvResponse& ev){
- if (!ev.Record.HasPartitionResponse() || !ev.Record.GetPartitionResponse().HasCmdReadResult())
- return true;
- return false;
- }); //there could be outgoing reads in TestReadSubscription test
+ if (!ev.Record.HasPartitionResponse() ||
+ !ev.Record.GetPartitionResponse().HasCmdReadResult()) {
+ return true;
+ }
+ return false;
+ }); //there could be outgoing reads in TestReadSubscription test
UNIT_ASSERT(result);
UNIT_ASSERT(result->Record.HasStatus());
if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
- tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
- retriesLeft = 2;
- continue;
+ tc.Runtime->DispatchEvents(); // Dispatch events so that initialization can make progress
+ retriesLeft = 2;
+ continue;
}
if (result->Record.GetErrorCode() == NPersQueue::NErrorCode::WRITE_ERROR_BAD_OFFSET) {
- ++offset;
- continue;
+ ++offset;
+ continue;
}
Cout << "Error code is " << static_cast<int>(result->Record.GetErrorCode()) << Endl;
@@ -122,13 +120,9 @@ Y_UNIT_TEST(TestOnDiskStoredSourceIds) {
UNIT_ASSERT(result->Record.GetPartitionResponse().CmdWriteResultSize() == NUM_SOURCEIDS);
for (ui32 i = 0; i < NUM_SOURCEIDS; ++i) {
- UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasAlreadyWritten());
- UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasOffset());
+ UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasAlreadyWritten());
+ UNIT_ASSERT(result->Record.GetPartitionResponse().GetCmdWriteResult(i).HasOffset());
}
- // for (ui32 i = 0; i < NUM_SOURCEIDS; ++i) {
- // auto res = result->Record.GetPartitionResponse().GetCmdWriteResult(i);
- // UNIT_ASSERT(!result->Record.GetPartitionResponse().GetCmdWriteResult(i).GetAlreadyWritten());
- // }
retriesLeft = 0;
} catch (NActors::TSchedulingLimitReachedException) {
@@ -136,11 +130,11 @@ Y_UNIT_TEST(TestOnDiskStoredSourceIds) {
retriesLeft = 3;
}
}
- for (ui64 i=100; i < 104; ++i) {
+ for (ui64 i = 100; i < 104; ++i) {
CmdWrite(0, TStringBuilder() << "sourceid_" << i, data, tc, false, {}, false, "", -1, offset + i);
Cout << TInstant::Now() << " written sourceid_" << i << Endl;
}
- for (ui64 i=100; i < 104; ++i) {
+ for (ui64 i = 100; i < 104; ++i) {
writtenSourceIds.push_back(TStringBuilder() << "sourceid_" << i);
}
Cout << TInstant::Now() << " now check list of sourceIds" << Endl;
@@ -155,8 +149,5 @@ Y_UNIT_TEST(TestOnDiskStoredSourceIds) {
});
}
-
-
-
} // TKeyValueTest
-} // NKikimr
+} // namespace NKikimr