diff options
author | alexnick <alexnick@ydb.tech> | 2022-07-22 15:55:07 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2022-07-22 15:55:07 +0300 |
commit | 62e74eb168507c173a2f3663ff7cbb7a87618463 (patch) | |
tree | cedd2374ce8caceeff4c748c3ef297b0abf2126b | |
parent | 52117eee30cec2122c9f5073c1299c6ea363ae6f (diff) | |
download | ydb-62e74eb168507c173a2f3663ff7cbb7a87618463.tar.gz |
fix for quoting
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 25 | ||||
-rw-r--r-- | ydb/core/testlib/test_pq_client.h | 1 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_common_ut.cpp | 44 |
3 files changed, 41 insertions, 29 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 00e46d4046..197c154968 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -4920,33 +4920,24 @@ void TPartition::CalcTopicWriteQuotaParams() Y_VERIFY(quotingConfig.GetTopicWriteQuotaEntityToLimit() != NKikimrPQ::TPQConfig::TQuotingConfig::UNSPECIFIED); // ToDo[migration] - double check - TString topicPath = Config.GetTopicPath(); - if (topicPath.empty()) { - topicPath = Config.GetTopicName(); - } - Y_VERIFY(!topicPath.empty()); - TFsPath fsPath(topicPath); - if (fsPath.IsSubpathOf(pqConfig.GetRoot())) { - topicPath = fsPath.RelativePath(TFsPath(pqConfig.GetRoot())).GetPath(); - } + auto topicPath = TopicConverter->GetFederationPath(); + // ToDo[migration] - separate quoter paths? - TVector<TString> topicParts = {WRITE_QUOTA_ROOT_PATH}; - auto split = SplitPath(topicPath); // account/folder/topic // account is first element - if (split.size() < 2) { + auto topicParts = SplitPath(topicPath); // account/folder/topic // account is first element + if (topicParts.size() < 2) { LOG_WARN_S(TActivationContext::AsActorContext(), NKikimrServices::PERSQUEUE, "tablet " << TabletID << " topic '" << topicPath << "' Bad topic name. Disable quoting for topic"); return; } - topicParts.insert(topicParts.end(), split.begin(), split.end()); - //const TString account = topicParts[0]; topicParts[0] = WRITE_QUOTA_ROOT_PATH; // write-quota/folder/topic TopicWriteQuotaResourcePath = JoinPath(topicParts); TopicWriteQuoterPath = TStringBuilder() << quotingConfig.GetQuotersDirectoryPath() << "/" << TopicConverter->GetAccount(); + LOG_DEBUG_S(TActivationContext::AsActorContext(), NKikimrServices::PERSQUEUE, - "topicWriteQuutaResourcePath " << TopicWriteQuotaResourcePath - << " topicWriteQuoterPath '" << TopicWriteQuoterPath - << " account " << TopicConverter->GetAccount() + "topicWriteQuutaResourcePath '" << TopicWriteQuotaResourcePath + << "' topicWriteQuoterPath '" << TopicWriteQuoterPath + << "' account " << TopicConverter->GetAccount() ); } } diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h index 51c11d780e..432fd8cca8 100644 --- a/ydb/core/testlib/test_pq_client.h +++ b/ydb/core/testlib/test_pq_client.h @@ -42,6 +42,7 @@ inline Tests::TServerSettings PQSettings(ui16 port, ui32 nodesCount = 2, bool ro pqConfig.SetTopicsAreFirstClassCitizen(false); pqConfig.SetRoot("/Root/PQ"); pqConfig.MutableQuotingConfig()->SetEnableQuoting(false); + pqConfig.MutableQuotingConfig()->SetQuotersDirectoryPath("/Root/PersQueue/System/Quoters"); for (int i = 0; i < 12; ++i) { auto profile = pqConfig.AddChannelProfiles(); diff --git a/ydb/services/persqueue_v1/persqueue_common_ut.cpp b/ydb/services/persqueue_v1/persqueue_common_ut.cpp index a28ed47235..c3b3a2a616 100644 --- a/ydb/services/persqueue_v1/persqueue_common_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_common_ut.cpp @@ -277,38 +277,42 @@ Y_UNIT_TEST_SUITE(TPersQueueCommonTest) { serverMessage.server_message_case(), serverMessage); } - void TestWriteWithRateLimiter(TPersQueueV1TestServerWithRateLimiter& server) { + void TestWriteWithRateLimiter(TPersQueueV1TestServerWithRateLimiter& server, const TDuration& minTime) { const std::vector<TString> differentTopicPathsTypes = { "account1/topic", // without folder "account2/folder/topic", // with folder "account3/folder1/folder2/topic", // complex }; - const TString data = TString("12345") * 100; + const TString data = TString("1234567890") * 120000; // 1200000 bytes for (const TString &topicPath : differentTopicPathsTypes) { - server.CreateTopicWithQuota(topicPath); - + server.CreateTopicWithQuota(topicPath, true, 10000000); auto driver = server.Server->AnnoyingClient->GetDriver(); + auto start = TInstant::Now(); - { - auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, "123"); + for (ui32 i = 0; i < 7; ++i) { + auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, TStringBuilder() << "123" << i, {}, "raw"); writer->Write(data); bool res = writer->Close(TDuration::Seconds(10)); UNIT_ASSERT(res); } + + Cerr << "DURATION " << (TInstant::Now() - start) << "\n"; + UNIT_ASSERT(TInstant::Now() - start > minTime); } } Y_UNIT_TEST(TestWriteWithRateLimiterWithBlobsRateLimit) { TPersQueueV1TestServerWithRateLimiter server; server.InitAll(NKikimrPQ::TPQConfig::TQuotingConfig::WRITTEN_BLOB_SIZE); - server.EnablePQLogs({NKikimrServices::PERSQUEUE}, NLog::EPriority::PRI_INFO); - TestWriteWithRateLimiter(server); + server.EnablePQLogs({NKikimrServices::PERSQUEUE}, NLog::EPriority::PRI_DEBUG); + TestWriteWithRateLimiter(server, TDuration::MilliSeconds(5200)); } Y_UNIT_TEST(TestWriteWithRateLimiterWithUserPayloadRateLimit) { TPersQueueV1TestServerWithRateLimiter server; server.InitAll(NKikimrPQ::TPQConfig::TQuotingConfig::USER_PAYLOAD_SIZE); - TestWriteWithRateLimiter(server); + server.EnablePQLogs({NKikimrServices::PERSQUEUE}, NLog::EPriority::PRI_DEBUG); + TestWriteWithRateLimiter(server, TDuration::MilliSeconds(2500)); } void TestRateLimiterLimitsWrite(TPersQueueV1TestServerWithRateLimiter& server) { @@ -321,7 +325,7 @@ Y_UNIT_TEST_SUITE(TPersQueueCommonTest) { // Warm up write { - auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, "123"); + auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, "123", {}, "raw"); writer->Write(data); bool res = writer->Close(TDuration::Seconds(10)); @@ -334,7 +338,7 @@ Y_UNIT_TEST_SUITE(TPersQueueCommonTest) { { - auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, "123"); + auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, "123", {}, "raw"); writer->Write(data); bool res = writer->Close(TDuration::Seconds(10)); @@ -342,7 +346,7 @@ Y_UNIT_TEST_SUITE(TPersQueueCommonTest) { } { - auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, "123"); + auto writer = CreateSimpleWriter(*driver, server.TenantModeEnabled() ? "/Root/PQ/" + topicPath : topicPath, "123", {}, "raw"); writer->Write(data); bool res = writer->Close(TDuration::Seconds(10)); @@ -361,6 +365,22 @@ Y_UNIT_TEST_SUITE(TPersQueueCommonTest) { } } + + Y_UNIT_TEST(TestLimiterLimitsWithBlobsRateLimit) { + TPersQueueV1TestServerWithRateLimiter server; + server.InitAll(NKikimrPQ::TPQConfig::TQuotingConfig::WRITTEN_BLOB_SIZE); + server.EnablePQLogs({NKikimrServices::PERSQUEUE}, NLog::EPriority::PRI_DEBUG); + TestRateLimiterLimitsWrite(server); + } + + Y_UNIT_TEST(TestLimiterLimitsWithUserPayloadRateLimit) { + TPersQueueV1TestServerWithRateLimiter server; + server.InitAll(NKikimrPQ::TPQConfig::TQuotingConfig::USER_PAYLOAD_SIZE); + server.EnablePQLogs({NKikimrServices::PERSQUEUE}, NLog::EPriority::PRI_DEBUG); + + TestRateLimiterLimitsWrite(server); + } + } } |