aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@yandex-team.ru>2022-03-18 18:09:27 +0300
committeralexnick <alexnick@yandex-team.ru>2022-03-18 18:09:27 +0300
commitb568647b0c3df5ee1cf3f6a505fb2eaa8c29c1e4 (patch)
treea8443386568e9b7cb808bdfbbc16632066652ea4
parentf503ca1c2f03a8387374d95a55c32637dd09bff9 (diff)
downloadydb-b568647b0c3df5ee1cf3f6a505fb2eaa8c29c1e4.tar.gz
fix for read from timestamp 2.0 LOGBROKER-7158
ref:11127b0c0f81e0e0f39b4b4ae709d36b96343847
-rw-r--r--CMakeLists.darwin.txt1
-rw-r--r--CMakeLists.linux.txt1
-rw-r--r--ydb/core/persqueue/events/internal.h5
-rw-r--r--ydb/core/persqueue/partition.cpp93
-rw-r--r--ydb/core/persqueue/pq_impl.cpp3
-rw-r--r--ydb/core/persqueue/read.h2
-rw-r--r--ydb/core/persqueue/read_balancer.cpp2
-rw-r--r--ydb/library/persqueue/topic_parser/topic_parser.h4
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp18
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_actor.h4
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read_actor.cpp14
-rw-r--r--ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp2
-rw-r--r--ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin.txt59
-rw-r--r--ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux.txt60
-rw-r--r--ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.txt13
15 files changed, 228 insertions, 53 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 22e1b582be..730d1f5090 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -986,6 +986,7 @@ add_subdirectory(ydb/services/cms/ut)
add_subdirectory(ydb/services/datastreams/ut)
add_subdirectory(ydb/services/persqueue_cluster_discovery/ut)
add_subdirectory(ydb/services/persqueue_v1/ut)
+add_subdirectory(ydb/services/persqueue_v1/ut/new_schemecache_ut)
add_subdirectory(ydb/services/rate_limiter/ut)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_rate_limiter)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index a3112c0e0d..43e757b33c 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -1079,6 +1079,7 @@ add_subdirectory(ydb/services/cms/ut)
add_subdirectory(ydb/services/datastreams/ut)
add_subdirectory(ydb/services/persqueue_cluster_discovery/ut)
add_subdirectory(ydb/services/persqueue_v1/ut)
+add_subdirectory(ydb/services/persqueue_v1/ut/new_schemecache_ut)
add_subdirectory(ydb/services/rate_limiter/ut)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_rate_limiter)
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index bc892988b4..ced72293ac 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -4,6 +4,7 @@
#include <ydb/core/protos/pqconfig.pb.h>
#include <ydb/core/tablet/tablet_counters.h>
+#include <ydb/core/persqueue/key.h>
#include <library/cpp/actors/core/event_local.h>
#include <library/cpp/actors/core/actorid.h>
@@ -22,10 +23,11 @@ namespace NPQ {
ui32 Size;
TString Value;
bool Cached;
+ TKey Key;
TRequestedBlob() = delete;
- TRequestedBlob(ui64 offset, ui16 partNo, ui32 count, ui16 internalPartsCount, ui32 size, TString value)
+ TRequestedBlob(ui64 offset, ui16 partNo, ui32 count, ui16 internalPartsCount, ui32 size, TString value, const TKey& key)
: Offset(offset)
, PartNo(partNo)
, Count(count)
@@ -33,6 +35,7 @@ namespace NPQ {
, Size(size)
, Value(value)
, Cached(false)
+ , Key(key)
{}
};
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index 06f9161843..af28a8f8e2 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -2324,10 +2324,29 @@ TReadAnswer TReadInfo::FormAnswer(
AddResultDebugInfo(response, readResult);
- ui32 cnt = 0, pcnt = 0;
- ui32 size = 0, psize = 0;
+ ui32 cnt = 0;
+ ui32 size = 0;
+
+ ui32 lastBlobSize = 0;
const TVector<TRequestedBlob>& blobs = response->GetBlobs();
+ auto updateUsage = [&](const TClientBlob& blob){
+ size += blob.GetBlobSize();
+ lastBlobSize += blob.GetBlobSize();
+ if (blob.IsLastPart()) {
+ bool messageSkippingBehaviour = AppData()->PQConfig.GetTopicsAreFirstClassCitizen() &&
+ ReadTimestampMs > blob.WriteTimestamp.MilliSeconds();
+ ++cnt;
+ if (messageSkippingBehaviour) {
+ --cnt;
+ size -= lastBlobSize;
+ }
+ lastBlobSize = 0;
+ return (size >= Size || cnt >= Count);
+ }
+ return !AppData()->PQConfig.GetTopicsAreFirstClassCitizen() && (size >= Size || cnt >= Count);
+ };
+
Y_VERIFY(blobs.size() == Blobs.size());
response->Check();
bool needStop = false;
@@ -2381,12 +2400,10 @@ TReadAnswer TReadInfo::FormAnswer(
<< (offset - header.GetCount()) << " totakecount " << count << " count " << header.GetCount() << " size " << header.GetPayloadSize() << " from pos " << pos << " cbcount " << batch.Blobs.size());
ui32 i = 0;
- for (i = pos; i < batch.Blobs.size() && size < Size && cnt < Count; ++i) {
- pcnt = cnt;
- psize = size;
+ for (i = pos; i < batch.Blobs.size(); ++i) {
TClientBlob &res = batch.Blobs[i];
VERIFY_RESULT_BLOB(res, i);
- size += res.GetBlobSize();
+
Y_VERIFY(PartNo == res.GetPartNo(), "pos %" PRIu32 " i %" PRIu32 " Offset %" PRIu64 " PartNo %" PRIu16 " offset %" PRIu64 " partNo %" PRIu16,
pos, i, Offset, PartNo, offset, res.GetPartNo());
@@ -2398,9 +2415,6 @@ TReadAnswer TReadInfo::FormAnswer(
}
AddResultBlob(readResult, res, Offset);
- if (res.IsLastPart()) {
- ++cnt;
- }
if (res.IsLastPart()) {
PartNo = 0;
@@ -2408,6 +2422,9 @@ TReadAnswer TReadInfo::FormAnswer(
} else {
++PartNo;
}
+ if (updateUsage(res)) {
+ break;
+ }
}
if (i != batch.Blobs.size()) {//not fully processed batch - next definetely will not be processed
@@ -2415,24 +2432,18 @@ TReadAnswer TReadInfo::FormAnswer(
}
}
}
- Y_VERIFY(pcnt <= Count && psize <= Size);
- Y_VERIFY(pcnt <= cnt && psize <= size);
- if (!needStop) { // body blobs are fully processed
+ if (!needStop && cnt < Count && size < Size) { // body blobs are fully processed and need to take more data
if (CachedOffset > Offset) {
+ lastBlobSize = 0;
Offset = CachedOffset;
}
for (const auto& writeBlob : Cached) {
- if (cnt >= Count || size >= Size)
- break;
- pcnt = cnt;
- psize = size;
VERIFY_RESULT_BLOB(writeBlob, 0u);
readResult->SetBlobsCachedSize(readResult->GetBlobsCachedSize() + writeBlob.GetBlobSize());
- size += writeBlob.GetBlobSize();
if (userInfo) {
userInfo->AddTimestampToCache(
Offset, writeBlob.WriteTimestamp, writeBlob.CreateTimestamp,
@@ -2440,14 +2451,15 @@ TReadAnswer TReadInfo::FormAnswer(
);
}
AddResultBlob(readResult, writeBlob, Offset);
+
if (writeBlob.IsLastPart()) {
++Offset;
- ++cnt;
+ }
+ if (updateUsage(writeBlob)) {
+ break;
}
}
}
- Y_VERIFY(pcnt <= Count && psize <= Size);
- Y_VERIFY(pcnt <= cnt && psize <= size);
Y_VERIFY(Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, Offset);
ui64 answerSize = answer->Response.ByteSize();
if (userInfo && Destination != 0) {
@@ -2508,11 +2520,11 @@ TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffse
cnt = it->Key.GetCount() - (startOffset - it->Key.GetOffset()); //don't count all elements from first blob
sz = (cnt == it->Key.GetCount() ? it->Size : 0); //not readed client blobs can be of ~8Mb, so don't count this size at all
}
- while (it != DataKeysBody.end() && size < maxSize && count < maxCount) {
+ while (it != DataKeysBody.end() && (size < maxSize && count < maxCount || count == 0)) { //count== 0 grants that blob with offset from ReadFromTimestamp will be readed
size += sz;
count += cnt;
TRequestedBlob reqBlob(it->Key.GetOffset(), it->Key.GetPartNo(), it->Key.GetCount(),
- it->Key.GetInternalPartsCount(), it->Size, TString());
+ it->Key.GetInternalPartsCount(), it->Size, TString(), it->Key);
blobs.push_back(reqBlob);
++it;
@@ -2539,6 +2551,7 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset,
pos = Head.FindPos(startOffset, partNo);
Y_VERIFY(pos != Max<ui32>());
}
+ ui32 lastBlobSize = 0;
for (;pos < Head.Batches.size(); ++pos)
{
@@ -2557,21 +2570,32 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset,
if (blobs[i].IsLastPart()) {
++offset;
pno = 0;
- if (!skip)
- ++count;
} else {
++pno;
}
- if (skip)
- continue;
- if (count > maxCount) // blob is counted already
- break;
- if (size >= maxSize)
- break;
+ if (skip) continue;
+ if (blobs[i].IsLastPart()) {
+ bool messageSkippingBehaviour = AppData()->PQConfig.GetTopicsAreFirstClassCitizen() &&
+ readTimestampMs > blobs[i].WriteTimestamp.MilliSeconds();
+ ++count;
+ if (messageSkippingBehaviour) { //do not count in limits; message will be skippend in proxy
+ --count;
+ size -= lastBlobSize;
+ }
+ lastBlobSize = 0;
+
+ if (count > maxCount) // blob is counted already
+ break;
+ if (size > maxSize)
+ break;
+ }
size += blobs[i].GetBlobSize();
+ lastBlobSize += blobs[i].GetBlobSize();
res.push_back(blobs[i]);
+
if (!firstAddedBlobOffset)
firstAddedBlobOffset = curOffset;
+
}
if (i < blobs.size()) // already got limit
break;
@@ -4593,10 +4617,11 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "read cookie " << cookie << " added " << info.Blobs.size()
<< " blobs, size " << size << " count " << count << " last offset " << lastOffset);
- ui64 insideHeadOffset{0};
- info.Cached = GetReadRequestFromHead(info.Offset, info.PartNo, info.Count, info.Size, info.ReadTimestampMs, &count, &size, &insideHeadOffset);
- info.CachedOffset = insideHeadOffset;
-
+ if (blobs.empty() || blobs.back().Key == DataKeysBody.back().Key) { // read from head only when all blobs from body processed
+ ui64 insideHeadOffset{0};
+ info.Cached = GetReadRequestFromHead(info.Offset, info.PartNo, info.Count, info.Size, info.ReadTimestampMs, &count, &size, &insideHeadOffset);
+ info.CachedOffset = insideHeadOffset;
+ }
if (info.Destination != 0) {
++userInfo.ActiveReads;
userInfo.UpdateReadingTimeAndState(ctx.Now());
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index f1b6397724..7aecc34eec 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -150,6 +150,8 @@ private:
partResp->SetSizeLag(res.GetSizeLag());
partResp->SetWaitQuotaTimeMs(partResp->GetWaitQuotaTimeMs() + res.GetWaitQuotaTimeMs());
+ partResp->SetRealReadOffset(Max(partResp->GetRealReadOffset(), res.GetRealReadOffset()));
+
for (ui32 i = 0; i < res.ResultSize(); ++i) {
bool isNewMsg = !res.GetResult(i).HasPartNo() || res.GetResult(i).GetPartNo() == 0;
if (!isStart) {
@@ -207,6 +209,7 @@ private:
records.Swap(partResp->MutableResult());
partResp->ClearResult();
for (auto & rec : records) {
+ partResp->SetRealReadOffset(Max(partResp->GetRealReadOffset(), rec.GetOffset()));
if (rec.GetWriteTimestampMS() >= readFromTimestampMs) {
auto result = partResp->AddResult();
result->CopyFrom(rec);
diff --git a/ydb/core/persqueue/read.h b/ydb/core/persqueue/read.h
index d0bc79c692..92d9101051 100644
--- a/ydb/core/persqueue/read.h
+++ b/ydb/core/persqueue/read.h
@@ -275,7 +275,7 @@ namespace NPQ {
Y_VERIFY(strKey.size() == TKey::KeySize(), "Unexpected key size: %" PRIu64, strKey.size());
TString value = cmd.GetValue();
kvReq.Partition = key.GetPartition();
- TRequestedBlob blob(key.GetOffset(), key.GetPartNo(), key.GetCount(), key.GetInternalPartsCount(), value.size(), value);
+ TRequestedBlob blob(key.GetOffset(), key.GetPartNo(), key.GetCount(), key.GetInternalPartsCount(), value.size(), value, key);
kvReq.Blobs.push_back(blob);
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "CacheProxy. Passthrough blob. Partition "
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp
index 490e6ece3c..9c7f1e5c88 100644
--- a/ydb/core/persqueue/read_balancer.cpp
+++ b/ydb/core/persqueue/read_balancer.cpp
@@ -372,7 +372,7 @@ void TPersQueueReadBalancer::CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &req
if (!Consumers.contains(user)) {
RespondWithACL(request, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "no read rule provided for consumer '"
<< (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ? user : NPersQueue::ConvertOldConsumerName(user))
- << "' that allows to read topic from cluster '" << NPersQueue::GetDC(Topic)
+ << "' that allows to read topic from original cluster '" << NPersQueue::GetDC(Topic)
<< "'; may be there is read rule with mode all-original only and you are reading with mirrored topics. Change read-rule to mirror-to-<cluster> or options of reading process.", ctx);
return;
}
diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h
index 4f5ade9c99..6dd1cf10e1 100644
--- a/ydb/library/persqueue/topic_parser/topic_parser.h
+++ b/ydb/library/persqueue/topic_parser/topic_parser.h
@@ -358,8 +358,8 @@ public:
TConverterPtr GetWriteTopicConverter(const TString& clientName, const TString& database);
TConverterFactoryPtr GetConverterFactory() const;
- bool GetHaveClusters() const { return HaveClusters;};
-
+ bool GetHaveClusters() const { return HaveClusters; }
+ TString GetLocalCluster() const { return LocalCluster; }
private:
TConverterFactoryPtr ConverterFactory;
bool HaveClusters;
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index 03a6d06213..8a20c29de6 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -1079,14 +1079,16 @@ namespace NKikimr::NDataStreams::V1 {
auto topicInfo = navigate->ResultSet.begin();
StreamName = NKikimr::CanonizePath(topicInfo->Path);
if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
+ NACLib::TUserToken token(this->Request_->GetInternalToken());
+
if (!topicInfo->SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow,
- this->Request_->GetInternalToken())) {
+ token)) {
return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED,
Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
TStringBuilder() << "Access to stream "
<< this->GetProtoRequest()->stream_name()
<< " is denied for subject "
- << this->Request_->GetInternalToken(), ctx);
+ << token.GetUserSID(), ctx);
}
}
@@ -1223,14 +1225,16 @@ namespace NKikimr::NDataStreams::V1 {
const auto response = result->ResultSet.front();
if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
+ NACLib::TUserToken token(this->Request_->GetInternalToken());
+
if (!response.SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow,
- this->Request_->GetInternalToken())) {
+ token)) {
return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED,
Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
TStringBuilder() << "Access to stream "
<< ShardIterator.GetStreamName()
<< " is denied for subject "
- << this->Request_->GetInternalToken(), ctx);
+ << token.GetUserSID(), ctx);
}
}
@@ -1402,14 +1406,16 @@ namespace NKikimr::NDataStreams::V1 {
const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get();
auto topicInfo = navigate->ResultSet.front();
if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
+ NACLib::TUserToken token(this->Request_->GetInternalToken());
+
if (!topicInfo.SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow,
- this->Request_->GetInternalToken())) {
+ token)) {
return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED,
Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
TStringBuilder() << "Access to stream "
<< this->GetProtoRequest()->stream_name()
<< " is denied for subject "
- << this->Request_->GetInternalToken(), ctx);
+ << token.GetUserSID(), ctx);
}
}
diff --git a/ydb/services/persqueue_v1/grpc_pq_actor.h b/ydb/services/persqueue_v1/grpc_pq_actor.h
index 55127d3f03..0e86e0fd7f 100644
--- a/ydb/services/persqueue_v1/grpc_pq_actor.h
+++ b/ydb/services/persqueue_v1/grpc_pq_actor.h
@@ -688,7 +688,7 @@ public:
TReadInitAndAuthActor(const TActorContext& ctx, const TActorId& parentId, const TString& clientId, const ui64 cookie,
const TString& session, const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache,
TIntrusivePtr<NMonitoring::TDynamicCounters> counters, TIntrusivePtr<NACLib::TUserToken> token,
- const NPersQueue::TTopicsToConverter& topics);
+ const NPersQueue::TTopicsToConverter& topics, const TString& localCluster);
~TReadInitAndAuthActor();
@@ -744,6 +744,8 @@ private:
TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
bool DoCheckACL;
+
+ TString LocalCluster;
};
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
index 87591c7543..ba6b9fe5ef 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
@@ -781,7 +781,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadInit::TPtr& ev, const TActorCo
}
AuthInitActor = ctx.Register(new TReadInitAndAuthActor(
ctx, ctx.SelfID, ClientId, Cookie, Session, SchemeCache, NewSchemeCache, Counters, Token,
- TopicsHandler.GetReadTopicsList(TopicsToResolve, ReadOnlyLocal, Request->GetDatabaseName().GetOrElse(TString()))
+ TopicsHandler.GetReadTopicsList(TopicsToResolve, ReadOnlyLocal, Request->GetDatabaseName().GetOrElse(TString())), TopicsHandler.GetLocalCluster()
));
@@ -1697,7 +1697,7 @@ void TReadSessionActor::HandleWakeup(const TActorContext& ctx) {
AuthInitActor = ctx.Register(new TReadInitAndAuthActor(
ctx, ctx.SelfID, ClientId, Cookie, Session, SchemeCache, NewSchemeCache, Counters, Token,
- TopicsHandler.GetReadTopicsList(TopicsToResolve, ReadOnlyLocal, Request->GetDatabaseName().GetOrElse(TString()))
+ TopicsHandler.GetReadTopicsList(TopicsToResolve, ReadOnlyLocal, Request->GetDatabaseName().GetOrElse(TString())), TopicsHandler.GetLocalCluster()
));
}
}
@@ -2617,7 +2617,7 @@ TReadInitAndAuthActor::TReadInitAndAuthActor(
const TActorContext& ctx, const TActorId& parentId, const TString& clientId, const ui64 cookie,
const TString& session, const NActors::TActorId& metaCache, const NActors::TActorId& newSchemeCache,
TIntrusivePtr<NMonitoring::TDynamicCounters> counters, TIntrusivePtr<NACLib::TUserToken> token,
- const NPersQueue::TTopicsToConverter& topics
+ const NPersQueue::TTopicsToConverter& topics, const TString& localCluster
)
: ParentId(parentId)
, Cookie(cookie)
@@ -2628,6 +2628,7 @@ TReadInitAndAuthActor::TReadInitAndAuthActor(
, ClientPath(NPersQueue::ConvertOldConsumerName(ClientId, ctx))
, Token(token)
, Counters(counters)
+ , LocalCluster(localCluster)
{
for (const auto& t : topics) {
Topics[t.first].TopicNameConverter = t.second;
@@ -2739,7 +2740,7 @@ bool TReadInitAndAuthActor::CheckTopicACL(
)) {
return false;
}
- if (Token) {
+ if (Token || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
bool found = false;
for (auto& cons : pqDescr.GetPQTabletConfig().GetReadRules() ) {
if (cons == ClientId) {
@@ -2749,7 +2750,7 @@ bool TReadInitAndAuthActor::CheckTopicACL(
}
if (!found) {
CloseSession(
- TStringBuilder() << "no read rule provided for consumer '" << ClientPath << "' in topic '" << topic << "'",
+ TStringBuilder() << "no read rule provided for consumer '" << ClientPath << "' in topic '" << topic << "' in current cluster '" << LocalCluster,
PersQueue::ErrorCode::BAD_REQUEST, ctx
);
return false;
@@ -2874,7 +2875,8 @@ void TReadInfoActor::Bootstrap(const TActorContext& ctx) {
AuthInitActor = ctx.Register(new TReadInitAndAuthActor(
ctx, ctx.SelfID, ClientId, 0, TString("read_info:") + Request().GetPeerName(),
SchemeCache, NewSchemeCache, Counters, token,
- TopicsHandler.GetReadTopicsList(topicsToResolve, readOnlyLocal, Request().GetDatabaseName().GetOrElse(TString()))
+ TopicsHandler.GetReadTopicsList(topicsToResolve, readOnlyLocal, Request().GetDatabaseName().GetOrElse(TString())),
+ TopicsHandler.GetLocalCluster()
));
}
diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
index f5cccddafc..3d53b1c955 100644
--- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
@@ -537,7 +537,7 @@ namespace NKikimr::NPersQueueTests {
TPersQueueV1TestServer server;
{
- auto res = server.PersQueueClient->AddReadRule("/Root/acc/topic1", TAddReadRuleSettings().ReadRule(TReadRuleSettings().ConsumerName("user")));
+ auto res = server.PersQueueClient->AddReadRule("/Root/acc/topic1", TAddReadRuleSettings().ReadRule(TReadRuleSettings().ConsumerName("user1")));
res.Wait();
Cerr << "ADD RESULT " << res.GetValue().GetIssues().ToString() << "\n";
UNIT_ASSERT(res.GetValue().IsSuccess());
diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin.txt
new file mode 100644
index 0000000000..25ffd55cc4
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin.txt
@@ -0,0 +1,59 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-persqueue_v1-ut-new_schemecache_ut)
+target_compile_options(ydb-services-persqueue_v1-ut-new_schemecache_ut PRIVATE
+ -DACTORLIB_HUGE_PB_SIZE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-persqueue_v1-ut-new_schemecache_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1
+)
+target_link_libraries(ydb-services-persqueue_v1-ut-new_schemecache_ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-persqueue_v1
+ library-cpp-getopt
+ library-cpp-svnversion
+ ydb-core-testlib
+ api-grpc
+ cpp-client-resources
+ ydb_persqueue_core-ut-ut_utils
+ cpp-client-ydb_table
+)
+target_sources(ydb-services-persqueue_v1-ut-new_schemecache_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_common_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
+)
+add_test(
+ NAME
+ ydb-services-persqueue_v1-ut-new_schemecache_ut
+ COMMAND
+ ydb-services-persqueue_v1-ut-new_schemecache_ut
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+target_link_flags(ydb-services-persqueue_v1-ut-new_schemecache_ut
+ PUBLIC
+ -Wl,-no_deduplicate
+ -Wl,-sdk_version,10.15
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+ -framework
+ CoreFoundation
+)
+vcs_info(ydb-services-persqueue_v1-ut-new_schemecache_ut)
diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux.txt b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux.txt
new file mode 100644
index 0000000000..e094561240
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux.txt
@@ -0,0 +1,60 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-persqueue_v1-ut-new_schemecache_ut)
+target_compile_options(ydb-services-persqueue_v1-ut-new_schemecache_ut PRIVATE
+ -DACTORLIB_HUGE_PB_SIZE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-persqueue_v1-ut-new_schemecache_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1
+)
+target_link_libraries(ydb-services-persqueue_v1-ut-new_schemecache_ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-lfalloc
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-persqueue_v1
+ library-cpp-getopt
+ library-cpp-svnversion
+ ydb-core-testlib
+ api-grpc
+ cpp-client-resources
+ ydb_persqueue_core-ut-ut_utils
+ cpp-client-ydb_table
+)
+target_sources(ydb-services-persqueue_v1-ut-new_schemecache_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_common_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
+)
+add_test(
+ NAME
+ ydb-services-persqueue_v1-ut-new_schemecache_ut
+ COMMAND
+ ydb-services-persqueue_v1-ut-new_schemecache_ut
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+target_link_flags(ydb-services-persqueue_v1-ut-new_schemecache_ut
+ PUBLIC
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+vcs_info(ydb-services-persqueue_v1-ut-new_schemecache_ut)
diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.txt b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.txt
new file mode 100644
index 0000000000..a681d385f3
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.txt
@@ -0,0 +1,13 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (UNIX)
+ include(CMakeLists.linux.txt)
+endif()