aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@yandex-team.ru>2022-02-10 17:55:22 +0300
committeralexnick <alexnick@yandex-team.ru>2022-02-10 17:55:22 +0300
commit2439f49704638bada6debcff03071bc66a882e04 (patch)
tree8a89ecc44f40e4b7b288ae85d5891c841c600339
parent3c42b81c83c7d28a49b3136d35e70cf75a1272fa (diff)
downloadydb-2439f49704638bada6debcff03071bc66a882e04.tar.gz
support of pqv0 in first class citizen mode
ref:ba3d4119773e0c06b49a13106e85453605a22a90
-rw-r--r--ydb/core/persqueue/read_balancer.cpp6
-rw-r--r--ydb/core/testlib/test_pq_client.h8
2 files changed, 11 insertions, 3 deletions
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp
index 4eb9c7206dd..490e6ece3ce 100644
--- a/ydb/core/persqueue/read_balancer.cpp
+++ b/ydb/core/persqueue/read_balancer.cpp
@@ -370,8 +370,10 @@ void TPersQueueReadBalancer::CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &req
if (record.GetOperation() == NKikimrPQ::EOperation::READ_OP) {
if (!Consumers.contains(user)) {
- RespondWithACL(request, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "no read rule provided for consumer '" << NPersQueue::ConvertOldConsumerName(user) << "' that allows to read topic from cluster '"
- << NPersQueue::GetDC(Topic) <<"'; may be there is read rule with mode all-original only and you are reading with mirrored topics. Change read-rule to mirror-to-<cluster> or options of reading process.", ctx);
+ RespondWithACL(request, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "no read rule provided for consumer '"
+ << (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ? user : NPersQueue::ConvertOldConsumerName(user))
+ << "' that allows to read topic from cluster '" << NPersQueue::GetDC(Topic)
+ << "'; may be there is read rule with mode all-original only and you are reading with mirrored topics. Change read-rule to mirror-to-<cluster> or options of reading process.", ctx);
return;
}
}
diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h
index f0599232508..d9a7b38cc0f 100644
--- a/ydb/core/testlib/test_pq_client.h
+++ b/ydb/core/testlib/test_pq_client.h
@@ -853,7 +853,7 @@ public:
}
- void CreateTopicNoLegacy(const TString& name, ui32 partsCount, bool doWait = true, bool canWrite = true) {
+ void CreateTopicNoLegacy(const TString& name, ui32 partsCount, bool doWait = true, bool canWrite = true, TVector<TString> rr = {}) {
TString path = name;
if (UseConfigTables) {
path = TStringBuilder() << "/Root/PQ/" << name;
@@ -861,6 +861,12 @@ public:
auto pqClient = NYdb::NPersQueue::TPersQueueClient(*Driver);
auto settings = NYdb::NPersQueue::TCreateTopicSettings().PartitionsCount(partsCount).ClientWriteDisabled(!canWrite);
+ TVector<NYdb::NPersQueue::TReadRuleSettings> rrSettings;
+ for (auto &user : rr) {
+ rrSettings.push_back({NYdb::NPersQueue::TReadRuleSettings{}.ConsumerName(user)});
+ }
+ settings.ReadRules(rrSettings);
+
Cerr << "===Create topic: " << path << Endl;
auto res = pqClient.CreateTopic(path, settings);
//ToDo - hack, cannot avoid legacy compat yet as PQv1 still uses RequestProcessor from core/client/server