diff options
author | alexnick <alexnick@yandex-team.ru> | 2022-02-21 19:17:18 +0300 |
---|---|---|
committer | alexnick <alexnick@yandex-team.ru> | 2022-02-21 19:17:18 +0300 |
commit | da31d8464e4dd7b0e44b7b304c413ad8ca4ae084 (patch) | |
tree | e19ec8f7161d7ca046c8f92af29c5d081bc54891 | |
parent | 2eaa9cc03ffb5c9b1642b09282e2f920f3a30328 (diff) | |
download | ydb-da31d8464e4dd7b0e44b7b304c413ad8ca4ae084.tar.gz |
fix for old read-rules LOGBROKER-7163
ref:928e5aa5a0881cd0813ec4f1b9dbf34ee9eea616
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 22 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_ut.cpp | 67 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_ut.h | 4 | ||||
-rw-r--r-- | ydb/core/testlib/test_pq_client.h | 16 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_read_actor.cpp | 11 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_write_actor.cpp | 14 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 17 |
8 files changed, 106 insertions, 46 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index cc8e2419cf2..0df5db92d2b 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -414,10 +414,8 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config for (auto& userInfo : UsersInfoStorage.GetAll()) { userInfo.second.ReadFromTimestamp = TInstant::Zero(); - if (userInfo.second.HasReadRule) { - userInfo.second.HasReadRule = false; - hasReadRule.insert(userInfo.first); - } + userInfo.second.HasReadRule = false; + hasReadRule.insert(userInfo.first); } for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { const auto& consumer = config.GetReadRules(i); @@ -435,16 +433,16 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config hasReadRule.erase(consumer); TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero(); if (!ts) ts += TDuration::MilliSeconds(1); - if (!userInfo.ReadFromTimestamp|| userInfo.ReadFromTimestamp > ts) + if (!userInfo.ReadFromTimestamp || userInfo.ReadFromTimestamp > ts) userInfo.ReadFromTimestamp = ts; } for (auto& consumer : hasReadRule) { auto& userInfo = UsersInfoStorage.GetOrCreate(consumer, ctx); + THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(0, consumer, + 0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0); if (!userInfo.Important) { ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo.LabeledCounters.GetGroup())); } - THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(0, consumer, - 0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0); userInfo.Session = ""; userInfo.Offset = 0; userInfo.Step = userInfo.Generation = 0; @@ -1805,6 +1803,10 @@ void TPartition::Handle(TEvPQ::TEvChangeConfig::TPtr& ev, const TActorContext& c if (CurrentStateFunc() != &TThis::StateInit) { InitUserInfoForImportantClients(ctx); FillReadFromTimestamps(Config, ctx); + + for (auto& ui : UsersInfoStorage.GetAll()) { + ProcessUserActs(ui.second, ctx); + } } if (Config.GetPartitionConfig().HasMirrorFrom()) { @@ -3831,6 +3833,12 @@ void TPartition::WriteClientInfo(const ui64 cookie, TUserInfo& userInfo, const T range->SetTo(ikey.Data(), ikey.Size()); range->SetIncludeFrom(true); range->SetIncludeTo(true); + del = request->Record.AddCmdDeleteRange(); + range = del->MutableRange(); + range->SetFrom(ikeyDeprecated.Data(), ikeyDeprecated.Size()); + range->SetTo(ikeyDeprecated.Data(), ikeyDeprecated.Size()); + range->SetIncludeFrom(true); + range->SetIncludeTo(true); request->Record.SetCookie(cookie); ctx.Send(Tablet, request.Release()); diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 4ffbcf49c1f..09ec4b94fa9 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1934,6 +1934,7 @@ bool TPersQueue::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TAc { if (!ev) return true; + if (ev->Get()->Cgi().Has("kv")) { return TKeyValueFlat::OnRenderAppHtmlPage(ev, ctx); } diff --git a/ydb/core/persqueue/pq_ut.cpp b/ydb/core/persqueue/pq_ut.cpp index 13a61229105..f62f28f18a0 100644 --- a/ydb/core/persqueue/pq_ut.cpp +++ b/ydb/core/persqueue/pq_ut.cpp @@ -168,7 +168,6 @@ Y_UNIT_TEST(TestUserInfoCompatibility) { FillDeprecatedUserInfo(request->Record.AddCmdWrite(), client, 3, 0); FillUserInfo(request->Record.AddCmdWrite(), client, 3, 1); - tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); TAutoPtr<IEventHandle> handle; TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); @@ -194,7 +193,8 @@ Y_UNIT_TEST(TestReadRuleVersions) { tc.Prepare(dispatchName, setup, activeZone); activeZone = false; TString client = "test"; - PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {{client, false}}, tc, 3); + + PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {{client, false}, {"another-user", false}}, tc, 3); tc.Runtime->SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG); @@ -208,18 +208,75 @@ Y_UNIT_TEST(TestReadRuleVersions) { CmdSetOffset(0, client, 1, false, tc); CmdSetOffset(1, client, 2, false, tc); + { + THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); + + FillUserInfo(request->Record.AddCmdWrite(), "old_consumer", 0, 0); + FillDeprecatedUserInfo(request->Record.AddCmdWrite(), "old_consumer", 0, 0); + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + TAutoPtr<IEventHandle> handle; + TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); + Y_UNUSED(result); + + } + RestartTablet(tc); CmdGetOffset(0, client, 1, tc); CmdGetOffset(1, client, 2, tc); - PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {}, tc, 3); + { + THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); + auto read = request->Record.AddCmdReadRange(); + auto range = read->MutableRange(); + NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUser); + range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size()); + range->SetIncludeFrom(true); + NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 1, NPQ::TKeyPrefix::MarkUser); + range->SetTo(ikeyTo.Data(), ikeyTo.Size()); + range->SetIncludeTo(true); + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + TAutoPtr<IEventHandle> handle; + TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); - CmdCreateSession(0, client, "session1", tc, 0, 0, 0, true); - CmdCreateSession(1, client, "session2", tc, 0, 0, 0, true); + Cerr << result->Record << "\n"; + + UNIT_ASSERT(result->Record.GetReadRangeResult(0).GetPair().size() == 7); + } + + PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {}, tc, 3); CmdGetOffset(0, client, 0, tc); CmdGetOffset(1, client, 0, tc); + + { + THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest); + auto read = request->Record.AddCmdReadRange(); + auto range = read->MutableRange(); + NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUser); + range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size()); + range->SetIncludeFrom(true); + NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 1, NPQ::TKeyPrefix::MarkUser); + range->SetTo(ikeyTo.Data(), ikeyTo.Size()); + range->SetIncludeTo(true); + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries()); + TAutoPtr<IEventHandle> handle; + TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle); + + Cerr << result->Record << "\n"; + + UNIT_ASSERT(result->Record.GetReadRangeResult(0).GetPair().size() == 3); + } + + tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, new NActors::NMon::TEvRemoteHttpInfo(TStringBuilder() << "localhost:8765/tablets/app?TabletID=" << tc.TabletId), 0, GetPipeConfigWithRetries()); + TAutoPtr<IEventHandle> handle; + + tc.Runtime->GrabEdgeEvent<NMon::TEvRemoteHttpInfoRes>(handle); + TString rs = handle->Get<NMon::TEvRemoteHttpInfoRes>()->Html; + Cerr << rs << "\n"; }); } diff --git a/ydb/core/persqueue/pq_ut.h b/ydb/core/persqueue/pq_ut.h index 3db93bc1047..cbba48c1736 100644 --- a/ydb/core/persqueue/pq_ut.h +++ b/ydb/core/persqueue/pq_ut.h @@ -1065,7 +1065,7 @@ void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui } -void FillUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, TString& client, ui32 partition, ui64 offset) { +void FillUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) { NPQ::TKeyPrefix ikey(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUser); ikey.Append(client.c_str(), client.size()); @@ -1086,7 +1086,7 @@ void FillUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, TString& cli write->SetValue(idata.Data(), idata.Size()); } -void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, TString& client, ui32 partition, ui64 offset) { +void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) { TString session = "test-session"; ui32 gen = 1; ui32 step = 2; diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h index 142277c9983..ab2c32c9f49 100644 --- a/ydb/core/testlib/test_pq_client.h +++ b/ydb/core/testlib/test_pq_client.h @@ -73,7 +73,7 @@ struct TRequestCreatePQ { ui64 writeSpeed = 20000000, const TString& user = "", ui64 readSpeed = 20000000, - const TVector<TString>& readRules = {}, + const TVector<TString>& readRules = {"user"}, const TVector<TString>& important = {}, std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom = {}, ui64 sourceIdMaxCount = 6000000, @@ -149,9 +149,9 @@ struct TRequestCreatePQ { config->AddReadRuleVersions(0); config->AddConsumerCodecs()->AddIds(0); } - if (!ReadRules.empty()) { - config->SetRequireAuthRead(true); - } +// if (!ReadRules.empty()) { +// config->SetRequireAuthRead(true); +// } if (!User.empty()) { auto rq = config->MutablePartitionConfig()->AddReadQuota(); rq->SetSpeedInBytesPerSecond(ReadSpeed); @@ -831,7 +831,7 @@ public: } - void CreateTopicNoLegacy(const TString& name, ui32 partsCount, bool doWait = true, bool canWrite = true, TVector<TString> rr = {}) { + void CreateTopicNoLegacy(const TString& name, ui32 partsCount, bool doWait = true, bool canWrite = true, TVector<TString> rr = {"user"}) { TString path = name; if (UseConfigTables) { path = TStringBuilder() << "/Root/PQ/" << name; @@ -886,7 +886,7 @@ public: const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request, reply); UNIT_ASSERT(response); UNIT_ASSERT_VALUES_EQUAL_C((ui32)response->Record.GetErrorCode(), (ui32)NPersQueue::NErrorCode::OK, - "proxy failure"); + TStringBuilder() << "proxy failure: " << response->Record.DebugString()); AddTopic(createRequest.Topic); while (doWait && TopicRealCreated(createRequest.Topic) != prevVersion + 1) { @@ -907,7 +907,7 @@ public: ui64 writeSpeed = 20000000, TString user = "", ui64 readSpeed = 200000000, - TVector<TString> rr = {}, + TVector<TString> rr = {"user"}, TVector<TString> important = {}, std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom = {}, ui64 sourceIdMaxCount = 6000000, @@ -1211,7 +1211,7 @@ public: auto t = res.GetTopicResult(i); count += t.PartitionResultSize(); for (ui32 j = 0; j < t.PartitionResultSize(); ++j) { - if (t.GetPartitionResult(j).HasClientOffset()) + if (t.GetPartitionResult(j).HasClientOffset() && t.GetPartitionResult(j).GetClientOffset() > 0) ++clientOffsetCount; } } diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp index 82b7a0658aa..509eb2a2316 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp @@ -2674,20 +2674,13 @@ bool TReadInitAndAuthActor::CheckTopicACL( ) { auto& pqDescr = entry.PQGroupInfo->Description; //ToDo[migration] - proper auth setup - bool alwaysCheckPermissions = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen(); - bool reqAuthRead = DoCheckACL && ( - pqDescr.GetPQTabletConfig().GetRequireAuthRead() || alwaysCheckPermissions - ); - - if (reqAuthRead && !CheckACLPermissionsForNavigate( + if (Token && !CheckACLPermissionsForNavigate( entry.SecurityObject, topic, NACLib::EAccessRights::SelectRow, "No ReadTopic permissions", ctx )) { return false; } - //ToDo[migration] - proper auth setup - bool doCheckReadRules = AppData(ctx)->PQConfig.GetCheckACL() && (Token || alwaysCheckPermissions); - if (doCheckReadRules) { + if (Token) { bool found = false; for (auto& cons : pqDescr.GetPQTabletConfig().GetReadRules() ) { if (cons == ClientId) { diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp index c93999aae3c..d7a4044af27 100644 --- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp @@ -296,13 +296,13 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor SourceId = init.message_group_id(); try { - // "Bad" hash - in legacy mode calculated from full name ("rt3...", not short name); - // Here we had a bug for all the time being and now have to keep compatibility was invalid hashes - // Generally GetTopicForSrcIdHash for encoding. Do not copy-paste this line; + // "Bad" hash - in legacy mode calculated from full name ("rt3...", not short name); + // Here we had a bug for all the time being and now have to keep compatibility was invalid hashes + // Generally GetTopicForSrcIdHash for encoding. Do not copy-paste this line; EncodedSourceId = NSourceIdEncoding::EncodeSrcId(TopicConverter->GetTopicForSrcId(), SourceId); - - // Good hash and proper way of calcultion; - CompatibleHash = NSourceIdEncoding::EncodeSrcId(TopicConverter->GetTopicForSrcIdHash(), SourceId).Hash; + + // Good hash and proper way of calcultion; + CompatibleHash = NSourceIdEncoding::EncodeSrcId(TopicConverter->GetTopicForSrcIdHash(), SourceId).Hash; } catch (yexception& e) { CloseSession(TStringBuilder() << "incorrect sourceId \"" << SourceId << "\": " << e.what(), PersQueue::ErrorCode::BAD_REQUEST, ctx); return; @@ -672,7 +672,7 @@ THolder<NKqp::TEvKqp::TEvQueryRequest> TWriteSessionActor::MakeUpdateSourceIdMet parameters["$Hash"] = hash; parameters["$Topic"] = topic; parameters["$SourceId"] = EncodedSourceId.EscapedSourceId; - + parameters["$CreateTime"] = SourceIdCreateTime; parameters["$AccessTime"] = TInstant::Now().MilliSeconds(); parameters["$Partition"] = Partition; diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 4c1fd4e8520..84ad280f7e7 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -617,7 +617,7 @@ namespace { writer.Write(SHORT_TOPIC_NAME, {"valuevaluevalue8"}); writer.Write(SHORT_TOPIC_NAME, {"valuevaluevalue9"}); - writer.Read(SHORT_TOPIC_NAME, "user1", "", false, false); + writer.Read(SHORT_TOPIC_NAME, "user", "", false, false); } Y_UNIT_TEST(SetupReadSession) { @@ -769,7 +769,7 @@ namespace { Y_UNIT_TEST(BigRead) { NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root")); - server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 8*1024*1024, 86400, 20000000, "user1", 2000000); + server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 8*1024*1024, 86400, 20000000, "user", 2000000); server.EnableLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD, NKikimrServices::PERSQUEUE }); @@ -778,12 +778,12 @@ namespace { server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", i}, value); // trying to read small PQ messages in a big messagebus event - auto info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user1"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb + auto info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb UNIT_ASSERT_VALUES_EQUAL(info.BlobsFromDisk, 0); UNIT_ASSERT_VALUES_EQUAL(info.BlobsFromCache, 4); TInstant now(TInstant::Now()); - info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user1"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb + info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb TDuration dur = TInstant::Now() - now; UNIT_ASSERT_C(dur > TDuration::Seconds(7) && dur < TDuration::Seconds(20), "dur = " << dur); //speed limit is 2000kb/s and burst is 2000kb, so to read 24mb it will take at least 11 seconds @@ -803,8 +803,8 @@ namespace { for (ui32 i = 0; i < 32; ++i) server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", i}, value); - auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user1"}, 16); - auto info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user1"}, 16); + auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16); + auto info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16); UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromCache, 3); UNIT_ASSERT_VALUES_EQUAL(info16.BlobsFromCache, 2); @@ -813,8 +813,8 @@ namespace { for (ui32 i = 0; i < 8; ++i) server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", 32+i}, value); - info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user1"}, 16); - info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user1"}, 16); + info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16); + info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16); ui32 fromDisk = info0.BlobsFromDisk + info16.BlobsFromDisk; ui32 fromCache = info0.BlobsFromCache + info16.BlobsFromCache; @@ -3181,6 +3181,7 @@ namespace { const ui32 topicsCount = 4; for (ui32 i = 1; i <= topicsCount; ++i) { TRequestCreatePQ createTopicRequest(TStringBuilder() << "rt3.dc1--topic_" << i, 1); + createTopicRequest.ReadRules.clear(); createTopicRequest.ReadRules.push_back("acc@user1"); createTopicRequest.ReadRules.push_back("acc@user2"); createTopicRequest.ReadRules.push_back("acc@user3"); |