diff options
author | alexnick <alexnick@yandex-team.ru> | 2022-03-14 20:16:27 +0300 |
---|---|---|
committer | alexnick <alexnick@yandex-team.ru> | 2022-03-14 20:16:27 +0300 |
commit | 37fa27eedb4ff54da776b751356b9b4cce2a6961 (patch) | |
tree | e52955d9be907f85e29a54edb639f3a6374232bd | |
parent | b3566c4940538fab9ce3687168f46d228f25db07 (diff) | |
download | ydb-37fa27eedb4ff54da776b751356b9b4cce2a6961.tar.gz |
use KQP instead of driver LOGBROKER-7358
ref:937549cf701e3fefdaecf3f99e9a64eccfdd77a4
-rw-r--r-- | ydb/core/client/server/msgbus_server_pq_metacache.cpp | 350 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_pq_metacache.h | 30 | ||||
-rw-r--r-- | ydb/core/client/server/pq_metacache_v2_ut.cpp | 7 |
3 files changed, 123 insertions, 264 deletions
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp index 39f253e75a7..45211485f5d 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp @@ -2,13 +2,17 @@ #include "msgbus_server_pq_metacache.h" #include <ydb/public/api/protos/draft/persqueue_error_codes.pb.h> +#include <ydb/public/lib/deprecated/kicli/kicli.h> + #include <ydb/library/persqueue/topic_parser/topic_parser.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/tx/scheme_board/cache.h> #include <ydb/core/base/counters.h> - +#include <ydb/core/kqp/kqp.h> #include <ydb/core/base/appdata.h> +#include <ydb/core/persqueue/pq_database.h> + namespace NKikimr::NMsgBusProxy { @@ -16,8 +20,10 @@ using namespace NYdb::NTable; namespace NPqMetaCacheV2 { -IActor* CreateSchemeCache(NActors::TActorSystem* ActorSystem, TIntrusivePtr<NMonitoring::TDynamicCounters> counters) { - auto appData = ActorSystem->AppData<TAppData>(); + + +IActor* CreateSchemeCache(const TActorContext& ctx, TIntrusivePtr<NMonitoring::TDynamicCounters> counters) { + auto appData = AppData(ctx); auto cacheCounters = GetServiceCounters(counters, "pqproxy|schemecache"); auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters); return CreateSchemeBoardSchemeCache(cacheConfig.Get()); @@ -29,48 +35,36 @@ public: TPersQueueMetaCacheActor(TPersQueueMetaCacheActor&&) = default; TPersQueueMetaCacheActor& operator=(TPersQueueMetaCacheActor&&) = default; - TPersQueueMetaCacheActor(ui64 grpcPort, - const NMonitoring::TDynamicCounterPtr& counters, - const TDuration& versionCheckInterval) - : Counters(counters) - , ClientWrapper(std::make_unique<TClientWrapper>(grpcPort)) - , VersionCheckInterval(versionCheckInterval) - , Generation(std::make_shared<TAtomicCounter>()) - { - } - TPersQueueMetaCacheActor(const NMonitoring::TDynamicCounterPtr& counters, const TDuration& versionCheckInterval) : Counters(counters) , VersionCheckInterval(versionCheckInterval) - , Generation(std::make_shared<TAtomicCounter>()) + , Generation(std::make_shared<TAtomicCounter>(100)) { } TPersQueueMetaCacheActor( - const TAtomicSharedPtr<NYdb::NTable::TTableClient>& tableClient, const NActors::TActorId& schemeBoardCacheId, + const NActors::TActorId& schemeBoardCacheId, const TDuration& versionCheckInterval ) - : ClientWrapper(std::make_unique<TClientWrapper>(tableClient)) - , VersionCheckInterval(versionCheckInterval) + : VersionCheckInterval(versionCheckInterval) , SchemeCacheId(schemeBoardCacheId) , Generation(std::make_shared<TAtomicCounter>()) { } + void Bootstrap(const TActorContext& ctx) { - if (ClientWrapper == nullptr) { - auto* driver = AppData(ctx)->YdbDriver; - if (driver == nullptr) { - LOG_WARN_S( - ctx, NKikimrServices::PQ_METACACHE, - "Initialized without valid YDB driver - suppose misconfiguration. Refuse to work. Die." - ); - Die(ctx); - return; - } - ClientWrapper.reset(new TClientWrapper(driver)); + Become(&TPersQueueMetaCacheActor::StateFunc); + + if (!SchemeCacheId) { + SchemeCacheId = Register(CreateSchemeCache(ctx, Counters)); } + + if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) + return; + SkipVersionCheck = AppData(ctx)->PQConfig.GetMetaCacheSkipVersionCheck(); + PathPrefix = TopicPrefix(ctx); TopicsQuery = TStringBuilder() << "--!syntax_v1\n" << "DECLARE $Path as Utf8; DECLARE $Cluster as Utf8; " @@ -80,157 +74,117 @@ public: VersionQuery = TStringBuilder() << "--!syntax_v1\nSELECT version FROM `" << PathPrefix << "Config/V2/Versions` " << "WHERE name = 'Topics';"; PathPrefixParts = NKikimr::SplitPath(PathPrefix); - Become(&TPersQueueMetaCacheActor::StateFunc); - if (!SchemeCacheId) { - SchemeCacheId = Register(CreateSchemeCache(GetActorSystem(), Counters)); - } - ActorSystem = GetActorSystem(); Reset(ctx); } ~TPersQueueMetaCacheActor() { - Generation->Inc(); - if (ClientWrapper) - ClientWrapper->Stop(); } private: - template<class TEventType, class TFutureType, class... TArgs> - void SubscribeEvent(const TFutureType& future, TArgs... args) { - std::weak_ptr<TAtomicCounter> weakGeneration(Generation); - future.Subscribe( - [ - id = SelfId(), - originalGen = Generation->Val(), - weakGen = weakGeneration, - as = ActorSystem, - ... args = std::forward<TArgs>(args) - ](const auto&) mutable { - auto currentGen = weakGen.lock(); - if (currentGen && originalGen == currentGen->Val()) { - as->Send(id, new TEventType(args...)); - } - } - ); - } - void Reset(const TActorContext& ctx) { + void Reset(const TActorContext& ctx, bool error = true) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Metacache: reset"); - Y_VERIFY(ClientWrapper); - ClientWrapper->Reset(); Generation->Inc(); - YdbSession = Nothing(); - PreparedTopicsQuery = Nothing(); LastTopicKey = {}; - ctx.Schedule(QueryRetryInterval, new NActors::TEvents::TEvWakeup()); + Type = EQueryType::ECheckVersion; + ctx.Schedule(error ? QueryRetryInterval : VersionCheckInterval, new NActors::TEvents::TEvWakeup()); } - void StartSession(const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Start new session"); - SessionFuture = ClientWrapper->GetClient()->GetSession(); - SubscribeEvent<TEvPqNewMetaCache::TEvSessionStarted>(SessionFuture); - } + void RunQuery(EQueryType type, const TActorContext& ctx) { - void HandleSessionStarted(const TActorContext& ctx) { - auto& value = SessionFuture.GetValue(); - if (!value.IsSuccess()) { - LOG_ERROR_S(ctx, NKikimrServices::PQ_METACACHE, "Session start failed: " << value.GetIssues().ToString()); - return Reset(ctx); - } - LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Session started"); - YdbSession = value.GetSession(); - PrepareTopicsQuery(ctx); - } + auto req = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); - void PrepareTopicsQuery(const TActorContext& ctx) { - Y_UNUSED(ctx); - TopicsQueryFuture = YdbSession->PrepareDataQuery(TopicsQuery); - SubscribeEvent<TEvPqNewMetaCache::TEvQueryPrepared>(TopicsQueryFuture); - } + req->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + req->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML); + req->Record.MutableRequest()->SetKeepSession(false); + req->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig)); - void HandleTopicsQueryPrepared(const TActorContext& ctx) { - auto& value = TopicsQueryFuture.GetValue(); - if (!value.IsSuccess()) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Topics query prepare failed: " << value.GetIssues().ToString()); - return Reset(ctx); - } - PreparedTopicsQuery = value.GetQuery(); - if (NewTopicsVersion > CurrentTopicsVersion) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Start topics rescan"); - RunQuery(EQueryType::EGetTopics, ctx); - } else { - Y_VERIFY(NewTopicsVersion == CurrentTopicsVersion); - LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Check version"); - RunQuery(EQueryType::ECheckVersion, ctx); - } - } + req->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true); + req->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + req->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); + + Type = type; - void RunQuery(EQueryType type, const TActorContext& ctx) { - Y_UNUSED(ctx); - Y_VERIFY(YdbSession); if (type == EQueryType::ECheckVersion) { - AsyncQueryResult = YdbSession->ExecuteDataQuery(VersionQuery, TTxControl::BeginTx().CommitTx()); + req->Record.MutableRequest()->SetQuery(VersionQuery); } else { - Y_VERIFY(PreparedTopicsQuery); - auto builder = PreparedTopicsQuery->GetParamsBuilder(); - { - auto& param = builder.AddParam("$Path"); - param.Utf8(LastTopicKey.Path); - param.Build(); - } - { - auto& param = builder.AddParam("$Cluster"); - param.Utf8(LastTopicKey.Cluster); - param.Build(); - } - AsyncQueryResult = PreparedTopicsQuery->Execute(TTxControl::BeginTx().CommitTx(), builder.Build()); + req->Record.MutableRequest()->SetQuery(TopicsQuery); + NClient::TParameters params; + params["$Path"] = LastTopicKey.Path; + params["$Cluster"] = LastTopicKey.Cluster; + req->Record.MutableRequest()->MutableParameters()->Swap(¶ms); } - SubscribeEvent<TEvPqNewMetaCache::TEvQueryComplete>(AsyncQueryResult, type); + Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), req.Release(), 0, Generation->Val()); } - void HandleCheckVersionResult(const TActorContext& ctx) { - auto result = AsyncQueryResult.GetValue(); - if (!result.IsSuccess()) { + void HandleQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + if (ev->Cookie != (ui64)Generation->Val()) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "stale response with generation " << ev->Cookie << ", actual is " << Generation->Val()); + return; + } + const auto& record = ev->Get()->Record.GetRef(); + + if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) { LOG_ERROR_S(ctx, NKikimrServices::PQ_METACACHE, - "Got error trying to update config version: " << result.GetIssues().ToString()); - ctx.Schedule(QueryRetryInterval, new TEvPqNewMetaCache::TEvRestartQuery(EQueryType::ECheckVersion)); + "Got error trying to perform request: " << record); + Reset(ctx); return; } - Y_VERIFY(result.GetResultSets().size() == 1); - ui64 newVersion = 0; - { - auto versionQueryResult = result.GetResultSetParser(0); - while (versionQueryResult.TryNextRow()) { - newVersion = versionQueryResult.ColumnParser("version").GetOptionalInt64().GetOrElse(0); - } + + switch (Type) { + case EQueryType::ECheckVersion: + return HandleCheckVersionResult(ev, ctx); + case EQueryType::EGetTopics: + return HandleGetTopicsResult(ev, ctx); + default: + Y_FAIL(); } + } + + void HandleQueryResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->Record; + LOG_ERROR_S(ctx, NKikimrServices::PQ_METACACHE, "failed to list topics: " << record); + + Reset(ctx); + } + + void HandleCheckVersionResult(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + + const auto& record = ev->Get()->Record.GetRef(); + + Y_VERIFY(record.GetResponse().GetResults().size() == 1); + const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0); + ui64 newVersion = rr.ListSize() == 0 ? 0 : rr.GetList(0).GetStruct(0).GetOptional().GetInt64(); + + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Got config version: " << newVersion); + LastVersionUpdate = ctx.Now(); if (newVersion > CurrentTopicsVersion || CurrentTopicsVersion == 0 || SkipVersionCheck) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Got config version: " << newVersion); NewTopicsVersion = newVersion; RunQuery(EQueryType::EGetTopics, ctx); } else { - ctx.Schedule(VersionCheckInterval, new TEvPqNewMetaCache::TEvRestartQuery(EQueryType::ECheckVersion)); + Reset(ctx, false); } } - void HandleGetTopicsResult(const TActorContext& ctx) { - auto result = AsyncQueryResult.GetValue(); - if (!result.IsSuccess()) { - LOG_INFO_S(ctx, NKikimrServices::PQ_METACACHE, - "Got error trying to get topics: " << result.GetIssues().ToString()); - return Reset(ctx); - } - Y_VERIFY(result.GetResultSets().size() == 1); - auto versionQueryResult = result.GetResultSetParser(0); + void HandleGetTopicsResult(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + + const auto& record = ev->Get()->Record.GetRef(); + + Y_VERIFY(record.GetResponse().GetResults().size() == 1); + TString path, dc; - while (versionQueryResult.TryNextRow()) { - path = *versionQueryResult.ColumnParser("path").GetOptionalUtf8(); - dc = *versionQueryResult.ColumnParser("dc").GetOptionalUtf8(); + const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0); + for (const auto& row : rr.GetList()) { + + path = row.GetStruct(0).GetOptional().GetText(); + dc = row.GetStruct(1).GetOptional().GetText(); + NewTopics.emplace_back(NPersQueue::BuildFullTopicName(path, dc)); } - if (result.GetResultSet(0).Truncated()) { + if (rr.ListSize() > 0) { LastTopicKey = {path, dc}; return RunQuery(EQueryType::EGetTopics, ctx); } else { @@ -248,25 +202,10 @@ private: } LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Updated topics list with : " << CurrentTopics.size() << " topics"); - ctx.Schedule(VersionCheckInterval, new TEvPqNewMetaCache::TEvRestartQuery(EQueryType::ECheckVersion)); + Reset(ctx, false); } } - void HandleQueryComplete(TEvPqNewMetaCache::TEvQueryComplete::TPtr& ev, const TActorContext& ctx) { - switch (ev->Get()->Type) { - case EQueryType::ECheckVersion: - return HandleCheckVersionResult(ctx); - case EQueryType::EGetTopics: - return HandleGetTopicsResult(ctx); - default: - Y_FAIL(); - } - } - - void HandleRestartQuery(TEvPqNewMetaCache::TEvRestartQuery::TPtr& ev, const TActorContext& ctx) { - Y_VERIFY(ev->Get()->Type == EQueryType::ECheckVersion); - RunQuery(ev->Get()->Type, ctx); - } void HandleGetVersion(TEvPqNewMetaCache::TEvGetVersionRequest::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send version response: " << CurrentTopicsVersion); @@ -286,13 +225,15 @@ private: } void HandleDescribeAllTopics(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest::TPtr& ev, const TActorContext& ctx) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe all topics"); - if (ev->Get()->PathPrefix && ev->Get()->PathPrefix != PathPrefix) { + if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() || ev->Get()->PathPrefix && ev->Get()->PathPrefix != PathPrefix) { auto* response = new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse(ev->Get()->PathPrefix); response->Success = false; ctx.Send(ev->Sender, response); return; } + + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe all topics"); + if (!EverGotTopics) { ListTopicsWaiters.push(ev->Sender); return; @@ -398,27 +339,30 @@ private: } } - static NActors::TActorSystem* GetActorSystem() { - return TActivationContext::ActorSystem(); + void StartQuery(const TActorContext& ctx) { + if (NewTopicsVersion > CurrentTopicsVersion) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Start topics rescan"); + RunQuery(EQueryType::EGetTopics, ctx); + } else { + Y_VERIFY(NewTopicsVersion == CurrentTopicsVersion); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Check version rescan"); + RunQuery(EQueryType::ECheckVersion, ctx); + } } public: void Die(const TActorContext& ctx) { - Generation->Inc(); TBase::Die(ctx); } STRICT_STFUNC(StateFunc, - SFunc(NActors::TEvents::TEvWakeup, StartSession) - SFunc(TEvPqNewMetaCache::TEvSessionStarted, HandleSessionStarted) - SFunc(TEvPqNewMetaCache::TEvQueryPrepared, HandleTopicsQueryPrepared) - HFunc(TEvPqNewMetaCache::TEvQueryComplete, HandleQueryComplete) - HFunc(TEvPqNewMetaCache::TEvRestartQuery, HandleRestartQuery) - - HFunc(TEvPqNewMetaCache::TEvGetVersionRequest, HandleGetVersion) - HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics) - HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics) - HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse) + SFunc(NActors::TEvents::TEvWakeup, StartQuery) + HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleQueryResponse); + HFunc(NKqp::TEvKqp::TEvProcessResponse, HandleQueryResponse); + HFunc(TEvPqNewMetaCache::TEvGetVersionRequest, HandleGetVersion) + HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics) + HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics) + HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse) ) private: @@ -439,68 +383,14 @@ private: }; - class TClientWrapper { - public: - TClientWrapper(ui64 driverPort) - : DriverPort(driverPort) - {} - - TClientWrapper(NYdb::TDriver* driver) - : Driver(driver) - {} - - TClientWrapper(const TAtomicSharedPtr<TTableClient>& tableClient) - : TableClient(tableClient) - {} - - void Reset() { - if (DriverPort.Defined()) { // Own driver - if (DriverHolder == nullptr) { - TString endpoint = TStringBuilder() << "localhost:" << *DriverPort; - auto driverConfig = NYdb::TDriverConfig().SetEndpoint(endpoint); - DriverHolder.Reset(new NYdb::TDriver(driverConfig)); - Driver = DriverHolder.Get(); - TableClient.Reset(new NYdb::NTable::TTableClient(*Driver)); - } - } else if (Driver != nullptr) { - TableClient = MakeAtomicShared<NYdb::NTable::TTableClient>(*Driver); - } - } - NYdb::NTable::TTableClient* GetClient() { - Y_VERIFY(TableClient); - return TableClient.Get(); - } - void Stop() { - if (DriverHolder != nullptr) { - TableClient->Stop(); - DriverHolder->Stop(); - } else if (Driver != nullptr) { - TableClient->Stop(); - } - } - - private: - THolder<NYdb::TDriver> DriverHolder; - NYdb::TDriver* Driver = nullptr; - TAtomicSharedPtr<NYdb::NTable::TTableClient> TableClient; - TMaybe<ui64> DriverPort; - - }; - NMonitoring::TDynamicCounterPtr Counters; - NActors::TActorSystem* ActorSystem; TString VersionQuery; TString TopicsQuery; - std::unique_ptr<TClientWrapper> ClientWrapper; - TAsyncCreateSessionResult SessionFuture; - TMaybe<TSession> YdbSession; - TAsyncPrepareQueryResult TopicsQueryFuture; - TMaybe<TDataQuery> PreparedTopicsQuery; - TAsyncDataQueryResult AsyncQueryResult; ui64 CurrentTopicsVersion = 0; ui64 NewTopicsVersion = 0; TTopicKey LastTopicKey = TTopicKey{}; + EQueryType Type = EQueryType::ECheckVersion; TVector<TString> NewTopics; TVector<TString> CurrentTopics; bool EverGotTopics = false; @@ -526,15 +416,11 @@ IActor* CreatePQMetaCache(const NMonitoring::TDynamicCounterPtr& counters, const return new TPersQueueMetaCacheActor(counters, versionCheckInterval); } -IActor* CreatePQMetaCache(ui64 grpcPort, const NMonitoring::TDynamicCounterPtr& counters, const TDuration& versionCheckInterval) { - return new TPersQueueMetaCacheActor(grpcPort, counters, versionCheckInterval); -} - IActor* CreatePQMetaCache( - const TAtomicSharedPtr<NYdb::NTable::TTableClient>& tableClient, const NActors::TActorId& schemeBoardCacheId, + const NActors::TActorId& schemeBoardCacheId, const TDuration& versionCheckInterval ) { - return new TPersQueueMetaCacheActor(tableClient, schemeBoardCacheId, versionCheckInterval); + return new TPersQueueMetaCacheActor(schemeBoardCacheId, versionCheckInterval); } } // namespace NPqMetaCacheV2 diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.h b/ydb/core/client/server/msgbus_server_pq_metacache.h index a125e7b81ac..5c1d6d64713 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.h +++ b/ydb/core/client/server/msgbus_server_pq_metacache.h @@ -38,10 +38,6 @@ using TMetaCacheRequest = TVector<TTopicMetaRequest>; struct TEvPqNewMetaCache { enum EEv { EvWakeup = EventSpaceBegin(TKikimrEvents::ES_PQ_META_CACHE), - EvSessionStarted, - EvQueryPrepared, - EvQueryComplete, - EvRestartQuery, EvGetVersionRequest, EvGetVersionResponse, EvDescribeTopicsRequest, @@ -54,25 +50,6 @@ struct TEvPqNewMetaCache { static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_META_CACHE), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_META_CACHE)"); - struct TEvSessionStarted : public TEventLocal<TEvSessionStarted, EvSessionStarted> { - }; - - struct TEvQueryPrepared : public TEventLocal<TEvQueryPrepared, EvQueryPrepared> { - }; - - struct TEvQueryComplete : public TEventLocal<TEvQueryComplete, EvQueryComplete> { - EQueryType Type; - - explicit TEvQueryComplete(EQueryType type) - : Type(type) {} - }; - - struct TEvRestartQuery : public TEventLocal<TEvRestartQuery, EvRestartQuery> { - EQueryType Type; - - explicit TEvRestartQuery(EQueryType type) - : Type(type) {} - }; struct TEvGetVersionRequest : public TEventLocal<TEvGetVersionRequest, EvGetVersionRequest> { }; @@ -134,14 +111,9 @@ struct TEvPqNewMetaCache { }; }; IActor* CreatePQMetaCache(const NMonitoring::TDynamicCounterPtr& counters, - const TDuration& versionCheckInterval); - -IActor* CreatePQMetaCache(ui64 grpcPort, - const NMonitoring::TDynamicCounterPtr& counters, const TDuration& versionCheckInterval = TDuration::Seconds(1)); -IActor* CreatePQMetaCache(const TAtomicSharedPtr<NYdb::NTable::TTableClient>& tableClient, - const NActors::TActorId& schemeBoardCacheId, +IActor* CreatePQMetaCache(const NActors::TActorId& schemeBoardCacheId, const TDuration& versionCheckInterval = TDuration::Seconds(1)); diff --git a/ydb/core/client/server/pq_metacache_v2_ut.cpp b/ydb/core/client/server/pq_metacache_v2_ut.cpp index 39383a462b1..2b928f8fee5 100644 --- a/ydb/core/client/server/pq_metacache_v2_ut.cpp +++ b/ydb/core/client/server/pq_metacache_v2_ut.cpp @@ -65,7 +65,7 @@ class TPqMetaCacheV2Test: public TTestBase { tableDesc = TTableBuilder() .AddNullableColumn("name", EPrimitiveType::String) .AddNullableColumn("version", EPrimitiveType::Int64) - .SetPrimaryKeyColumns({"name", "version"}) + .SetPrimaryKeyColumns({"name"}) .Build(); CheckYdbResult(session.CreateTable("/Root/PQ/Config/V2/Versions", std::move(tableDesc))); @@ -79,7 +79,7 @@ class TPqMetaCacheV2Test: public TTestBase { } SchemeCacheId = runtime->Register(CreateSchemeBoardSchemeCache(config.Get())); MetaCacheId = runtime->Register( - NPqMetaCacheV2::CreatePQMetaCache(GrpcServerPort, config->Counters, TDuration::MilliSeconds(50)) + NPqMetaCacheV2::CreatePQMetaCache(config->Counters, TDuration::MilliSeconds(50)) ); runtime->EnableScheduleForActor(SchemeCacheId, true); runtime->EnableScheduleForActor(MetaCacheId, true); @@ -124,6 +124,7 @@ class TPqMetaCacheV2Test: public TTestBase { param.Build(); } CheckYdbResult(versionPrepared.Execute(txControl, builder.Build())); + Cerr << "TOPIC VERSION SHIFTED TO " << Version << "\n"; } CheckYdbResult(tx.Commit()); for (auto& status : createTopicStatus) { @@ -195,7 +196,7 @@ class TPqMetaCacheV2Test: public TTestBase { TActorId MakeEdgeTargetedMetaCache() { auto anotherMetaCacheId = Server->GetRuntime()->Register( - NPqMetaCacheV2::CreatePQMetaCache(TableClient, EdgeActorId) + NPqMetaCacheV2::CreatePQMetaCache(EdgeActorId) ); return anotherMetaCacheId; |