diff options
author | alexnick <alexnick@yandex-team.ru> | 2022-02-24 01:41:59 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-24 01:41:59 +0300 |
commit | ce558ec5cf887e0e74ea38ef4430cd81eca39a15 (patch) | |
tree | 3e2eb3a24afa3146a0b3f94c2bcafe945c6797d4 | |
parent | c1382909f2f37f901364dfbb92cd72dcbf9e0f8d (diff) | |
download | ydb-ce558ec5cf887e0e74ea38ef4430cd81eca39a15.tar.gz |
merge from trunk: support of pqv0, remove old read-rules LOGBROKER-7163
fix for old read-rules LOGBROKER-7163
REVIEW: 2342989
remove tvm dependecy KIKIMR-14386
REVIEW: 2335742
add missing peerdirs
Note: mandatory check (NEED_CHECK) was skipped
support of pqv0 in first class citizen mode
REVIEW: 2316816
add more tests for pqv0 and firstClassCitizen
Note: mandatory check (NEED_CHECK) was skipped
fix for race on destruction
Note: mandatory check (NEED_CHECK) was skipped
REVIEW: 2346379
x-ydb-stable-ref: cba3e236548e68a4ea06193a6bfabf12e707c265
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 22 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_ut.cpp | 67 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_ut.h | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/read_balancer.cpp | 6 | ||||
-rw-r--r-- | ydb/core/testlib/test_pq_client.h | 44 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/ya.make | 1 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h | 1 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make | 1 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_read_actor.cpp | 11 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 17 |
11 files changed, 112 insertions, 63 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index cc8e2419cf..0df5db92d2 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -414,10 +414,8 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config for (auto& userInfo : UsersInfoStorage.GetAll()) { userInfo.second.ReadFromTimestamp = TInstant::Zero(); - if (userInfo.second.HasReadRule) { - userInfo.second.HasReadRule = false; - hasReadRule.insert(userInfo.first); - } + userInfo.second.HasReadRule = false; + hasReadRule.insert(userInfo.first); } for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { const auto& consumer = config.GetReadRules(i); @@ -435,16 +433,16 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config hasReadRule.erase(consumer); TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero(); if (!ts) ts += TDuration::MilliSeconds(1); - if (!userInfo.ReadFromTimestamp|| userInfo.ReadFromTimestamp > ts) + if (!userInfo.ReadFromTimestamp || userInfo.ReadFromTimestamp > ts) userInfo.ReadFromTimestamp = ts; } for (auto& consumer : hasReadRule) { auto& userInfo = UsersInfoStorage.GetOrCreate(consumer, ctx); + THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(0, consumer, + 0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0); if (!userInfo.Important) { ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo.LabeledCounters.GetGroup())); } - THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(0, consumer, - 0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0); userInfo.Session = ""; userInfo.Offset = 0; userInfo.Step = userInfo.Generation = 0; @@ -1805,6 +1803,10 @@ void TPartition::Handle(TEvPQ::TEvChangeConfig::TPtr& ev, const TActorContext& c if (CurrentStateFunc() != &TThis::StateInit) { InitUserInfoForImportantClients(ctx); FillReadFromTimestamps(Config, ctx); + + for (auto& ui : UsersInfoStorage.GetAll()) { + ProcessUserActs(ui.second, ctx); + } } if (Config.GetPartitionConfig().HasMirrorFrom()) { @@ -3831,6 +3833,12 @@ void TPartition::WriteClientInfo(const ui64 cookie, TUserInfo& userInfo, const T range->SetTo(ikey.Data(), ikey.Size()); range->SetIncludeFrom(true); range->SetIncludeTo(true); + del = request->Record.AddCmdDeleteRange(); + range = del->MutableRange(); + range->SetFrom(ikeyDeprecated.Data(), ikeyDeprecated.Size()); + range->SetTo(ikeyDeprecated.Data(), ikeyDeprecated.Size()); + range->SetIncludeFrom(true); + range->SetIncludeTo(true); request->Record.SetCookie(cookie); ctx.Send(Tablet, request.Release()); diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 9937dbe561..caba19df02 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1934,6 +1934,7 @@ bool TPersQueue::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TAc { if (!ev) return true; + if (ev->Get()->Cgi().Has("kv")) { return TKeyValueFlat::OnRenderAppHtmlPage(ev, ctx); } diff --git a/ydb/core/persqueue/pq_ut.cpp b/ydb/core/persqueue/pq_ut.cpp index 13a6122910..f62f28f18a 100644 --- a/ydb/core/persqueue/pq_ut.cpp +++ b/ydb/core/persqueue/pq_ut.cpp @@ -168,7 +168,6 @@ Y_UNIT_TEST(TestUserInfoCompatibility) { FillDeprecatedUserInfo(request->Record.AddCmdWrite(), client, 3, 0); FillUserInfo(request->Record.AddCmdWrite(), client, 3, 1); - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); TAutoPtr<IEventHandle> handle; TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); @@ -194,7 +193,8 @@ Y_UNIT_TEST(TestReadRuleVersions) { tc.Prepare(dispatchName, setup, activeZone); activeZone = false; TString client = "test"; - PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {{client, false}}, tc, 3); + + PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {{client, false}, {"another-user", false}}, tc, 3); tc.Runtime->SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG); @@ -208,18 +208,75 @@ Y_UNIT_TEST(TestReadRuleVersions) { CmdSetOffset(0, client, 1, false, tc); CmdSetOffset(1, client, 2, false, tc); + { + THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); + + FillUserInfo(request->Record.AddCmdWrite(), "old_consumer", 0, 0); + FillDeprecatedUserInfo(request->Record.AddCmdWrite(), "old_consumer", 0, 0); + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + TAutoPtr<IEventHandle> handle; + TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); + Y_UNUSED(result); + + } + RestartTablet(tc); CmdGetOffset(0, client, 1, tc); CmdGetOffset(1, client, 2, tc); - PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {}, tc, 3); + { + THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); + auto read = request->Record.AddCmdReadRange(); + auto range = read->MutableRange(); + NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUser); + range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size()); + range->SetIncludeFrom(true); + NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 1, NPQ::TKeyPrefix::MarkUser); + range->SetTo(ikeyTo.Data(), ikeyTo.Size()); + range->SetIncludeTo(true); + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + TAutoPtr<IEventHandle> handle; + TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); - CmdCreateSession(0, client, "session1", tc, 0, 0, 0, true); - CmdCreateSession(1, client, "session2", tc, 0, 0, 0, true); + Cerr << result->Record << "\n"; + + UNIT_ASSERT(result->Record.GetReadRangeResult(0).GetPair().size() == 7); + } + + PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {}, tc, 3); CmdGetOffset(0, client, 0, tc); CmdGetOffset(1, client, 0, tc); + + { + THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); + auto read = request->Record.AddCmdReadRange(); + auto range = read->MutableRange(); + NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUser); + range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size()); + range->SetIncludeFrom(true); + NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 1, NPQ::TKeyPrefix::MarkUser); + range->SetTo(ikeyTo.Data(), ikeyTo.Size()); + range->SetIncludeTo(true); + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + TAutoPtr<IEventHandle> handle; + TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); + + Cerr << result->Record << "\n"; + + UNIT_ASSERT(result->Record.GetReadRangeResult(0).GetPair().size() == 3); + } + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, new NActors::NMon::TEvRemoteHttpInfo(TStringBuilder() << "localhost:8765/tablets/app?TabletID=" << tc.TabletId), 0, GetPipeConfigWithRetries()); + TAutoPtr<IEventHandle> handle; + + tc.Runtime->GrabEdgeEvent<NMon::TEvRemoteHttpInfoRes>(handle); + TString rs = handle->Get<NMon::TEvRemoteHttpInfoRes>()->Html; + Cerr << rs << "\n"; }); } diff --git a/ydb/core/persqueue/pq_ut.h b/ydb/core/persqueue/pq_ut.h index 3db93bc104..cbba48c173 100644 --- a/ydb/core/persqueue/pq_ut.h +++ b/ydb/core/persqueue/pq_ut.h @@ -1065,7 +1065,7 @@ void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui } -void FillUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, TString& client, ui32 partition, ui64 offset) { +void FillUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) { NPQ::TKeyPrefix ikey(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUser); ikey.Append(client.c_str(), client.size()); @@ -1086,7 +1086,7 @@ void FillUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, TString& cli write->SetValue(idata.Data(), idata.Size()); } -void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, TString& client, ui32 partition, ui64 offset) { +void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) { TString session = "test-session"; ui32 gen = 1; ui32 step = 2; diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 4eb9c7206d..490e6ece3c 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -370,8 +370,10 @@ void TPersQueueReadBalancer::CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &req if (record.GetOperation() == NKikimrPQ::EOperation::READ_OP) { if (!Consumers.contains(user)) { - RespondWithACL(request, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "no read rule provided for consumer '" << NPersQueue::ConvertOldConsumerName(user) << "' that allows to read topic from cluster '" - << NPersQueue::GetDC(Topic) <<"'; may be there is read rule with mode all-original only and you are reading with mirrored topics. Change read-rule to mirror-to-<cluster> or options of reading process.", ctx); + RespondWithACL(request, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "no read rule provided for consumer '" + << (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ? user : NPersQueue::ConvertOldConsumerName(user)) + << "' that allows to read topic from cluster '" << NPersQueue::GetDC(Topic) + << "'; may be there is read rule with mode all-original only and you are reading with mirrored topics. Change read-rule to mirror-to-<cluster> or options of reading process.", ctx); return; } } diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h index ac88c80f27..59a93052f7 100644 --- a/ydb/core/testlib/test_pq_client.h +++ b/ydb/core/testlib/test_pq_client.h @@ -11,7 +11,6 @@ #include <ydb/library/aclib/aclib.h> #include <ydb/library/persqueue/topic_parser/topic_parser.h> -#include <library/cpp/tvmauth/unittest.h> #include <library/cpp/testing/unittest/registar.h> #include <util/string/printf.h> @@ -31,18 +30,7 @@ inline Tests::TServerSettings PQSettings(ui16 port, ui32 nodesCount = 2, bool ro authConfig.SetUseAccessService(false); authConfig.SetUseAccessServiceTLS(false); authConfig.SetUseStaff(false); - authConfig.MutableTVMConfig()->SetEnabled(true); - authConfig.MutableTVMConfig()->SetServiceTVMId(10); - authConfig.MutableTVMConfig()->SetPublicKeys(NTvmAuth::NUnittest::TVMKNIFE_PUBLIC_KEYS); - authConfig.MutableTVMConfig()->SetUpdatePublicKeys(false); pqConfig.SetRoundRobinPartitionMapping(roundrobin); - const TString query = R"___( - DECLARE $userNameHint AS Utf8; DECLARE $uid AS Uint64; - SELECT DISTINCT(name) FROM (SELECT name FROM [/Root/PQ/Config/V2/Producer] WHERE tvmClientId = YQL::ToString($uid) AND ($userNameHint = name OR $userNameHint = "") - UNION ALL SELECT name FROM [/Root/PQ/Config/V2/Consumer] WHERE tvmClientId = YQL::ToString($uid) AND ($userNameHint = name OR $userNameHint = "")); - )___"; - - authConfig.MutableUserRegistryConfig()->SetQuery(query); pqConfig.SetEnabled(true); pqConfig.SetMaxReadCookies(10); @@ -85,7 +73,7 @@ struct TRequestCreatePQ { ui64 writeSpeed = 20000000, const TString& user = "", ui64 readSpeed = 20000000, - const TVector<TString>& readRules = {}, + const TVector<TString>& readRules = {"user"}, const TVector<TString>& important = {}, std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom = {}, ui64 sourceIdMaxCount = 6000000, @@ -161,9 +149,9 @@ struct TRequestCreatePQ { config->AddReadRuleVersions(0); config->AddConsumerCodecs()->AddIds(0); } - if (!ReadRules.empty()) { - config->SetRequireAuthRead(true); - } +// if (!ReadRules.empty()) { +// config->SetRequireAuthRead(true); +// } if (!User.empty()) { auto rq = config->MutablePartitionConfig()->AddReadQuota(); rq->SetSpeedInBytesPerSecond(ReadSpeed); @@ -693,16 +681,6 @@ public: ); )___"); - RunYqlDataQuery(R"___( - UPSERT INTO [/Root/PQ/Config/V2/Consumer] (name, tvmClientId) VALUES - ("user1", "1"), - ("user2", "1"), - ("user5", "1"), - ("user3", "2"); - UPSERT INTO [/Root/PQ/Config/V2/Producer] (name, tvmClientId) VALUES - ("user4", "2"), - ("topic1", "1"); - )___"); } void UpdateDC(const TString& name, bool local, bool enabled) { @@ -853,7 +831,7 @@ public: } - void CreateTopicNoLegacy(const TString& name, ui32 partsCount, bool doWait = true, bool canWrite = true) { + void CreateTopicNoLegacy(const TString& name, ui32 partsCount, bool doWait = true, bool canWrite = true, TVector<TString> rr = {"user"}) { TString path = name; if (UseConfigTables) { path = TStringBuilder() << "/Root/PQ/" << name; @@ -861,6 +839,12 @@ public: auto pqClient = NYdb::NPersQueue::TPersQueueClient(*Driver); auto settings = NYdb::NPersQueue::TCreateTopicSettings().PartitionsCount(partsCount).ClientWriteDisabled(!canWrite); + TVector<NYdb::NPersQueue::TReadRuleSettings> rrSettings; + for (auto &user : rr) { + rrSettings.push_back({NYdb::NPersQueue::TReadRuleSettings{}.ConsumerName(user)}); + } + settings.ReadRules(rrSettings); + Cerr << "===Create topic: " << path << Endl; auto res = pqClient.CreateTopic(path, settings); //ToDo - hack, cannot avoid legacy compat yet as PQv1 still uses RequestProcessor from core/client/server @@ -902,7 +886,7 @@ public: const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request, reply); UNIT_ASSERT(response); UNIT_ASSERT_VALUES_EQUAL_C((ui32)response->Record.GetErrorCode(), (ui32)NPersQueue::NErrorCode::OK, - "proxy failure"); + TStringBuilder() << "proxy failure: " << response->Record.DebugString()); AddTopic(createRequest.Topic); while (doWait && TopicRealCreated(createRequest.Topic) != prevVersion + 1) { @@ -923,7 +907,7 @@ public: ui64 writeSpeed = 20000000, TString user = "", ui64 readSpeed = 200000000, - TVector<TString> rr = {}, + TVector<TString> rr = {"user"}, TVector<TString> important = {}, std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom = {}, ui64 sourceIdMaxCount = 6000000, @@ -1227,7 +1211,7 @@ public: auto t = res.GetTopicResult(i); count += t.PartitionResultSize(); for (ui32 j = 0; j < t.PartitionResultSize(); ++j) { - if (t.GetPartitionResult(j).HasClientOffset()) + if (t.GetPartitionResult(j).HasClientOffset() && t.GetPartitionResult(j).GetClientOffset() > 0) ++clientOffsetCount; } } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/ya.make b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/ya.make index 21c92fa949..7027e78931 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/ya.make +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/ya.make @@ -24,6 +24,7 @@ PEERDIR( ydb/public/sdk/cpp/client/impl/ydb_internal/make_request ydb/public/sdk/cpp/client/ydb_common_client/impl ydb/public/sdk/cpp/client/ydb_driver + ydb/public/api/grpc/draft ) END() diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h index ddb1b2d82b..ee6a5fc64a 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h @@ -245,6 +245,7 @@ struct TTopicSettings : public TOperationRequestSettings<TDerived> { FLUENT_SETTING_OPTIONAL(ui32, AbcId); FLUENT_SETTING_OPTIONAL(TString, AbcSlug); + //TODO: FLUENT_SETTING_VECTOR FLUENT_SETTING_DEFAULT(TVector<TReadRuleSettings>, ReadRules, {}); FLUENT_SETTING_OPTIONAL(TRemoteMirrorRuleSettings, RemoteMirrorRule); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make index 728ca44d6b..5adaf587b1 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make @@ -16,6 +16,7 @@ PEERDIR( library/cpp/retry ydb/public/sdk/cpp/client/ydb_persqueue_core/impl ydb/public/sdk/cpp/client/ydb_proto + ydb/public/sdk/cpp/client/ydb_driver ) END() diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp index 82b7a0658a..509eb2a231 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp @@ -2674,20 +2674,13 @@ bool TReadInitAndAuthActor::CheckTopicACL( ) { auto& pqDescr = entry.PQGroupInfo->Description; //ToDo[migration] - proper auth setup - bool alwaysCheckPermissions = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen(); - bool reqAuthRead = DoCheckACL && ( - pqDescr.GetPQTabletConfig().GetRequireAuthRead() || alwaysCheckPermissions - ); - - if (reqAuthRead && !CheckACLPermissionsForNavigate( + if (Token && !CheckACLPermissionsForNavigate( entry.SecurityObject, topic, NACLib::EAccessRights::SelectRow, "No ReadTopic permissions", ctx )) { return false; } - //ToDo[migration] - proper auth setup - bool doCheckReadRules = AppData(ctx)->PQConfig.GetCheckACL() && (Token || alwaysCheckPermissions); - if (doCheckReadRules) { + if (Token) { bool found = false; for (auto& cons : pqDescr.GetPQTabletConfig().GetReadRules() ) { if (cons == ClientId) { diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index f7392816f2..6e5c662a7a 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -617,7 +617,7 @@ namespace { writer.Write(SHORT_TOPIC_NAME, {"valuevaluevalue8"}); writer.Write(SHORT_TOPIC_NAME, {"valuevaluevalue9"}); - writer.Read(SHORT_TOPIC_NAME, "user1", "", false, false); + writer.Read(SHORT_TOPIC_NAME, "user", "", false, false); } Y_UNIT_TEST(SetupReadSession) { @@ -769,7 +769,7 @@ namespace { Y_UNIT_TEST(BigRead) { NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root")); - server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 8*1024*1024, 86400, 20000000, "user1", 2000000); + server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 8*1024*1024, 86400, 20000000, "user", 2000000); server.EnableLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD, NKikimrServices::PERSQUEUE }); @@ -778,12 +778,12 @@ namespace { server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", i}, value); // trying to read small PQ messages in a big messagebus event - auto info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user1"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb + auto info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb UNIT_ASSERT_VALUES_EQUAL(info.BlobsFromDisk, 0); UNIT_ASSERT_VALUES_EQUAL(info.BlobsFromCache, 4); TInstant now(TInstant::Now()); - info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user1"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb + info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb TDuration dur = TInstant::Now() - now; UNIT_ASSERT_C(dur > TDuration::Seconds(7) && dur < TDuration::Seconds(20), "dur = " << dur); //speed limit is 2000kb/s and burst is 2000kb, so to read 24mb it will take at least 11 seconds @@ -803,8 +803,8 @@ namespace { for (ui32 i = 0; i < 32; ++i) server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", i}, value); - auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user1"}, 16); - auto info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user1"}, 16); + auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16); + auto info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16); UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromCache, 3); UNIT_ASSERT_VALUES_EQUAL(info16.BlobsFromCache, 2); @@ -813,8 +813,8 @@ namespace { for (ui32 i = 0; i < 8; ++i) server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", 32+i}, value); - info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user1"}, 16); - info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user1"}, 16); + info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16); + info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16); ui32 fromDisk = info0.BlobsFromDisk + info16.BlobsFromDisk; ui32 fromCache = info0.BlobsFromCache + info16.BlobsFromCache; @@ -3180,6 +3180,7 @@ namespace { const ui32 topicsCount = 4; for (ui32 i = 1; i <= topicsCount; ++i) { TRequestCreatePQ createTopicRequest(TStringBuilder() << "rt3.dc1--topic_" << i, 1); + createTopicRequest.ReadRules.clear(); createTopicRequest.ReadRules.push_back("acc@user1"); createTopicRequest.ReadRules.push_back("acc@user2"); createTopicRequest.ReadRules.push_back("acc@user3"); |