aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@yandex-team.ru>2022-02-24 01:41:59 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-24 01:41:59 +0300
commitce558ec5cf887e0e74ea38ef4430cd81eca39a15 (patch)
tree3e2eb3a24afa3146a0b3f94c2bcafe945c6797d4
parentc1382909f2f37f901364dfbb92cd72dcbf9e0f8d (diff)
downloadydb-ce558ec5cf887e0e74ea38ef4430cd81eca39a15.tar.gz
merge from trunk: support of pqv0, remove old read-rules LOGBROKER-7163
fix for old read-rules LOGBROKER-7163 REVIEW: 2342989 remove tvm dependecy KIKIMR-14386 REVIEW: 2335742 add missing peerdirs Note: mandatory check (NEED_CHECK) was skipped support of pqv0 in first class citizen mode REVIEW: 2316816 add more tests for pqv0 and firstClassCitizen Note: mandatory check (NEED_CHECK) was skipped fix for race on destruction Note: mandatory check (NEED_CHECK) was skipped REVIEW: 2346379 x-ydb-stable-ref: cba3e236548e68a4ea06193a6bfabf12e707c265
-rw-r--r--ydb/core/persqueue/partition.cpp22
-rw-r--r--ydb/core/persqueue/pq_impl.cpp1
-rw-r--r--ydb/core/persqueue/pq_ut.cpp67
-rw-r--r--ydb/core/persqueue/pq_ut.h4
-rw-r--r--ydb/core/persqueue/read_balancer.cpp6
-rw-r--r--ydb/core/testlib/test_pq_client.h44
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/ya.make1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make1
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read_actor.cpp11
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp17
11 files changed, 112 insertions, 63 deletions
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index cc8e2419cf..0df5db92d2 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -414,10 +414,8 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
for (auto& userInfo : UsersInfoStorage.GetAll()) {
userInfo.second.ReadFromTimestamp = TInstant::Zero();
- if (userInfo.second.HasReadRule) {
- userInfo.second.HasReadRule = false;
- hasReadRule.insert(userInfo.first);
- }
+ userInfo.second.HasReadRule = false;
+ hasReadRule.insert(userInfo.first);
}
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
const auto& consumer = config.GetReadRules(i);
@@ -435,16 +433,16 @@ void TPartition::FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config
hasReadRule.erase(consumer);
TInstant ts = i < config.ReadFromTimestampsMsSize() ? TInstant::MilliSeconds(config.GetReadFromTimestampsMs(i)) : TInstant::Zero();
if (!ts) ts += TDuration::MilliSeconds(1);
- if (!userInfo.ReadFromTimestamp|| userInfo.ReadFromTimestamp > ts)
+ if (!userInfo.ReadFromTimestamp || userInfo.ReadFromTimestamp > ts)
userInfo.ReadFromTimestamp = ts;
}
for (auto& consumer : hasReadRule) {
auto& userInfo = UsersInfoStorage.GetOrCreate(consumer, ctx);
+ THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(0, consumer,
+ 0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0);
if (!userInfo.Important) {
ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCountersDrop(Partition, userInfo.LabeledCounters.GetGroup()));
}
- THolder<TEvPQ::TEvSetClientInfo> event = MakeHolder<TEvPQ::TEvSetClientInfo>(0, consumer,
- 0, "", 0, 0, TEvPQ::TEvSetClientInfo::ESCI_DROP_READ_RULE, 0);
userInfo.Session = "";
userInfo.Offset = 0;
userInfo.Step = userInfo.Generation = 0;
@@ -1805,6 +1803,10 @@ void TPartition::Handle(TEvPQ::TEvChangeConfig::TPtr& ev, const TActorContext& c
if (CurrentStateFunc() != &TThis::StateInit) {
InitUserInfoForImportantClients(ctx);
FillReadFromTimestamps(Config, ctx);
+
+ for (auto& ui : UsersInfoStorage.GetAll()) {
+ ProcessUserActs(ui.second, ctx);
+ }
}
if (Config.GetPartitionConfig().HasMirrorFrom()) {
@@ -3831,6 +3833,12 @@ void TPartition::WriteClientInfo(const ui64 cookie, TUserInfo& userInfo, const T
range->SetTo(ikey.Data(), ikey.Size());
range->SetIncludeFrom(true);
range->SetIncludeTo(true);
+ del = request->Record.AddCmdDeleteRange();
+ range = del->MutableRange();
+ range->SetFrom(ikeyDeprecated.Data(), ikeyDeprecated.Size());
+ range->SetTo(ikeyDeprecated.Data(), ikeyDeprecated.Size());
+ range->SetIncludeFrom(true);
+ range->SetIncludeTo(true);
request->Record.SetCookie(cookie);
ctx.Send(Tablet, request.Release());
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 9937dbe561..caba19df02 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -1934,6 +1934,7 @@ bool TPersQueue::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TAc
{
if (!ev)
return true;
+
if (ev->Get()->Cgi().Has("kv")) {
return TKeyValueFlat::OnRenderAppHtmlPage(ev, ctx);
}
diff --git a/ydb/core/persqueue/pq_ut.cpp b/ydb/core/persqueue/pq_ut.cpp
index 13a6122910..f62f28f18a 100644
--- a/ydb/core/persqueue/pq_ut.cpp
+++ b/ydb/core/persqueue/pq_ut.cpp
@@ -168,7 +168,6 @@ Y_UNIT_TEST(TestUserInfoCompatibility) {
FillDeprecatedUserInfo(request->Record.AddCmdWrite(), client, 3, 0);
FillUserInfo(request->Record.AddCmdWrite(), client, 3, 1);
-
tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
TAutoPtr<IEventHandle> handle;
TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
@@ -194,7 +193,8 @@ Y_UNIT_TEST(TestReadRuleVersions) {
tc.Prepare(dispatchName, setup, activeZone);
activeZone = false;
TString client = "test";
- PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {{client, false}}, tc, 3);
+
+ PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {{client, false}, {"another-user", false}}, tc, 3);
tc.Runtime->SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG);
@@ -208,18 +208,75 @@ Y_UNIT_TEST(TestReadRuleVersions) {
CmdSetOffset(0, client, 1, false, tc);
CmdSetOffset(1, client, 2, false, tc);
+ {
+ THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
+
+ FillUserInfo(request->Record.AddCmdWrite(), "old_consumer", 0, 0);
+ FillDeprecatedUserInfo(request->Record.AddCmdWrite(), "old_consumer", 0, 0);
+
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ TAutoPtr<IEventHandle> handle;
+ TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
+ Y_UNUSED(result);
+
+ }
+
RestartTablet(tc);
CmdGetOffset(0, client, 1, tc);
CmdGetOffset(1, client, 2, tc);
- PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {}, tc, 3);
+ {
+ THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
+ auto read = request->Record.AddCmdReadRange();
+ auto range = read->MutableRange();
+ NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUser);
+ range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size());
+ range->SetIncludeFrom(true);
+ NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 1, NPQ::TKeyPrefix::MarkUser);
+ range->SetTo(ikeyTo.Data(), ikeyTo.Size());
+ range->SetIncludeTo(true);
+
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ TAutoPtr<IEventHandle> handle;
+ TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
- CmdCreateSession(0, client, "session1", tc, 0, 0, 0, true);
- CmdCreateSession(1, client, "session2", tc, 0, 0, 0, true);
+ Cerr << result->Record << "\n";
+
+ UNIT_ASSERT(result->Record.GetReadRangeResult(0).GetPair().size() == 7);
+ }
+
+ PQTabletPrepare(20000000, 100 * 1024 * 1024, 0, {}, tc, 3);
CmdGetOffset(0, client, 0, tc);
CmdGetOffset(1, client, 0, tc);
+
+ {
+ THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
+ auto read = request->Record.AddCmdReadRange();
+ auto range = read->MutableRange();
+ NPQ::TKeyPrefix ikeyFrom(NPQ::TKeyPrefix::TypeInfo, 0, NPQ::TKeyPrefix::MarkUser);
+ range->SetFrom(ikeyFrom.Data(), ikeyFrom.Size());
+ range->SetIncludeFrom(true);
+ NPQ::TKeyPrefix ikeyTo(NPQ::TKeyPrefix::TypeInfo, 1, NPQ::TKeyPrefix::MarkUser);
+ range->SetTo(ikeyTo.Data(), ikeyTo.Size());
+ range->SetIncludeTo(true);
+
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, request.Release(), 0, GetPipeConfigWithRetries());
+ TAutoPtr<IEventHandle> handle;
+ TEvKeyValue::TEvResponse* result = tc.Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>(handle);
+
+ Cerr << result->Record << "\n";
+
+ UNIT_ASSERT(result->Record.GetReadRangeResult(0).GetPair().size() == 3);
+ }
+
+ tc.Runtime->SendToPipe(tc.TabletId, tc.Edge, new NActors::NMon::TEvRemoteHttpInfo(TStringBuilder() << "localhost:8765/tablets/app?TabletID=" << tc.TabletId), 0, GetPipeConfigWithRetries());
+ TAutoPtr<IEventHandle> handle;
+
+ tc.Runtime->GrabEdgeEvent<NMon::TEvRemoteHttpInfoRes>(handle);
+ TString rs = handle->Get<NMon::TEvRemoteHttpInfoRes>()->Html;
+ Cerr << rs << "\n";
});
}
diff --git a/ydb/core/persqueue/pq_ut.h b/ydb/core/persqueue/pq_ut.h
index 3db93bc104..cbba48c173 100644
--- a/ydb/core/persqueue/pq_ut.h
+++ b/ydb/core/persqueue/pq_ut.h
@@ -1065,7 +1065,7 @@ void CmdRead(const ui32 partition, const ui64 offset, const ui32 count, const ui
}
-void FillUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, TString& client, ui32 partition, ui64 offset) {
+void FillUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) {
NPQ::TKeyPrefix ikey(NPQ::TKeyPrefix::TypeInfo, partition, NPQ::TKeyPrefix::MarkUser);
ikey.Append(client.c_str(), client.size());
@@ -1086,7 +1086,7 @@ void FillUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, TString& cli
write->SetValue(idata.Data(), idata.Size());
}
-void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, TString& client, ui32 partition, ui64 offset) {
+void FillDeprecatedUserInfo(NKikimrClient::TKeyValueRequest_TCmdWrite* write, const TString& client, ui32 partition, ui64 offset) {
TString session = "test-session";
ui32 gen = 1;
ui32 step = 2;
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp
index 4eb9c7206d..490e6ece3c 100644
--- a/ydb/core/persqueue/read_balancer.cpp
+++ b/ydb/core/persqueue/read_balancer.cpp
@@ -370,8 +370,10 @@ void TPersQueueReadBalancer::CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &req
if (record.GetOperation() == NKikimrPQ::EOperation::READ_OP) {
if (!Consumers.contains(user)) {
- RespondWithACL(request, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "no read rule provided for consumer '" << NPersQueue::ConvertOldConsumerName(user) << "' that allows to read topic from cluster '"
- << NPersQueue::GetDC(Topic) <<"'; may be there is read rule with mode all-original only and you are reading with mirrored topics. Change read-rule to mirror-to-<cluster> or options of reading process.", ctx);
+ RespondWithACL(request, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "no read rule provided for consumer '"
+ << (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ? user : NPersQueue::ConvertOldConsumerName(user))
+ << "' that allows to read topic from cluster '" << NPersQueue::GetDC(Topic)
+ << "'; may be there is read rule with mode all-original only and you are reading with mirrored topics. Change read-rule to mirror-to-<cluster> or options of reading process.", ctx);
return;
}
}
diff --git a/ydb/core/testlib/test_pq_client.h b/ydb/core/testlib/test_pq_client.h
index ac88c80f27..59a93052f7 100644
--- a/ydb/core/testlib/test_pq_client.h
+++ b/ydb/core/testlib/test_pq_client.h
@@ -11,7 +11,6 @@
#include <ydb/library/aclib/aclib.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
-#include <library/cpp/tvmauth/unittest.h>
#include <library/cpp/testing/unittest/registar.h>
#include <util/string/printf.h>
@@ -31,18 +30,7 @@ inline Tests::TServerSettings PQSettings(ui16 port, ui32 nodesCount = 2, bool ro
authConfig.SetUseAccessService(false);
authConfig.SetUseAccessServiceTLS(false);
authConfig.SetUseStaff(false);
- authConfig.MutableTVMConfig()->SetEnabled(true);
- authConfig.MutableTVMConfig()->SetServiceTVMId(10);
- authConfig.MutableTVMConfig()->SetPublicKeys(NTvmAuth::NUnittest::TVMKNIFE_PUBLIC_KEYS);
- authConfig.MutableTVMConfig()->SetUpdatePublicKeys(false);
pqConfig.SetRoundRobinPartitionMapping(roundrobin);
- const TString query = R"___(
- DECLARE $userNameHint AS Utf8; DECLARE $uid AS Uint64;
- SELECT DISTINCT(name) FROM (SELECT name FROM [/Root/PQ/Config/V2/Producer] WHERE tvmClientId = YQL::ToString($uid) AND ($userNameHint = name OR $userNameHint = "")
- UNION ALL SELECT name FROM [/Root/PQ/Config/V2/Consumer] WHERE tvmClientId = YQL::ToString($uid) AND ($userNameHint = name OR $userNameHint = ""));
- )___";
-
- authConfig.MutableUserRegistryConfig()->SetQuery(query);
pqConfig.SetEnabled(true);
pqConfig.SetMaxReadCookies(10);
@@ -85,7 +73,7 @@ struct TRequestCreatePQ {
ui64 writeSpeed = 20000000,
const TString& user = "",
ui64 readSpeed = 20000000,
- const TVector<TString>& readRules = {},
+ const TVector<TString>& readRules = {"user"},
const TVector<TString>& important = {},
std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom = {},
ui64 sourceIdMaxCount = 6000000,
@@ -161,9 +149,9 @@ struct TRequestCreatePQ {
config->AddReadRuleVersions(0);
config->AddConsumerCodecs()->AddIds(0);
}
- if (!ReadRules.empty()) {
- config->SetRequireAuthRead(true);
- }
+// if (!ReadRules.empty()) {
+// config->SetRequireAuthRead(true);
+// }
if (!User.empty()) {
auto rq = config->MutablePartitionConfig()->AddReadQuota();
rq->SetSpeedInBytesPerSecond(ReadSpeed);
@@ -693,16 +681,6 @@ public:
);
)___");
- RunYqlDataQuery(R"___(
- UPSERT INTO [/Root/PQ/Config/V2/Consumer] (name, tvmClientId) VALUES
- ("user1", "1"),
- ("user2", "1"),
- ("user5", "1"),
- ("user3", "2");
- UPSERT INTO [/Root/PQ/Config/V2/Producer] (name, tvmClientId) VALUES
- ("user4", "2"),
- ("topic1", "1");
- )___");
}
void UpdateDC(const TString& name, bool local, bool enabled) {
@@ -853,7 +831,7 @@ public:
}
- void CreateTopicNoLegacy(const TString& name, ui32 partsCount, bool doWait = true, bool canWrite = true) {
+ void CreateTopicNoLegacy(const TString& name, ui32 partsCount, bool doWait = true, bool canWrite = true, TVector<TString> rr = {"user"}) {
TString path = name;
if (UseConfigTables) {
path = TStringBuilder() << "/Root/PQ/" << name;
@@ -861,6 +839,12 @@ public:
auto pqClient = NYdb::NPersQueue::TPersQueueClient(*Driver);
auto settings = NYdb::NPersQueue::TCreateTopicSettings().PartitionsCount(partsCount).ClientWriteDisabled(!canWrite);
+ TVector<NYdb::NPersQueue::TReadRuleSettings> rrSettings;
+ for (auto &user : rr) {
+ rrSettings.push_back({NYdb::NPersQueue::TReadRuleSettings{}.ConsumerName(user)});
+ }
+ settings.ReadRules(rrSettings);
+
Cerr << "===Create topic: " << path << Endl;
auto res = pqClient.CreateTopic(path, settings);
//ToDo - hack, cannot avoid legacy compat yet as PQv1 still uses RequestProcessor from core/client/server
@@ -902,7 +886,7 @@ public:
const NMsgBusProxy::TBusResponse* response = SendAndGetReply(request, reply);
UNIT_ASSERT(response);
UNIT_ASSERT_VALUES_EQUAL_C((ui32)response->Record.GetErrorCode(), (ui32)NPersQueue::NErrorCode::OK,
- "proxy failure");
+ TStringBuilder() << "proxy failure: " << response->Record.DebugString());
AddTopic(createRequest.Topic);
while (doWait && TopicRealCreated(createRequest.Topic) != prevVersion + 1) {
@@ -923,7 +907,7 @@ public:
ui64 writeSpeed = 20000000,
TString user = "",
ui64 readSpeed = 200000000,
- TVector<TString> rr = {},
+ TVector<TString> rr = {"user"},
TVector<TString> important = {},
std::optional<NKikimrPQ::TMirrorPartitionConfig> mirrorFrom = {},
ui64 sourceIdMaxCount = 6000000,
@@ -1227,7 +1211,7 @@ public:
auto t = res.GetTopicResult(i);
count += t.PartitionResultSize();
for (ui32 j = 0; j < t.PartitionResultSize(); ++j) {
- if (t.GetPartitionResult(j).HasClientOffset())
+ if (t.GetPartitionResult(j).HasClientOffset() && t.GetPartitionResult(j).GetClientOffset() > 0)
++clientOffsetCount;
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/ya.make b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/ya.make
index 21c92fa949..7027e78931 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/ya.make
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/ya.make
@@ -24,6 +24,7 @@ PEERDIR(
ydb/public/sdk/cpp/client/impl/ydb_internal/make_request
ydb/public/sdk/cpp/client/ydb_common_client/impl
ydb/public/sdk/cpp/client/ydb_driver
+ ydb/public/api/grpc/draft
)
END()
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
index ddb1b2d82b..ee6a5fc64a 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
@@ -245,6 +245,7 @@ struct TTopicSettings : public TOperationRequestSettings<TDerived> {
FLUENT_SETTING_OPTIONAL(ui32, AbcId);
FLUENT_SETTING_OPTIONAL(TString, AbcSlug);
+ //TODO: FLUENT_SETTING_VECTOR
FLUENT_SETTING_DEFAULT(TVector<TReadRuleSettings>, ReadRules, {});
FLUENT_SETTING_OPTIONAL(TRemoteMirrorRuleSettings, RemoteMirrorRule);
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make
index 728ca44d6b..5adaf587b1 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ya.make
@@ -16,6 +16,7 @@ PEERDIR(
library/cpp/retry
ydb/public/sdk/cpp/client/ydb_persqueue_core/impl
ydb/public/sdk/cpp/client/ydb_proto
+ ydb/public/sdk/cpp/client/ydb_driver
)
END()
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
index 82b7a0658a..509eb2a231 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
@@ -2674,20 +2674,13 @@ bool TReadInitAndAuthActor::CheckTopicACL(
) {
auto& pqDescr = entry.PQGroupInfo->Description;
//ToDo[migration] - proper auth setup
- bool alwaysCheckPermissions = AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen();
- bool reqAuthRead = DoCheckACL && (
- pqDescr.GetPQTabletConfig().GetRequireAuthRead() || alwaysCheckPermissions
- );
-
- if (reqAuthRead && !CheckACLPermissionsForNavigate(
+ if (Token && !CheckACLPermissionsForNavigate(
entry.SecurityObject, topic, NACLib::EAccessRights::SelectRow,
"No ReadTopic permissions", ctx
)) {
return false;
}
- //ToDo[migration] - proper auth setup
- bool doCheckReadRules = AppData(ctx)->PQConfig.GetCheckACL() && (Token || alwaysCheckPermissions);
- if (doCheckReadRules) {
+ if (Token) {
bool found = false;
for (auto& cons : pqDescr.GetPQTabletConfig().GetReadRules() ) {
if (cons == ClientId) {
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index f7392816f2..6e5c662a7a 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -617,7 +617,7 @@ namespace {
writer.Write(SHORT_TOPIC_NAME, {"valuevaluevalue8"});
writer.Write(SHORT_TOPIC_NAME, {"valuevaluevalue9"});
- writer.Read(SHORT_TOPIC_NAME, "user1", "", false, false);
+ writer.Read(SHORT_TOPIC_NAME, "user", "", false, false);
}
Y_UNIT_TEST(SetupReadSession) {
@@ -769,7 +769,7 @@ namespace {
Y_UNIT_TEST(BigRead) {
NPersQueue::TTestServer server(PQSettings(0).SetDomainName("Root"));
- server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 8*1024*1024, 86400, 20000000, "user1", 2000000);
+ server.AnnoyingClient->CreateTopic(DEFAULT_TOPIC_NAME, 1, 8*1024*1024, 86400, 20000000, "user", 2000000);
server.EnableLogs({ NKikimrServices::FLAT_TX_SCHEMESHARD, NKikimrServices::PERSQUEUE });
@@ -778,12 +778,12 @@ namespace {
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", i}, value);
// trying to read small PQ messages in a big messagebus event
- auto info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user1"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb
+ auto info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb
UNIT_ASSERT_VALUES_EQUAL(info.BlobsFromDisk, 0);
UNIT_ASSERT_VALUES_EQUAL(info.BlobsFromCache, 4);
TInstant now(TInstant::Now());
- info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user1"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb
+ info = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 32, "user"}, 23, "", NMsgBusProxy::MSTATUS_OK); //will read 21mb
TDuration dur = TInstant::Now() - now;
UNIT_ASSERT_C(dur > TDuration::Seconds(7) && dur < TDuration::Seconds(20), "dur = " << dur); //speed limit is 2000kb/s and burst is 2000kb, so to read 24mb it will take at least 11 seconds
@@ -803,8 +803,8 @@ namespace {
for (ui32 i = 0; i < 32; ++i)
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", i}, value);
- auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user1"}, 16);
- auto info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user1"}, 16);
+ auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16);
+ auto info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16);
UNIT_ASSERT_VALUES_EQUAL(info0.BlobsFromCache, 3);
UNIT_ASSERT_VALUES_EQUAL(info16.BlobsFromCache, 2);
@@ -813,8 +813,8 @@ namespace {
for (ui32 i = 0; i < 8; ++i)
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", 32+i}, value);
- info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user1"}, 16);
- info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user1"}, 16);
+ info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16);
+ info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16);
ui32 fromDisk = info0.BlobsFromDisk + info16.BlobsFromDisk;
ui32 fromCache = info0.BlobsFromCache + info16.BlobsFromCache;
@@ -3180,6 +3180,7 @@ namespace {
const ui32 topicsCount = 4;
for (ui32 i = 1; i <= topicsCount; ++i) {
TRequestCreatePQ createTopicRequest(TStringBuilder() << "rt3.dc1--topic_" << i, 1);
+ createTopicRequest.ReadRules.clear();
createTopicRequest.ReadRules.push_back("acc@user1");
createTopicRequest.ReadRules.push_back("acc@user2");
createTopicRequest.ReadRules.push_back("acc@user3");