diff options
author | komels <komels@yandex-team.ru> | 2022-03-14 18:58:57 +0300 |
---|---|---|
committer | komels <komels@yandex-team.ru> | 2022-03-14 18:58:57 +0300 |
commit | eed629ecd3803fa1423a5b83195610fd4703f27a (patch) | |
tree | 26e25ee8d350b55b5a2d22f50a6f53b46e69263c | |
parent | 61e7c7a08a105da2e4c779cd6a9ed2fb0f0ca1ad (diff) | |
download | ydb-eed629ecd3803fa1423a5b83195610fd4703f27a.tar.gz |
Refactor metacache in trunk and enable tests
ref:64500761c99ed6016f451306eb2bd6b719501902
-rw-r--r-- | ydb/core/client/server/msgbus_server_persqueue.cpp | 4 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_persqueue.h | 2 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_pq_metacache.cpp | 192 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_pq_metacache.h | 24 | ||||
-rw-r--r-- | ydb/core/client/server/pq_metacache_v2_ut.cpp | 157 | ||||
-rw-r--r-- | ydb/core/client/server/ut/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/client/server/ut/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/client/server/ut/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/testlib/mock_pq_metacache.h | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_write_actor.cpp | 2 |
10 files changed, 243 insertions, 143 deletions
diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp index d14fd75ebef..347471e787b 100644 --- a/ydb/core/client/server/msgbus_server_persqueue.cpp +++ b/ydb/core/client/server/msgbus_server_persqueue.cpp @@ -33,7 +33,7 @@ const TString& TopicPrefix(const TActorContext& ctx) { } TProcessingResult ProcessMetaCacheAllTopicsResponse(TEvPqNewMetaCache::TEvDescribeAllTopicsResponse::TPtr& ev) { - auto *res = ev->Get()->Result.Get(); + auto& res = ev->Get()->Result; const TString& path = ev->Get()->Path; TProcessingResult result; if (!ev->Get()->Success) { @@ -874,7 +874,7 @@ public: void Handle(TEvAllTopicsDescribeResponse::TPtr& ev, const TActorContext& ctx) { --DescribeRequests; - auto* res = ev->Get()->Result.Get(); + auto& res = ev->Get()->Result; auto processResult = ProcessMetaCacheAllTopicsResponse(ev); if (processResult.IsFatal) { ErrorReason = processResult.Reason; diff --git a/ydb/core/client/server/msgbus_server_persqueue.h b/ydb/core/client/server/msgbus_server_persqueue.h index 52c67832e4e..1a915080e0c 100644 --- a/ydb/core/client/server/msgbus_server_persqueue.h +++ b/ydb/core/client/server/msgbus_server_persqueue.h @@ -145,7 +145,7 @@ protected: const TActorId PqMetaCache; THashMap<TActorId, THolder<TPerTopicInfo>> Children; size_t ChildrenAnswered = 0; - THolder<NSchemeCache::TSchemeCacheNavigate> SchemeCacheResponse; + std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> SchemeCacheResponse; // Nodes info const bool ListNodes; diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp index 5fb9cf55e13..39f253e75a7 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp @@ -33,7 +33,7 @@ public: const NMonitoring::TDynamicCounterPtr& counters, const TDuration& versionCheckInterval) : Counters(counters) - , ClientWrapper(std::move(std::make_unique<TClientWrapper>(grpcPort))) + , ClientWrapper(std::make_unique<TClientWrapper>(grpcPort)) , VersionCheckInterval(versionCheckInterval) , Generation(std::make_shared<TAtomicCounter>()) { @@ -47,6 +47,16 @@ public: { } + TPersQueueMetaCacheActor( + const TAtomicSharedPtr<NYdb::NTable::TTableClient>& tableClient, const NActors::TActorId& schemeBoardCacheId, + const TDuration& versionCheckInterval + ) + : ClientWrapper(std::make_unique<TClientWrapper>(tableClient)) + , VersionCheckInterval(versionCheckInterval) + , SchemeCacheId(schemeBoardCacheId) + , Generation(std::make_shared<TAtomicCounter>()) + { + } void Bootstrap(const TActorContext& ctx) { if (ClientWrapper == nullptr) { auto* driver = AppData(ctx)->YdbDriver; @@ -60,7 +70,7 @@ public: } ClientWrapper.reset(new TClientWrapper(driver)); } - SkipVersionCheck = !AppData(ctx)->PQConfig.GetMetaCacheSkipVersionCheck(); + SkipVersionCheck = AppData(ctx)->PQConfig.GetMetaCacheSkipVersionCheck(); PathPrefix = TopicPrefix(ctx); TopicsQuery = TStringBuilder() << "--!syntax_v1\n" << "DECLARE $Path as Utf8; DECLARE $Cluster as Utf8; " @@ -208,8 +218,8 @@ private: void HandleGetTopicsResult(const TActorContext& ctx) { auto result = AsyncQueryResult.GetValue(); if (!result.IsSuccess()) { - LOG_ERROR_S(ctx, NKikimrServices::PQ_METACACHE, - "Got error trying to get topics: " << result.GetIssues().ToString()); + LOG_INFO_S(ctx, NKikimrServices::PQ_METACACHE, + "Got error trying to get topics: " << result.GetIssues().ToString()); return Reset(ctx); } Y_VERIFY(result.GetResultSets().size() == 1); @@ -233,18 +243,8 @@ private: FullTopicsCache = nullptr; while (!ListTopicsWaiters.empty()) { auto& waiter = ListTopicsWaiters.front(); - switch (waiter.Type) { - case EWaiterType::ListTopics: - SendListTopicsResponse(waiter.WaiterId, ctx); - break; - case EWaiterType::DescribeAllTopics: - ProcessDescribeAllTopics(waiter.WaiterId, ctx); - break; - default: - Y_FAIL(); - break; - } - ListTopicsWaiters.pop_front(); + ProcessDescribeAllTopics(waiter, ctx); + ListTopicsWaiters.pop(); } LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Updated topics list with : " << CurrentTopics.size() << " topics"); @@ -268,26 +268,25 @@ private: RunQuery(ev->Get()->Type, ctx); } - void HandleListTopics(TEvPqNewMetaCache::TEvListTopicsRequest::TPtr& ev, const TActorContext& ctx) { - if (!LastVersionUpdate || !EverGotTopics) { - ListTopicsWaiters.emplace_back(TWaiter{ev->Sender, {}, EWaiterType::ListTopics}); - } else { - SendListTopicsResponse(ev->Sender, ctx); - } + void HandleGetVersion(TEvPqNewMetaCache::TEvGetVersionRequest::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send version response: " << CurrentTopicsVersion); + + ctx.Send(ev->Sender, new TEvPqNewMetaCache::TEvGetVersionResponse{CurrentTopicsVersion}); } void HandleDescribeTopics(TEvPqNewMetaCache::TEvDescribeTopicsRequest::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe topics with prefix: " << ev->Get()->PathPrefix); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe topics"); - SendSchemeCacheRequest(ev->Get()->Topics, !ev->Get()->PathPrefix.empty(), ctx); + SendSchemeCacheRequest(ev->Get()->Topics, !ev->Get()->PathPrefix.empty(), false, ctx); auto inserted = DescribeTopicsWaiters.insert(std::make_pair( RequestId, - TWaiter{ev->Sender, std::move(ev->Get()->Topics), EWaiterType::DescribeCustomTopics} + TWaiter{ev->Sender, std::move(ev->Get()->Topics)} )).second; Y_VERIFY(inserted); } void HandleDescribeAllTopics(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe all topics"); if (ev->Get()->PathPrefix && ev->Get()->PathPrefix != PathPrefix) { auto* response = new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse(ev->Get()->PathPrefix); response->Success = false; @@ -295,40 +294,34 @@ private: return; } if (!EverGotTopics) { - ListTopicsWaiters.emplace_back(TWaiter{ev->Sender, {}, EWaiterType::DescribeAllTopics}); + ListTopicsWaiters.push(ev->Sender); return; } return ProcessDescribeAllTopics(ev->Sender, ctx); } - void ProcessDescribeAllTopics(const TActorId& waiterId, const TActorContext& ctx) { + void ProcessDescribeAllTopics(const TActorId& waiter, const TActorContext& ctx) { if (EverGotTopics && CurrentTopics.empty()) { - SendDescribeAllTopicsResponse(waiterId, ctx, true); + SendDescribeAllTopicsResponse(waiter, ctx, true); return; } if (FullTopicsCache && !FullTopicsCacheOutdated) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Respond from cache"); - - return SendDescribeAllTopicsResponse(waiterId, ctx); + return SendDescribeAllTopicsResponse(waiter, ctx); + } + if (DescribeAllTopicsWaiters.empty()) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Make full list SC request"); + SendSchemeCacheRequest(CurrentTopics, true, true, ctx); } LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Store waiter"); - SendSchemeCacheRequest(CurrentTopics, true, ctx); - auto inserted = DescribeTopicsWaiters.insert(std::make_pair( - RequestId, - TWaiter{waiterId, {}, EWaiterType::DescribeAllTopics} - )).second; - Y_VERIFY(inserted); + DescribeAllTopicsWaiters.push(waiter); FullTopicsCacheOutdated = false; FullTopicsCache = nullptr; } void HandleSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { - auto* result = ev->Get()->Request.Release(); - auto waiterIter = DescribeTopicsWaiters.find(result->Instant); - Y_VERIFY(!waiterIter.IsEnd()); - auto& waiter = waiterIter->second; - - if (waiter.Type == EWaiterType::DescribeAllTopics) { + auto& result = ev->Get()->Request; + if (result->Instant == 0) { for (const auto& entry : result->ResultSet) { if (!entry.PQGroupInfo) { continue; @@ -342,23 +335,32 @@ private: FullTopicsCacheOutdated = true; } - FullTopicsCache.Reset(result); + FullTopicsCache.reset(result.Release()); LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Updated topics cache with " << FullTopicsCache->ResultSet.size()); - SendDescribeAllTopicsResponse(waiter.WaiterId, ctx); + while (!DescribeAllTopicsWaiters.empty()) { + SendDescribeAllTopicsResponse(DescribeAllTopicsWaiters.front(), ctx); + DescribeAllTopicsWaiters.pop(); + } } else { - Y_VERIFY(waiter.Type == EWaiterType::DescribeCustomTopics); + auto waiterIter = DescribeTopicsWaiters.find(result->Instant); + Y_VERIFY(!waiterIter.IsEnd()); + auto& waiter = waiterIter->second; + Y_VERIFY(waiter.Topics.size() == result->ResultSet.size()); auto *response = new TEvPqNewMetaCache::TEvDescribeTopicsResponse{ - std::move(waiter.Topics), result + std::move(waiter.Topics), result.Release() }; ctx.Send(waiter.WaiterId, response); + DescribeTopicsWaiters.erase(waiterIter); } - DescribeTopicsWaiters.erase(waiterIter); } - void SendSchemeCacheRequest(const TVector<TString>& topics, bool addDefaultPathPrefix, const TActorContext& ctx) { - auto schemeCacheRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(++RequestId); + void SendSchemeCacheRequest( + const TVector<TString>& topics, bool addDefaultPathPrefix, bool isFullListingRequest, const TActorContext& ctx + ) { + auto instant = isFullListingRequest ? 0 : ++RequestId; + auto schemeCacheRequest = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(instant); for (const auto& path : topics) { auto split = NKikimr::SplitPath(path); NSchemeCache::TSchemeCacheNavigate::TEntry entry; @@ -367,32 +369,33 @@ private: } entry.Path.insert(entry.Path.end(), split.begin(), split.end()); - entry.SyncVersion = true; + entry.SyncVersion = !isFullListingRequest; entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTopic; schemeCacheRequest->ResultSet.emplace_back(std::move(entry)); } - ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.release())); - } - - void SendListTopicsResponse(const TActorId& recipient, const TActorContext& ctx) { - auto* response = new TEvPqNewMetaCache::TEvListTopicsResponse(); - response->Topics = CurrentTopics; - response->TopicsVersion = CurrentTopicsVersion; - ctx.Send(recipient, response); + ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.Release())); } void SendDescribeAllTopicsResponse(const TActorId& recipient, const TActorContext& ctx, bool empty = false) { - NSchemeCache::TSchemeCacheNavigate* scResponse; if (empty) { - scResponse = new NSchemeCache::TSchemeCacheNavigate(); + ctx.Send( + recipient, + new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse( + PathPrefix, std::make_shared<NSchemeCache::TSchemeCacheNavigate>() + ) + ); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send empty describe all topics response"); + return; } else { - scResponse = new NSchemeCache::TSchemeCacheNavigate(*FullTopicsCache); + ctx.Send( + recipient, + new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse( + PathPrefix, FullTopicsCache + ) + ); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send describe all topics response with " << FullTopicsCache->ResultSet.size() << " topics"); + } - LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send describe all topics response with " << scResponse->ResultSet.size() << " topics"); - auto* response = new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse( - PathPrefix, scResponse - ); - ctx.Send(recipient, response); } static NActors::TActorSystem* GetActorSystem() { @@ -406,16 +409,16 @@ public: } STRICT_STFUNC(StateFunc, - SFunc(NActors::TEvents::TEvWakeup, StartSession) - SFunc(TEvPqNewMetaCache::TEvSessionStarted, HandleSessionStarted) - SFunc(TEvPqNewMetaCache::TEvQueryPrepared, HandleTopicsQueryPrepared) - HFunc(TEvPqNewMetaCache::TEvQueryComplete, HandleQueryComplete) - HFunc(TEvPqNewMetaCache::TEvRestartQuery, HandleRestartQuery) - - HFunc(TEvPqNewMetaCache::TEvListTopicsRequest, HandleListTopics) - HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics) - HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics) - HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse) + SFunc(NActors::TEvents::TEvWakeup, StartSession) + SFunc(TEvPqNewMetaCache::TEvSessionStarted, HandleSessionStarted) + SFunc(TEvPqNewMetaCache::TEvQueryPrepared, HandleTopicsQueryPrepared) + HFunc(TEvPqNewMetaCache::TEvQueryComplete, HandleQueryComplete) + HFunc(TEvPqNewMetaCache::TEvRestartQuery, HandleRestartQuery) + + HFunc(TEvPqNewMetaCache::TEvGetVersionRequest, HandleGetVersion) + HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics) + HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics) + HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse) ) private: @@ -428,11 +431,6 @@ private: struct TWaiter { TActorId WaiterId; TVector<TString> Topics; - EWaiterType Type; - - static TWaiter AllTopicsWaiter(const TActorId& waiterId) { - return TWaiter{waiterId, {}, EWaiterType::DescribeAllTopics}; - } }; struct TTopicKey { @@ -443,11 +441,6 @@ private: class TClientWrapper { public: - TClientWrapper(const TClientWrapper&) = delete; - TClientWrapper& operator=(const TClientWrapper&) = delete; - TClientWrapper(TClientWrapper&&) = default; - TClientWrapper& operator=(TClientWrapper&&) = default; - TClientWrapper(ui64 driverPort) : DriverPort(driverPort) {} @@ -456,6 +449,10 @@ private: : Driver(driver) {} + TClientWrapper(const TAtomicSharedPtr<TTableClient>& tableClient) + : TableClient(tableClient) + {} + void Reset() { if (DriverPort.Defined()) { // Own driver if (DriverHolder == nullptr) { @@ -463,18 +460,16 @@ private: auto driverConfig = NYdb::TDriverConfig().SetEndpoint(endpoint); DriverHolder.Reset(new NYdb::TDriver(driverConfig)); Driver = DriverHolder.Get(); - TableClient.reset(new NYdb::NTable::TTableClient(*Driver)); + TableClient.Reset(new NYdb::NTable::TTableClient(*Driver)); } } else if (Driver != nullptr) { - TableClient.reset(new NYdb::NTable::TTableClient(*Driver)); + TableClient = MakeAtomicShared<NYdb::NTable::TTableClient>(*Driver); } } - NYdb::NTable::TTableClient* GetClient() { Y_VERIFY(TableClient); - return TableClient.get(); + return TableClient.Get(); } - void Stop() { if (DriverHolder != nullptr) { TableClient->Stop(); @@ -487,8 +482,9 @@ private: private: THolder<NYdb::TDriver> DriverHolder; NYdb::TDriver* Driver = nullptr; + TAtomicSharedPtr<NYdb::NTable::TTableClient> TableClient; TMaybe<ui64> DriverPort; - std::unique_ptr<NYdb::NTable::TTableClient> TableClient; + }; NMonitoring::TDynamicCounterPtr Counters; @@ -512,11 +508,12 @@ private: TDuration VersionCheckInterval = TDuration::Seconds(1); TInstant LastVersionUpdate = TInstant::Zero(); - TDeque<TWaiter> ListTopicsWaiters; + TQueue<TActorId> ListTopicsWaiters; + TQueue<TActorId> DescribeAllTopicsWaiters; THashMap<ui64, TWaiter> DescribeTopicsWaiters; ui64 RequestId = 1; - THolder<NSchemeCache::TSchemeCacheNavigate> FullTopicsCache; + std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> FullTopicsCache; bool FullTopicsCacheOutdated = false; NActors::TActorId SchemeCacheId; TString PathPrefix; @@ -533,6 +530,13 @@ IActor* CreatePQMetaCache(ui64 grpcPort, const NMonitoring::TDynamicCounterPtr& return new TPersQueueMetaCacheActor(grpcPort, counters, versionCheckInterval); } +IActor* CreatePQMetaCache( + const TAtomicSharedPtr<NYdb::NTable::TTableClient>& tableClient, const NActors::TActorId& schemeBoardCacheId, + const TDuration& versionCheckInterval +) { + return new TPersQueueMetaCacheActor(tableClient, schemeBoardCacheId, versionCheckInterval); +} + } // namespace NPqMetaCacheV2 } // namespace NKikimr::NMsgBusProxy diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.h b/ydb/core/client/server/msgbus_server_pq_metacache.h index 6c12b76b041..a125e7b81ac 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.h +++ b/ydb/core/client/server/msgbus_server_pq_metacache.h @@ -42,8 +42,8 @@ struct TEvPqNewMetaCache { EvQueryPrepared, EvQueryComplete, EvRestartQuery, - EvListTopicsRequest, - EvListTopicsResponse, + EvGetVersionRequest, + EvGetVersionResponse, EvDescribeTopicsRequest, EvDescribeTopicsResponse, EvDescribeAllTopicsRequest, @@ -74,11 +74,14 @@ struct TEvPqNewMetaCache { : Type(type) {} }; - struct TEvListTopicsRequest : public TEventLocal<TEvListTopicsRequest, EvListTopicsRequest> { + struct TEvGetVersionRequest : public TEventLocal<TEvGetVersionRequest, EvGetVersionRequest> { }; - struct TEvListTopicsResponse : public TEventLocal<TEvListTopicsResponse, EvListTopicsResponse> { - TVector<TString> Topics; + struct TEvGetVersionResponse : public TEventLocal<TEvGetVersionResponse, EvGetVersionResponse> { + TEvGetVersionResponse(ui64 version) + : TopicsVersion(version) + {} + ui64 TopicsVersion; }; @@ -100,7 +103,7 @@ struct TEvPqNewMetaCache { struct TEvDescribeTopicsResponse : public TEventLocal<TEvDescribeTopicsResponse, EvDescribeTopicsResponse> { TVector<TString> TopicsRequested; - THolder<NSchemeCache::TSchemeCacheNavigate> Result; + std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> Result; explicit TEvDescribeTopicsResponse(const TVector<TString>& topics, NSchemeCache::TSchemeCacheNavigate* result) @@ -120,11 +123,11 @@ struct TEvPqNewMetaCache { struct TEvDescribeAllTopicsResponse : public TEventLocal<TEvDescribeAllTopicsResponse, EvDescribeAllTopicsResponse> { bool Success = true; TString Path; - THolder<NSchemeCache::TSchemeCacheNavigate> Result; + std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> Result; explicit TEvDescribeAllTopicsResponse(const TString& path) : Path(path) {} - TEvDescribeAllTopicsResponse(const TString& path, NSchemeCache::TSchemeCacheNavigate* result) + TEvDescribeAllTopicsResponse(const TString& path, const std::shared_ptr<NSchemeCache::TSchemeCacheNavigate>& result) : Path(path) , Result(result) {} @@ -137,6 +140,11 @@ IActor* CreatePQMetaCache(ui64 grpcPort, const NMonitoring::TDynamicCounterPtr& counters, const TDuration& versionCheckInterval = TDuration::Seconds(1)); +IActor* CreatePQMetaCache(const TAtomicSharedPtr<NYdb::NTable::TTableClient>& tableClient, + const NActors::TActorId& schemeBoardCacheId, + const TDuration& versionCheckInterval = TDuration::Seconds(1)); + + } // namespace NPqMetaCacheV2 } //namespace NKikimr::NMsgBusProxy diff --git a/ydb/core/client/server/pq_metacache_v2_ut.cpp b/ydb/core/client/server/pq_metacache_v2_ut.cpp index f02fd420c9a..39383a462b1 100644 --- a/ydb/core/client/server/pq_metacache_v2_ut.cpp +++ b/ydb/core/client/server/pq_metacache_v2_ut.cpp @@ -27,7 +27,8 @@ class TPqMetaCacheV2Test: public TTestBase { settings.SetDomainName("Root"); settings.SetDomain(0); settings.SetUseRealThreads(true); - Server = new NKikimr::Tests::TServer(settings); + //settings.PQConfig.SetMetaCacheSkipVersionCheck(true); + Server = MakeHolder<NKikimr::Tests::TServer>(settings); Server->EnableGRpc(NGrpc::TServerOptions().SetHost("localhost").SetPort(GrpcServerPort)); auto* runtime = Server->GetRuntime(); @@ -38,15 +39,15 @@ class TPqMetaCacheV2Test: public TTestBase { runtime->SetLogPriority(NKikimrServices::PQ_WRITE_PROXY, NActors::NLog::PRI_EMERG); runtime->SetLogPriority(NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, NActors::NLog::PRI_EMERG); - Client = MakeHolder<NKikimr::NPersQueueTests::TFlatMsgBusPQClient>(settings, GrpcServerPort); + Client = std::make_shared<NKikimr::NPersQueueTests::TFlatMsgBusPQClient>(settings, GrpcServerPort); Client->InitRootScheme(); NYdb::TDriverConfig driverCfg; driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << GrpcServerPort); - YdbDriver = new NYdb::TDriver(driverCfg); - TableClient = new NYdb::NTable::TTableClient(*YdbDriver); - PQClient = new NYdb::NPersQueue::TPersQueueClient(*YdbDriver); + YdbDriver = std::make_shared<NYdb::TDriver>(driverCfg); + TableClient.Reset(new NYdb::NTable::TTableClient(*YdbDriver)); + PQClient = std::make_shared<NYdb::NPersQueue::TPersQueueClient>(*YdbDriver); Client->MkDir("/Root", "PQ"); Client->MkDir("/Root/PQ", "Config"); @@ -63,7 +64,7 @@ class TPqMetaCacheV2Test: public TTestBase { tableDesc = TTableBuilder() .AddNullableColumn("name", EPrimitiveType::String) - .AddNullableColumn("version", EPrimitiveType::Uint64) + .AddNullableColumn("version", EPrimitiveType::Int64) .SetPrimaryKeyColumns({"name", "version"}) .Build(); CheckYdbResult(session.CreateTable("/Root/PQ/Config/V2/Versions", std::move(tableDesc))); @@ -78,7 +79,7 @@ class TPqMetaCacheV2Test: public TTestBase { } SchemeCacheId = runtime->Register(CreateSchemeBoardSchemeCache(config.Get())); MetaCacheId = runtime->Register( - NPqMetaCacheV2::CreatePQMetaCache(config->Counters, TDuration::MilliSeconds(50)) + NPqMetaCacheV2::CreatePQMetaCache(GrpcServerPort, config->Counters, TDuration::MilliSeconds(50)) ); runtime->EnableScheduleForActor(SchemeCacheId, true); runtime->EnableScheduleForActor(MetaCacheId, true); @@ -119,7 +120,7 @@ class TPqMetaCacheV2Test: public TTestBase { auto builder = versionPrepared.GetParamsBuilder(); { auto ¶m = builder.AddParam("$Version"); - param.Uint64(++Version); + param.Int64(++Version); param.Build(); } CheckYdbResult(versionPrepared.Execute(txControl, builder.Build())); @@ -130,24 +131,33 @@ class TPqMetaCacheV2Test: public TTestBase { } } - THolder<TEvPqNewMetaCache::TEvDescribeTopicsResponse> DoMetaCacheRequest(const TVector<TTopicInfo>& topicList) { + template<class TEvType> + THolder<TEvType> DoMetaCacheRequest(const TVector<TTopicInfo>& topicList = {}) { IEventBase* ev; if (topicList.empty()) { ev = new TEvPqNewMetaCache::TEvDescribeAllTopicsRequest(); } else { TVector<TString> topicNames; for (const auto &topic : topicList) { - topicNames.emplace_back(std::move(::NPersQueue::BuildFullTopicName(topic.Path, topic.Cluster))); + topicNames.emplace_back(TString("/Root/PQ/") + ::NPersQueue::BuildFullTopicName(topic.Path, topic.Cluster)); } ev = new TEvPqNewMetaCache::TEvDescribeTopicsRequest(topicNames); } auto handle = new IEventHandle(MetaCacheId, EdgeActorId, ev); Server->GetRuntime()->Send(handle); - auto response = Server->GetRuntime()->GrabEdgeEvent<TEvPqNewMetaCache::TEvDescribeTopicsResponse>(); + auto response = Server->GetRuntime()->GrabEdgeEvent<TEvType>(); return std::move(response); } - void CheckTopicInfo(const TVector<TTopicInfo>& expected, TSchemeCacheNavigate* result) { + THolder<TEvPqNewMetaCache::TEvDescribeTopicsResponse> DescribeTopics(const TVector<TTopicInfo>& topicList) { + return DoMetaCacheRequest<TEvPqNewMetaCache::TEvDescribeTopicsResponse>(topicList); + } + + THolder<TEvPqNewMetaCache::TEvDescribeAllTopicsResponse> DescribeAllTopics() { + return DoMetaCacheRequest<TEvPqNewMetaCache::TEvDescribeAllTopicsResponse>(); + } + + void CheckTopicInfo(const TVector<TTopicInfo>& expected, std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> result) { ui64 i = 0; Cerr << "=== Got cache navigate response: \n" << result->ToString(NKikimr::NScheme::TTypeRegistry()) << Endl; Cerr << "=== Expect to have " << expected.size() << " records, got: " << result->ResultSet.size() << " records" << Endl; @@ -170,9 +180,9 @@ class TPqMetaCacheV2Test: public TTestBase { auto endTime = TInstant::Now() + timeout; auto* runtime = Server->GetRuntime(); while (endTime > TInstant::Now()) { - auto handle = new IEventHandle(MetaCacheId, EdgeActorId, new TEvPqNewMetaCache::TEvListTopicsRequest()); + auto handle = new IEventHandle(MetaCacheId, EdgeActorId, new TEvPqNewMetaCache::TEvGetVersionRequest()); runtime->Send(handle); - auto response = runtime->GrabEdgeEvent<TEvPqNewMetaCache::TEvListTopicsResponse>(); + auto response = runtime->GrabEdgeEvent<TEvPqNewMetaCache::TEvGetVersionResponse>(); currentVersion = response->TopicsVersion; if (currentVersion >= version) { Cerr << "=== Got current topics version: " << currentVersion << Endl; @@ -183,51 +193,126 @@ class TPqMetaCacheV2Test: public TTestBase { UNIT_FAIL("Wait for topics version timed out"); } + TActorId MakeEdgeTargetedMetaCache() { + auto anotherMetaCacheId = Server->GetRuntime()->Register( + NPqMetaCacheV2::CreatePQMetaCache(TableClient, EdgeActorId) + + ); + return anotherMetaCacheId; + } + void TestDescribeTopics() { auto topics = TVector<TTopicInfo>({ - {"topic1", "man", true}, - {"topic2", "man", true}, - {"topic3", "man", false} - }); + {"topic1", "man", true}, + {"topic2", "man", true}, + {"topic3", "man", false} + }); AddTopics(topics); - auto ev = DoMetaCacheRequest(topics); - CheckTopicInfo(topics, ev->Result.Get()); + auto ev = DescribeTopics(topics); + CheckTopicInfo(topics, ev->Result); UNIT_ASSERT(ev); } + + void TestDescribeManyTopics() { + auto topics = TVector<TTopicInfo>({ + {"topic1", "man", true}, + {"topic2", "man", false}, + {"topic3", "man", false} + }); + + AddTopics(topics); + + Cerr << "===Wait base response\n"; + auto baseResponse = DescribeTopics(topics); + Cerr << "===Got base response\n"; + auto baseEntry = baseResponse->Result->ResultSet[0]; + + TVector<TTopicInfo> lotOfTopics; + for (auto i = 0u; i < 10000u; i++) { + lotOfTopics.emplace_back(TTopicInfo{TString("topic0") + ToString(i), "man", false}); + } + AddTopics(lotOfTopics); + ui64 totalTopics = topics.size() + lotOfTopics.size(); + auto secondMetaCache = MakeEdgeTargetedMetaCache(); + Cerr << "===Registered secondary meta cache: " << secondMetaCache.ToString() << Endl; + auto* runtime = Server->GetRuntime(); + + auto sendRequest = [&]() { + auto* ev = new TEvPqNewMetaCache::TEvDescribeAllTopicsRequest(); + auto handle = new IEventHandle(secondMetaCache, EdgeActorId, ev); + runtime->Send(handle); + }; + Cerr << "===Send first request\n"; + sendRequest(); + + auto scRequest = runtime->GrabEdgeEvent<TEvTxProxySchemeCache::TEvNavigateKeySet>(); + Cerr << "===Got SC request\n"; + auto& entries = scRequest->Request->ResultSet; + UNIT_ASSERT_VALUES_EQUAL(entries.size(), totalTopics); + for (auto i = 0u; i < entries.size(); i++) { + auto initialPath = entries[i].Path; + entries[i] = baseEntry; + entries[i].Path = initialPath; + } + auto* scResponse = new TEvTxProxySchemeCache::TEvNavigateKeySetResult(scRequest->Request); + + auto handle = new IEventHandle(secondMetaCache, EdgeActorId, scResponse); + Cerr << "===Send fake SC response\n"; + runtime->Send(handle); + + Cerr << "===Wait secondary metacache response\n"; + auto response = runtime->GrabEdgeEvent<TEvPqNewMetaCache::TEvDescribeAllTopicsResponse>(); + Cerr << "===Got secondary metacache response\n"; + + // Preparation done, now can send many requests + Cerr << "===Started pushing requests at: " << TInstant::Now() << Endl; + for (auto i = 0u; i < 1000; i++) { + sendRequest(); + } + Cerr << "===Done pushing requests at: " << TInstant::Now() << Endl; + for (auto i = 0u; i < 1000; i++) { + response = runtime->GrabEdgeEvent<TEvPqNewMetaCache::TEvDescribeAllTopicsResponse>(); + if (!i) { + Cerr << "===Done gathering first response at: " << TInstant::Now() << Endl; + } + } + Cerr << "===Done gathering responses at: " << TInstant::Now() << Endl; + } + void TestDescribeAllTopics() { auto topics = TVector<TTopicInfo>({ - {"topic1", "man", true}, - {"topic2", "man", true}, - {"topic3", "man", false} + {"topic1", "man", true}, + {"topic2", "man", true}, + {"topic3", "man", false} }); AddTopics(topics); - auto ev = DoMetaCacheRequest({}); - CheckTopicInfo(topics, ev->Result.Get()); + auto ev = DescribeAllTopics(); + CheckTopicInfo(topics, ev->Result); UNIT_ASSERT(ev); } void TestTopicsUpdate() { auto topics = TVector<TTopicInfo>({ - {"topic1", "man", true}, - {"topic2", "man", true}, - {"topic3", "man", false} - }); + {"topic1", "man", true}, + {"topic2", "man", true}, + {"topic3", "man", false} + }); AddTopics(topics); - CheckTopicInfo(topics, DoMetaCacheRequest(topics)->Result.Get()); + CheckTopicInfo(topics, DescribeTopics(topics)->Result); WaitForVersion(Version); TTopicInfo topic{"topic1", "sas", true}; AddTopics({topic}, false); - CheckTopicInfo(topics, DoMetaCacheRequest({})->Result.Get()); + CheckTopicInfo(topics, DescribeAllTopics()->Result); AddTopics({}, true); - Cerr << "===Wait for version: " << Version << Endl; WaitForVersion(Version); topics.insert(topics.end() - 1, topic); - CheckTopicInfo(topics, DoMetaCacheRequest({})->Result.Get()); + CheckTopicInfo(topics, DescribeAllTopics()->Result); } + template<class T> T CheckYdbResult(NThreading::TFuture<T>&& asyncResult) { auto res = asyncResult.GetValueSync(); @@ -259,7 +344,7 @@ private: std::shared_ptr<NKikimr::NPersQueueTests::TFlatMsgBusPQClient> Client; std::shared_ptr<NYdb::TDriver> YdbDriver; - std::shared_ptr<NYdb::NTable::TTableClient> TableClient; + TAtomicSharedPtr<NYdb::NTable::TTableClient> TableClient; std::shared_ptr<NYdb::NPersQueue::TPersQueueClient> PQClient; TString UpsertTopicQuery = TStringBuilder() << "--!syntax_v1\n" @@ -268,8 +353,8 @@ private: << "(path, dc) VALUES ($Path, $Cluster);"; TString UpdateVersionQuery = TStringBuilder() << "--!syntax_v1\n" - << "DECLARE $Version as Uint64; " - << "UPSERT INTO `/Root/PQ/Config/V2/Version`" + << "DECLARE $Version as Int64; " + << "UPSERT INTO `/Root/PQ/Config/V2/Versions`" << "(name, version) VALUES ('Topics', $Version);"; diff --git a/ydb/core/client/server/ut/CMakeLists.darwin.txt b/ydb/core/client/server/ut/CMakeLists.darwin.txt index 15ec30f11a2..b993911e557 100644 --- a/ydb/core/client/server/ut/CMakeLists.darwin.txt +++ b/ydb/core/client/server/ut/CMakeLists.darwin.txt @@ -27,6 +27,7 @@ target_link_libraries(ydb-core-client-server-ut PUBLIC core-testlib-actors ) target_sources(ydb-core-client-server-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/client/server/pq_metacache_v2_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp ) add_test( diff --git a/ydb/core/client/server/ut/CMakeLists.linux.txt b/ydb/core/client/server/ut/CMakeLists.linux.txt index f4b6f53ae04..8e7efa1338f 100644 --- a/ydb/core/client/server/ut/CMakeLists.linux.txt +++ b/ydb/core/client/server/ut/CMakeLists.linux.txt @@ -28,6 +28,7 @@ target_link_libraries(ydb-core-client-server-ut PUBLIC core-testlib-actors ) target_sources(ydb-core-client-server-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/client/server/pq_metacache_v2_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp ) add_test( diff --git a/ydb/core/client/server/ut/ya.make b/ydb/core/client/server/ut/ya.make index 409979461df..493afd12e85 100644 --- a/ydb/core/client/server/ut/ya.make +++ b/ydb/core/client/server/ut/ya.make @@ -21,6 +21,7 @@ PEERDIR( YQL_LAST_ABI_VERSION() SRCS( + pq_metacache_v2_ut.cpp msgbus_server_pq_metarequest_ut.cpp ) diff --git a/ydb/core/testlib/mock_pq_metacache.h b/ydb/core/testlib/mock_pq_metacache.h index 1280f877792..e132cd5f9ac 100644 --- a/ydb/core/testlib/mock_pq_metacache.h +++ b/ydb/core/testlib/mock_pq_metacache.h @@ -69,7 +69,7 @@ public: response->Success = success; auto* result = new NSchemeCache::TSchemeCacheNavigate(); result->ResultSet = resultSet; - response->Result.Reset(result); + response->Result.reset(result); ctx.Send(ev->Sender, std::move(response)); }; diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp index f208728706a..6531afc7d3b 100644 --- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp @@ -384,7 +384,7 @@ void TWriteSessionActor::InitCheckSchema(const TActorContext& ctx, bool needWait } void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx) { - auto* res = ev->Get()->Result.Get(); + auto& res = ev->Get()->Result; Y_VERIFY(res->ResultSet.size() == 1); auto& entry = res->ResultSet[0]; |