aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2022-07-22 15:55:07 +0300
committeralexnick <alexnick@ydb.tech>2022-07-22 15:55:07 +0300
commit62e74eb168507c173a2f3663ff7cbb7a87618463 (patch)
treecedd2374ce8caceeff4c748c3ef297b0abf2126b
parent52117eee30cec2122c9f5073c1299c6ea363ae6f (diff)
downloadydb-62e74eb168507c173a2f3663ff7cbb7a87618463.tar.gz
fix for quoting
-rw-r--r--ydb/core/persqueue/partition.cpp25
-rw-r--r--ydb/core/testlib/test_pq_client.h1
-rw-r--r--ydb/services/persqueue_v1/persqueue_common_ut.cpp44
3 files changed, 41 insertions, 29 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 00e46d4046..197c154968 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -4920,33 +4920,24 @@ void TPartition::CalcTopicWriteQuotaParams()
Y_VERIFY(quotingConfig.GetTopicWriteQuotaEntityToLimit() != NKikimrPQ::TPQConfig::TQuotingConfig::UNSPECIFIED);
// ToDo[migration] - double check
- TString topicPath = Config.GetTopicPath();
- if (topicPath.empty()) {
- topicPath = Config.GetTopicName();
- }
- Y_VERIFY(!topicPath.empty());
- TFsPath fsPath(topicPath);
- if (fsPath.IsSubpathOf(pqConfig.GetRoot())) {
- topicPath = fsPath.RelativePath(TFsPath(pqConfig.GetRoot())).GetPath();
- }
+ auto topicPath = TopicConverter->GetFederationPath();
+
// ToDo[migration] - separate quoter paths?
- TVector<TString> topicParts = {WRITE_QUOTA_ROOT_PATH};
- auto split = SplitPath(topicPath); // account/folder/topic // account is first element
- if (split.size() < 2) {
+ auto topicParts = SplitPath(topicPath); // account/folder/topic // account is first element
+ if (topicParts.size() < 2) {
LOG_WARN_S(TActivationContext::AsActorContext(), NKikimrServices::PERSQUEUE,
"tablet " << TabletID << " topic '" << topicPath << "' Bad topic name. Disable quoting for topic");
return;
}
- topicParts.insert(topicParts.end(), split.begin(), split.end());
- //const TString account = topicParts[0];
topicParts[0] = WRITE_QUOTA_ROOT_PATH; // write-quota/folder/topic
TopicWriteQuotaResourcePath = JoinPath(topicParts);
TopicWriteQuoterPath = TStringBuilder() << quotingConfig.GetQuotersDirectoryPath() << "/" << TopicConverter->GetAccount();
+
LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PERSQUEUE,
- "topicWriteQuutaResourcePath " << TopicWriteQuotaResourcePath
- << " topicWriteQuoterPath '" << TopicWriteQuoterPath
- << " account " << TopicConverter->GetAccount()
+ "topicWriteQuutaResourcePath '" << TopicWriteQuotaResourcePath
+ << "' topicWriteQuoterPath '" << TopicWriteQuoterPath
+ << "' account " << TopicConverter->GetAccount()
);
}
}
diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h
index 51c11d780e..432fd8cca8 100644
--- a/ydb/core/testlib/test_pq_client.h
+++ b/ydb/core/testlib/test_pq_client.h
@@ -42,6 +42,7 @@ inline Tests::TServerSettings PQSettings(ui16 port, ui32 nodesCount = 2, bool ro
pqConfig.SetTopicsAreFirstClassCitizen(false);
pqConfig.SetRoot("/Root/PQ");
pqConfig.MutableQuotingConfig()->SetEnableQuoting(false);
+ pqConfig.MutableQuotingConfig()->SetQuotersDirectoryPath("/Root/PersQueue/System/Quoters");
for (int i = 0; i < 12; ++i) {
auto profile = pqConfig.AddChannelProfiles();
diff --git a/ydb/services/persqueue_v1/persqueue_common_ut.cpp b/ydb/services/persqueue_v1/persqueue_common_ut.cpp
index a28ed47235..c3b3a2a616 100644
--- a/ydb/services/persqueue_v1/persqueue_common_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_common_ut.cpp
@@ -277,38 +277,42 @@ Y_UNIT_TEST_SUITE(TPersQueueCommonTest) {
serverMessage.server_message_case(), serverMessage);
}
- void TestWriteWithRateLimiter(TPersQueueV1TestServerWithRateLimiter& server) {
+ void TestWriteWithRateLimiter(TPersQueueV1TestServerWithRateLimiter& server, const TDuration& minTime) {
const std::vector<TString> differentTopicPathsTypes = {
"account1/topic", // without folder
"account2/folder/topic", // with folder
"account3/folder1/folder2/topic", // complex
};
- const TString data = TString("12345") * 100;
+ const TString data = TString("1234567890") * 120000; // 1200000 bytes
for (const TString &topicPath : differentTopicPathsTypes) {
- server.CreateTopicWithQuota(topicPath);
-
+ server.CreateTopicWithQuota(topicPath, true, 10000000);
auto driver = server.Server->AnnoyingClient->GetDriver();
+ auto start = TInstant::Now();
- {
- auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, "123");
+ for (ui32 i = 0; i < 7; ++i) {
+ auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, TStringBuilder() << "123" << i, {}, "raw");
writer->Write(data);
bool res = writer->Close(TDuration::Seconds(10));
UNIT_ASSERT(res);
}
+
+ Cerr << "DURATION " << (TInstant::Now() - start) << "\n";
+ UNIT_ASSERT(TInstant::Now() - start > minTime);
}
}
Y_UNIT_TEST(TestWriteWithRateLimiterWithBlobsRateLimit) {
TPersQueueV1TestServerWithRateLimiter server;
server.InitAll(NKikimrPQ::TPQConfig::TQuotingConfig::WRITTEN_BLOB_SIZE);
- server.EnablePQLogs({NKikimrServices::PERSQUEUE}, NLog::EPriority::PRI_INFO);
- TestWriteWithRateLimiter(server);
+ server.EnablePQLogs({NKikimrServices::PERSQUEUE}, NLog::EPriority::PRI_DEBUG);
+ TestWriteWithRateLimiter(server, TDuration::MilliSeconds(5200));
}
Y_UNIT_TEST(TestWriteWithRateLimiterWithUserPayloadRateLimit) {
TPersQueueV1TestServerWithRateLimiter server;
server.InitAll(NKikimrPQ::TPQConfig::TQuotingConfig::USER_PAYLOAD_SIZE);
- TestWriteWithRateLimiter(server);
+ server.EnablePQLogs({NKikimrServices::PERSQUEUE}, NLog::EPriority::PRI_DEBUG);
+ TestWriteWithRateLimiter(server, TDuration::MilliSeconds(2500));
}
void TestRateLimiterLimitsWrite(TPersQueueV1TestServerWithRateLimiter& server) {
@@ -321,7 +325,7 @@ Y_UNIT_TEST_SUITE(TPersQueueCommonTest) {
// Warm up write
{
- auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, "123");
+ auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, "123", {}, "raw");
writer->Write(data);
bool res = writer->Close(TDuration::Seconds(10));
@@ -334,7 +338,7 @@ Y_UNIT_TEST_SUITE(TPersQueueCommonTest) {
{
- auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, "123");
+ auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, "123", {}, "raw");
writer->Write(data);
bool res = writer->Close(TDuration::Seconds(10));
@@ -342,7 +346,7 @@ Y_UNIT_TEST_SUITE(TPersQueueCommonTest) {
}
{
- auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, "123");
+ auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, "123", {}, "raw");
writer->Write(data);
bool res = writer->Close(TDuration::Seconds(10));
@@ -361,6 +365,22 @@ Y_UNIT_TEST_SUITE(TPersQueueCommonTest) {
}
}
+
+ Y_UNIT_TEST(TestLimiterLimitsWithBlobsRateLimit) {
+ TPersQueueV1TestServerWithRateLimiter server;
+ server.InitAll(NKikimrPQ::TPQConfig::TQuotingConfig::WRITTEN_BLOB_SIZE);
+ server.EnablePQLogs({NKikimrServices::PERSQUEUE}, NLog::EPriority::PRI_DEBUG);
+ TestRateLimiterLimitsWrite(server);
+ }
+
+ Y_UNIT_TEST(TestLimiterLimitsWithUserPayloadRateLimit) {
+ TPersQueueV1TestServerWithRateLimiter server;
+ server.InitAll(NKikimrPQ::TPQConfig::TQuotingConfig::USER_PAYLOAD_SIZE);
+ server.EnablePQLogs({NKikimrServices::PERSQUEUE}, NLog::EPriority::PRI_DEBUG);
+
+ TestRateLimiterLimitsWrite(server);
+ }
+
}
}