diff options
author | alexnick <alexnick@yandex-team.ru> | 2022-03-18 18:09:27 +0300 |
---|---|---|
committer | alexnick <alexnick@yandex-team.ru> | 2022-03-18 18:09:27 +0300 |
commit | b568647b0c3df5ee1cf3f6a505fb2eaa8c29c1e4 (patch) | |
tree | a8443386568e9b7cb808bdfbbc16632066652ea4 | |
parent | f503ca1c2f03a8387374d95a55c32637dd09bff9 (diff) | |
download | ydb-b568647b0c3df5ee1cf3f6a505fb2eaa8c29c1e4.tar.gz |
fix for read from timestamp 2.0 LOGBROKER-7158
ref:11127b0c0f81e0e0f39b4b4ae709d36b96343847
-rw-r--r-- | CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/events/internal.h | 5 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 93 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 3 | ||||
-rw-r--r-- | ydb/core/persqueue/read.h | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/read_balancer.cpp | 2 | ||||
-rw-r--r-- | ydb/library/persqueue/topic_parser/topic_parser.h | 4 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_proxy.cpp | 18 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_actor.h | 4 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_read_actor.cpp | 14 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin.txt | 59 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux.txt | 60 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.txt | 13 |
15 files changed, 228 insertions, 53 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 22e1b582be..730d1f5090 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -986,6 +986,7 @@ add_subdirectory(ydb/services/cms/ut) add_subdirectory(ydb/services/datastreams/ut) add_subdirectory(ydb/services/persqueue_cluster_discovery/ut) add_subdirectory(ydb/services/persqueue_v1/ut) +add_subdirectory(ydb/services/persqueue_v1/ut/new_schemecache_ut) add_subdirectory(ydb/services/rate_limiter/ut) add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination) add_subdirectory(ydb/public/sdk/cpp/client/ydb_rate_limiter) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index a3112c0e0d..43e757b33c 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -1079,6 +1079,7 @@ add_subdirectory(ydb/services/cms/ut) add_subdirectory(ydb/services/datastreams/ut) add_subdirectory(ydb/services/persqueue_cluster_discovery/ut) add_subdirectory(ydb/services/persqueue_v1/ut) +add_subdirectory(ydb/services/persqueue_v1/ut/new_schemecache_ut) add_subdirectory(ydb/services/rate_limiter/ut) add_subdirectory(ydb/public/sdk/cpp/client/ydb_coordination) add_subdirectory(ydb/public/sdk/cpp/client/ydb_rate_limiter) diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index bc892988b4..ced72293ac 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -4,6 +4,7 @@ #include <ydb/core/protos/pqconfig.pb.h> #include <ydb/core/tablet/tablet_counters.h> +#include <ydb/core/persqueue/key.h> #include <library/cpp/actors/core/event_local.h> #include <library/cpp/actors/core/actorid.h> @@ -22,10 +23,11 @@ namespace NPQ { ui32 Size; TString Value; bool Cached; + TKey Key; TRequestedBlob() = delete; - TRequestedBlob(ui64 offset, ui16 partNo, ui32 count, ui16 internalPartsCount, ui32 size, TString value) + TRequestedBlob(ui64 offset, ui16 partNo, ui32 count, ui16 internalPartsCount, ui32 size, TString value, const TKey& key) : Offset(offset) , PartNo(partNo) , Count(count) @@ -33,6 +35,7 @@ namespace NPQ { , Size(size) , Value(value) , Cached(false) + , Key(key) {} }; diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 06f9161843..af28a8f8e2 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2324,10 +2324,29 @@ TReadAnswer TReadInfo::FormAnswer( AddResultDebugInfo(response, readResult); - ui32 cnt = 0, pcnt = 0; - ui32 size = 0, psize = 0; + ui32 cnt = 0; + ui32 size = 0; + + ui32 lastBlobSize = 0; const TVector<TRequestedBlob>& blobs = response->GetBlobs(); + auto updateUsage = [&](const TClientBlob& blob){ + size += blob.GetBlobSize(); + lastBlobSize += blob.GetBlobSize(); + if (blob.IsLastPart()) { + bool messageSkippingBehaviour = AppData()->PQConfig.GetTopicsAreFirstClassCitizen() && + ReadTimestampMs > blob.WriteTimestamp.MilliSeconds(); + ++cnt; + if (messageSkippingBehaviour) { + --cnt; + size -= lastBlobSize; + } + lastBlobSize = 0; + return (size >= Size || cnt >= Count); + } + return !AppData()->PQConfig.GetTopicsAreFirstClassCitizen() && (size >= Size || cnt >= Count); + }; + Y_VERIFY(blobs.size() == Blobs.size()); response->Check(); bool needStop = false; @@ -2381,12 +2400,10 @@ TReadAnswer TReadInfo::FormAnswer( << (offset - header.GetCount()) << " totakecount " << count << " count " << header.GetCount() << " size " << header.GetPayloadSize() << " from pos " << pos << " cbcount " << batch.Blobs.size()); ui32 i = 0; - for (i = pos; i < batch.Blobs.size() && size < Size && cnt < Count; ++i) { - pcnt = cnt; - psize = size; + for (i = pos; i < batch.Blobs.size(); ++i) { TClientBlob &res = batch.Blobs[i]; VERIFY_RESULT_BLOB(res, i); - size += res.GetBlobSize(); + Y_VERIFY(PartNo == res.GetPartNo(), "pos %" PRIu32 " i %" PRIu32 " Offset %" PRIu64 " PartNo %" PRIu16 " offset %" PRIu64 " partNo %" PRIu16, pos, i, Offset, PartNo, offset, res.GetPartNo()); @@ -2398,9 +2415,6 @@ TReadAnswer TReadInfo::FormAnswer( } AddResultBlob(readResult, res, Offset); - if (res.IsLastPart()) { - ++cnt; - } if (res.IsLastPart()) { PartNo = 0; @@ -2408,6 +2422,9 @@ TReadAnswer TReadInfo::FormAnswer( } else { ++PartNo; } + if (updateUsage(res)) { + break; + } } if (i != batch.Blobs.size()) {//not fully processed batch - next definetely will not be processed @@ -2415,24 +2432,18 @@ TReadAnswer TReadInfo::FormAnswer( } } } - Y_VERIFY(pcnt <= Count && psize <= Size); - Y_VERIFY(pcnt <= cnt && psize <= size); - if (!needStop) { // body blobs are fully processed + if (!needStop && cnt < Count && size < Size) { // body blobs are fully processed and need to take more data if (CachedOffset > Offset) { + lastBlobSize = 0; Offset = CachedOffset; } for (const auto& writeBlob : Cached) { - if (cnt >= Count || size >= Size) - break; - pcnt = cnt; - psize = size; VERIFY_RESULT_BLOB(writeBlob, 0u); readResult->SetBlobsCachedSize(readResult->GetBlobsCachedSize() + writeBlob.GetBlobSize()); - size += writeBlob.GetBlobSize(); if (userInfo) { userInfo->AddTimestampToCache( Offset, writeBlob.WriteTimestamp, writeBlob.CreateTimestamp, @@ -2440,14 +2451,15 @@ TReadAnswer TReadInfo::FormAnswer( ); } AddResultBlob(readResult, writeBlob, Offset); + if (writeBlob.IsLastPart()) { ++Offset; - ++cnt; + } + if (updateUsage(writeBlob)) { + break; } } } - Y_VERIFY(pcnt <= Count && psize <= Size); - Y_VERIFY(pcnt <= cnt && psize <= size); Y_VERIFY(Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, Offset); ui64 answerSize = answer->Response.ByteSize(); if (userInfo && Destination != 0) { @@ -2508,11 +2520,11 @@ TVector<TRequestedBlob> TPartition::GetReadRequestFromBody(const ui64 startOffse cnt = it->Key.GetCount() - (startOffset - it->Key.GetOffset()); //don't count all elements from first blob sz = (cnt == it->Key.GetCount() ? it->Size : 0); //not readed client blobs can be of ~8Mb, so don't count this size at all } - while (it != DataKeysBody.end() && size < maxSize && count < maxCount) { + while (it != DataKeysBody.end() && (size < maxSize && count < maxCount || count == 0)) { //count== 0 grants that blob with offset from ReadFromTimestamp will be readed size += sz; count += cnt; TRequestedBlob reqBlob(it->Key.GetOffset(), it->Key.GetPartNo(), it->Key.GetCount(), - it->Key.GetInternalPartsCount(), it->Size, TString()); + it->Key.GetInternalPartsCount(), it->Size, TString(), it->Key); blobs.push_back(reqBlob); ++it; @@ -2539,6 +2551,7 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset, pos = Head.FindPos(startOffset, partNo); Y_VERIFY(pos != Max<ui32>()); } + ui32 lastBlobSize = 0; for (;pos < Head.Batches.size(); ++pos) { @@ -2557,21 +2570,32 @@ TVector<TClientBlob> TPartition::GetReadRequestFromHead(const ui64 startOffset, if (blobs[i].IsLastPart()) { ++offset; pno = 0; - if (!skip) - ++count; } else { ++pno; } - if (skip) - continue; - if (count > maxCount) // blob is counted already - break; - if (size >= maxSize) - break; + if (skip) continue; + if (blobs[i].IsLastPart()) { + bool messageSkippingBehaviour = AppData()->PQConfig.GetTopicsAreFirstClassCitizen() && + readTimestampMs > blobs[i].WriteTimestamp.MilliSeconds(); + ++count; + if (messageSkippingBehaviour) { //do not count in limits; message will be skippend in proxy + --count; + size -= lastBlobSize; + } + lastBlobSize = 0; + + if (count > maxCount) // blob is counted already + break; + if (size > maxSize) + break; + } size += blobs[i].GetBlobSize(); + lastBlobSize += blobs[i].GetBlobSize(); res.push_back(blobs[i]); + if (!firstAddedBlobOffset) firstAddedBlobOffset = curOffset; + } if (i < blobs.size()) // already got limit break; @@ -4593,10 +4617,11 @@ void TPartition::ProcessRead(const TActorContext& ctx, TReadInfo&& info, const u LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "read cookie " << cookie << " added " << info.Blobs.size() << " blobs, size " << size << " count " << count << " last offset " << lastOffset); - ui64 insideHeadOffset{0}; - info.Cached = GetReadRequestFromHead(info.Offset, info.PartNo, info.Count, info.Size, info.ReadTimestampMs, &count, &size, &insideHeadOffset); - info.CachedOffset = insideHeadOffset; - + if (blobs.empty() || blobs.back().Key == DataKeysBody.back().Key) { // read from head only when all blobs from body processed + ui64 insideHeadOffset{0}; + info.Cached = GetReadRequestFromHead(info.Offset, info.PartNo, info.Count, info.Size, info.ReadTimestampMs, &count, &size, &insideHeadOffset); + info.CachedOffset = insideHeadOffset; + } if (info.Destination != 0) { ++userInfo.ActiveReads; userInfo.UpdateReadingTimeAndState(ctx.Now()); diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index f1b6397724..7aecc34eec 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -150,6 +150,8 @@ private: partResp->SetSizeLag(res.GetSizeLag()); partResp->SetWaitQuotaTimeMs(partResp->GetWaitQuotaTimeMs() + res.GetWaitQuotaTimeMs()); + partResp->SetRealReadOffset(Max(partResp->GetRealReadOffset(), res.GetRealReadOffset())); + for (ui32 i = 0; i < res.ResultSize(); ++i) { bool isNewMsg = !res.GetResult(i).HasPartNo() || res.GetResult(i).GetPartNo() == 0; if (!isStart) { @@ -207,6 +209,7 @@ private: records.Swap(partResp->MutableResult()); partResp->ClearResult(); for (auto & rec : records) { + partResp->SetRealReadOffset(Max(partResp->GetRealReadOffset(), rec.GetOffset())); if (rec.GetWriteTimestampMS() >= readFromTimestampMs) { auto result = partResp->AddResult(); result->CopyFrom(rec); diff --git a/ydb/core/persqueue/read.h b/ydb/core/persqueue/read.h index d0bc79c692..92d9101051 100644 --- a/ydb/core/persqueue/read.h +++ b/ydb/core/persqueue/read.h @@ -275,7 +275,7 @@ namespace NPQ { Y_VERIFY(strKey.size() == TKey::KeySize(), "Unexpected key size: %" PRIu64, strKey.size()); TString value = cmd.GetValue(); kvReq.Partition = key.GetPartition(); - TRequestedBlob blob(key.GetOffset(), key.GetPartNo(), key.GetCount(), key.GetInternalPartsCount(), value.size(), value); + TRequestedBlob blob(key.GetOffset(), key.GetPartNo(), key.GetCount(), key.GetInternalPartsCount(), value.size(), value, key); kvReq.Blobs.push_back(blob); LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "CacheProxy. Passthrough blob. Partition " diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 490e6ece3c..9c7f1e5c88 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -372,7 +372,7 @@ void TPersQueueReadBalancer::CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &req if (!Consumers.contains(user)) { RespondWithACL(request, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "no read rule provided for consumer '" << (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() ? user : NPersQueue::ConvertOldConsumerName(user)) - << "' that allows to read topic from cluster '" << NPersQueue::GetDC(Topic) + << "' that allows to read topic from original cluster '" << NPersQueue::GetDC(Topic) << "'; may be there is read rule with mode all-original only and you are reading with mirrored topics. Change read-rule to mirror-to-<cluster> or options of reading process.", ctx); return; } diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h index 4f5ade9c99..6dd1cf10e1 100644 --- a/ydb/library/persqueue/topic_parser/topic_parser.h +++ b/ydb/library/persqueue/topic_parser/topic_parser.h @@ -358,8 +358,8 @@ public: TConverterPtr GetWriteTopicConverter(const TString& clientName, const TString& database); TConverterFactoryPtr GetConverterFactory() const; - bool GetHaveClusters() const { return HaveClusters;}; - + bool GetHaveClusters() const { return HaveClusters; } + TString GetLocalCluster() const { return LocalCluster; } private: TConverterFactoryPtr ConverterFactory; bool HaveClusters; diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 03a6d06213..8a20c29de6 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -1079,14 +1079,16 @@ namespace NKikimr::NDataStreams::V1 { auto topicInfo = navigate->ResultSet.begin(); StreamName = NKikimr::CanonizePath(topicInfo->Path); if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { + NACLib::TUserToken token(this->Request_->GetInternalToken()); + if (!topicInfo->SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow, - this->Request_->GetInternalToken())) { + token)) { return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED, TStringBuilder() << "Access to stream " << this->GetProtoRequest()->stream_name() << " is denied for subject " - << this->Request_->GetInternalToken(), ctx); + << token.GetUserSID(), ctx); } } @@ -1223,14 +1225,16 @@ namespace NKikimr::NDataStreams::V1 { const auto response = result->ResultSet.front(); if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { + NACLib::TUserToken token(this->Request_->GetInternalToken()); + if (!response.SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow, - this->Request_->GetInternalToken())) { + token)) { return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED, TStringBuilder() << "Access to stream " << ShardIterator.GetStreamName() << " is denied for subject " - << this->Request_->GetInternalToken(), ctx); + << token.GetUserSID(), ctx); } } @@ -1402,14 +1406,16 @@ namespace NKikimr::NDataStreams::V1 { const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get(); auto topicInfo = navigate->ResultSet.front(); if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { + NACLib::TUserToken token(this->Request_->GetInternalToken()); + if (!topicInfo.SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow, - this->Request_->GetInternalToken())) { + token)) { return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED, TStringBuilder() << "Access to stream " << this->GetProtoRequest()->stream_name() << " is denied for subject " - << this->Request_->GetInternalToken(), ctx); + << token.GetUserSID(), ctx); } } diff --git a/ydb/services/persqueue_v1/grpc_pq_actor.h b/ydb/services/persqueue_v1/grpc_pq_actor.h index 55127d3f03..0e86e0fd7f 100644 --- a/ydb/services/persqueue_v1/grpc_pq_actor.h +++ b/ydb/services/persqueue_v1/grpc_pq_actor.h @@ -688,7 +688,7 @@ public: TReadInitAndAuthActor(const TActorContext& ctx, const TActorId& parentId, const TString& clientId, const ui64 cookie, const TString& session, const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, TIntrusivePtr<NACLib::TUserToken> token, - const NPersQueue::TTopicsToConverter& topics); + const NPersQueue::TTopicsToConverter& topics, const TString& localCluster); ~TReadInitAndAuthActor(); @@ -744,6 +744,8 @@ private: TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; bool DoCheckACL; + + TString LocalCluster; }; diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp index 87591c7543..ba6b9fe5ef 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp @@ -781,7 +781,7 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadInit::TPtr& ev, const TActorCo } AuthInitActor = ctx.Register(new TReadInitAndAuthActor( ctx, ctx.SelfID, ClientId, Cookie, Session, SchemeCache, NewSchemeCache, Counters, Token, - TopicsHandler.GetReadTopicsList(TopicsToResolve, ReadOnlyLocal, Request->GetDatabaseName().GetOrElse(TString())) + TopicsHandler.GetReadTopicsList(TopicsToResolve, ReadOnlyLocal, Request->GetDatabaseName().GetOrElse(TString())), TopicsHandler.GetLocalCluster() )); @@ -1697,7 +1697,7 @@ void TReadSessionActor::HandleWakeup(const TActorContext& ctx) { AuthInitActor = ctx.Register(new TReadInitAndAuthActor( ctx, ctx.SelfID, ClientId, Cookie, Session, SchemeCache, NewSchemeCache, Counters, Token, - TopicsHandler.GetReadTopicsList(TopicsToResolve, ReadOnlyLocal, Request->GetDatabaseName().GetOrElse(TString())) + TopicsHandler.GetReadTopicsList(TopicsToResolve, ReadOnlyLocal, Request->GetDatabaseName().GetOrElse(TString())), TopicsHandler.GetLocalCluster() )); } } @@ -2617,7 +2617,7 @@ TReadInitAndAuthActor::TReadInitAndAuthActor( const TActorContext& ctx, const TActorId& parentId, const TString& clientId, const ui64 cookie, const TString& session, const NActors::TActorId& metaCache, const NActors::TActorId& newSchemeCache, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, TIntrusivePtr<NACLib::TUserToken> token, - const NPersQueue::TTopicsToConverter& topics + const NPersQueue::TTopicsToConverter& topics, const TString& localCluster ) : ParentId(parentId) , Cookie(cookie) @@ -2628,6 +2628,7 @@ TReadInitAndAuthActor::TReadInitAndAuthActor( , ClientPath(NPersQueue::ConvertOldConsumerName(ClientId, ctx)) , Token(token) , Counters(counters) + , LocalCluster(localCluster) { for (const auto& t : topics) { Topics[t.first].TopicNameConverter = t.second; @@ -2739,7 +2740,7 @@ bool TReadInitAndAuthActor::CheckTopicACL( )) { return false; } - if (Token) { + if (Token || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { bool found = false; for (auto& cons : pqDescr.GetPQTabletConfig().GetReadRules() ) { if (cons == ClientId) { @@ -2749,7 +2750,7 @@ bool TReadInitAndAuthActor::CheckTopicACL( } if (!found) { CloseSession( - TStringBuilder() << "no read rule provided for consumer '" << ClientPath << "' in topic '" << topic << "'", + TStringBuilder() << "no read rule provided for consumer '" << ClientPath << "' in topic '" << topic << "' in current cluster '" << LocalCluster, PersQueue::ErrorCode::BAD_REQUEST, ctx ); return false; @@ -2874,7 +2875,8 @@ void TReadInfoActor::Bootstrap(const TActorContext& ctx) { AuthInitActor = ctx.Register(new TReadInitAndAuthActor( ctx, ctx.SelfID, ClientId, 0, TString("read_info:") + Request().GetPeerName(), SchemeCache, NewSchemeCache, Counters, token, - TopicsHandler.GetReadTopicsList(topicsToResolve, readOnlyLocal, Request().GetDatabaseName().GetOrElse(TString())) + TopicsHandler.GetReadTopicsList(topicsToResolve, readOnlyLocal, Request().GetDatabaseName().GetOrElse(TString())), + TopicsHandler.GetLocalCluster() )); } diff --git a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp index f5cccddafc..3d53b1c955 100644 --- a/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp @@ -537,7 +537,7 @@ namespace NKikimr::NPersQueueTests { TPersQueueV1TestServer server; { - auto res = server.PersQueueClient->AddReadRule("/Root/acc/topic1", TAddReadRuleSettings().ReadRule(TReadRuleSettings().ConsumerName("user"))); + auto res = server.PersQueueClient->AddReadRule("/Root/acc/topic1", TAddReadRuleSettings().ReadRule(TReadRuleSettings().ConsumerName("user1"))); res.Wait(); Cerr << "ADD RESULT " << res.GetValue().GetIssues().ToString() << "\n"; UNIT_ASSERT(res.GetValue().IsSuccess()); diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin.txt new file mode 100644 index 0000000000..25ffd55cc4 --- /dev/null +++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.darwin.txt @@ -0,0 +1,59 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-persqueue_v1-ut-new_schemecache_ut) +target_compile_options(ydb-services-persqueue_v1-ut-new_schemecache_ut PRIVATE + -DACTORLIB_HUGE_PB_SIZE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-persqueue_v1-ut-new_schemecache_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1 +) +target_link_libraries(ydb-services-persqueue_v1-ut-new_schemecache_ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-persqueue_v1 + library-cpp-getopt + library-cpp-svnversion + ydb-core-testlib + api-grpc + cpp-client-resources + ydb_persqueue_core-ut-ut_utils + cpp-client-ydb_table +) +target_sources(ydb-services-persqueue_v1-ut-new_schemecache_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_common_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp +) +add_test( + NAME + ydb-services-persqueue_v1-ut-new_schemecache_ut + COMMAND + ydb-services-persqueue_v1-ut-new_schemecache_ut + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +target_link_flags(ydb-services-persqueue_v1-ut-new_schemecache_ut + PUBLIC + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation + -framework + CoreFoundation +) +vcs_info(ydb-services-persqueue_v1-ut-new_schemecache_ut) diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux.txt b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux.txt new file mode 100644 index 0000000000..e094561240 --- /dev/null +++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.linux.txt @@ -0,0 +1,60 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-persqueue_v1-ut-new_schemecache_ut) +target_compile_options(ydb-services-persqueue_v1-ut-new_schemecache_ut PRIVATE + -DACTORLIB_HUGE_PB_SIZE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-persqueue_v1-ut-new_schemecache_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1 +) +target_link_libraries(ydb-services-persqueue_v1-ut-new_schemecache_ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-persqueue_v1 + library-cpp-getopt + library-cpp-svnversion + ydb-core-testlib + api-grpc + cpp-client-resources + ydb_persqueue_core-ut-ut_utils + cpp-client-ydb_table +) +target_sources(ydb-services-persqueue_v1-ut-new_schemecache_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_new_schemecache_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_common_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp +) +add_test( + NAME + ydb-services-persqueue_v1-ut-new_schemecache_ut + COMMAND + ydb-services-persqueue_v1-ut-new_schemecache_ut + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +target_link_flags(ydb-services-persqueue_v1-ut-new_schemecache_ut + PUBLIC + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +vcs_info(ydb-services-persqueue_v1-ut-new_schemecache_ut) diff --git a/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.txt b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.txt new file mode 100644 index 0000000000..a681d385f3 --- /dev/null +++ b/ydb/services/persqueue_v1/ut/new_schemecache_ut/CMakeLists.txt @@ -0,0 +1,13 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (APPLE) + include(CMakeLists.darwin.txt) +elseif (UNIX) + include(CMakeLists.linux.txt) +endif() |