diff options
author | alexnick <alexnick@yandex-team.ru> | 2022-02-10 17:55:22 +0300 |
---|---|---|
committer | alexnick <alexnick@yandex-team.ru> | 2022-02-10 17:55:22 +0300 |
commit | 2439f49704638bada6debcff03071bc66a882e04 (patch) | |
tree | 8a89ecc44f40e4b7b288ae85d5891c841c600339 | |
parent | 3c42b81c83c7d28a49b3136d35e70cf75a1272fa (diff) | |
download | ydb-2439f49704638bada6debcff03071bc66a882e04.tar.gz |
support of pqv0 in first class citizen mode
ref:ba3d4119773e0c06b49a13106e85453605a22a90
-rw-r--r-- | ydb/core/persqueue/read_balancer.cpp | 6 | ||||
-rw-r--r-- | ydb/core/testlib/test_pq_client.h | 8 |
2 files changed, 11 insertions, 3 deletions
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 4eb9c7206dd..490e6ece3ce 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -370,8 +370,10 @@ void TPersQueueReadBalancer::CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &req if (record.GetOperation() == NKikimrPQ::EOperation::READ_OP) { if (!Consumers.contains(user)) { - RespondWithACL(request, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "no read rule provided for consumer '" << NPersQueue::ConvertOldConsumerName(user) << "' that allows to read topic from cluster '" - << NPersQueue::GetDC(Topic) <<"'; may be there is read rule with mode all-original only and you are reading with mirrored topics. Change read-rule to mirror-to-<cluster> or options of reading process.", ctx); + RespondWithACL(request, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "no read rule provided for consumer '" + << (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ? user : NPersQueue::ConvertOldConsumerName(user)) + << "' that allows to read topic from cluster '" << NPersQueue::GetDC(Topic) + << "'; may be there is read rule with mode all-original only and you are reading with mirrored topics. Change read-rule to mirror-to-<cluster> or options of reading process.", ctx); return; } } diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h index f0599232508..d9a7b38cc0f 100644 --- a/ydb/core/testlib/test_pq_client.h +++ b/ydb/core/testlib/test_pq_client.h @@ -853,7 +853,7 @@ public: } - void CreateTopicNoLegacy(const TString& name, ui32 partsCount, bool doWait = true, bool canWrite = true) { + void CreateTopicNoLegacy(const TString& name, ui32 partsCount, bool doWait = true, bool canWrite = true, TVector<TString> rr = {}) { TString path = name; if (UseConfigTables) { path = TStringBuilder() << "/Root/PQ/" << name; @@ -861,6 +861,12 @@ public: auto pqClient = NYdb::NPersQueue::TPersQueueClient(*Driver); auto settings = NYdb::NPersQueue::TCreateTopicSettings().PartitionsCount(partsCount).ClientWriteDisabled(!canWrite); + TVector<NYdb::NPersQueue::TReadRuleSettings> rrSettings; + for (auto &user : rr) { + rrSettings.push_back({NYdb::NPersQueue::TReadRuleSettings{}.ConsumerName(user)}); + } + settings.ReadRules(rrSettings); + Cerr << "===Create topic: " << path << Endl; auto res = pqClient.CreateTopic(path, settings); //ToDo - hack, cannot avoid legacy compat yet as PQv1 still uses RequestProcessor from core/client/server |