aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@yandex-team.ru>2022-03-14 20:16:27 +0300
committeralexnick <alexnick@yandex-team.ru>2022-03-14 20:16:27 +0300
commit37fa27eedb4ff54da776b751356b9b4cce2a6961 (patch)
treee52955d9be907f85e29a54edb639f3a6374232bd
parentb3566c4940538fab9ce3687168f46d228f25db07 (diff)
downloadydb-37fa27eedb4ff54da776b751356b9b4cce2a6961.tar.gz
use KQP instead of driver LOGBROKER-7358
ref:937549cf701e3fefdaecf3f99e9a64eccfdd77a4
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.cpp350
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.h30
-rw-r--r--ydb/core/client/server/pq_metacache_v2_ut.cpp7
3 files changed, 123 insertions, 264 deletions
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
index 39f253e75a7..45211485f5d 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
@@ -2,13 +2,17 @@
#include "msgbus_server_pq_metacache.h"
#include <ydb/public/api/protos/draft/persqueue_error_codes.pb.h>
+#include <ydb/public/lib/deprecated/kicli/kicli.h>
+
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/scheme_board/cache.h>
#include <ydb/core/base/counters.h>
-
+#include <ydb/core/kqp/kqp.h>
#include <ydb/core/base/appdata.h>
+#include <ydb/core/persqueue/pq_database.h>
+
namespace NKikimr::NMsgBusProxy {
@@ -16,8 +20,10 @@ using namespace NYdb::NTable;
namespace NPqMetaCacheV2 {
-IActor* CreateSchemeCache(NActors::TActorSystem* ActorSystem, TIntrusivePtr<NMonitoring::TDynamicCounters> counters) {
- auto appData = ActorSystem->AppData<TAppData>();
+
+
+IActor* CreateSchemeCache(const TActorContext& ctx, TIntrusivePtr<NMonitoring::TDynamicCounters> counters) {
+ auto appData = AppData(ctx);
auto cacheCounters = GetServiceCounters(counters, "pqproxy|schemecache");
auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters);
return CreateSchemeBoardSchemeCache(cacheConfig.Get());
@@ -29,48 +35,36 @@ public:
TPersQueueMetaCacheActor(TPersQueueMetaCacheActor&&) = default;
TPersQueueMetaCacheActor& operator=(TPersQueueMetaCacheActor&&) = default;
- TPersQueueMetaCacheActor(ui64 grpcPort,
- const NMonitoring::TDynamicCounterPtr& counters,
- const TDuration& versionCheckInterval)
- : Counters(counters)
- , ClientWrapper(std::make_unique<TClientWrapper>(grpcPort))
- , VersionCheckInterval(versionCheckInterval)
- , Generation(std::make_shared<TAtomicCounter>())
- {
- }
-
TPersQueueMetaCacheActor(const NMonitoring::TDynamicCounterPtr& counters,
const TDuration& versionCheckInterval)
: Counters(counters)
, VersionCheckInterval(versionCheckInterval)
- , Generation(std::make_shared<TAtomicCounter>())
+ , Generation(std::make_shared<TAtomicCounter>(100))
{
}
TPersQueueMetaCacheActor(
- const TAtomicSharedPtr<NYdb::NTable::TTableClient>& tableClient, const NActors::TActorId& schemeBoardCacheId,
+ const NActors::TActorId& schemeBoardCacheId,
const TDuration& versionCheckInterval
)
- : ClientWrapper(std::make_unique<TClientWrapper>(tableClient))
- , VersionCheckInterval(versionCheckInterval)
+ : VersionCheckInterval(versionCheckInterval)
, SchemeCacheId(schemeBoardCacheId)
, Generation(std::make_shared<TAtomicCounter>())
{
}
+
void Bootstrap(const TActorContext& ctx) {
- if (ClientWrapper == nullptr) {
- auto* driver = AppData(ctx)->YdbDriver;
- if (driver == nullptr) {
- LOG_WARN_S(
- ctx, NKikimrServices::PQ_METACACHE,
- "Initialized without valid YDB driver - suppose misconfiguration. Refuse to work. Die."
- );
- Die(ctx);
- return;
- }
- ClientWrapper.reset(new TClientWrapper(driver));
+ Become(&TPersQueueMetaCacheActor::StateFunc);
+
+ if (!SchemeCacheId) {
+ SchemeCacheId = Register(CreateSchemeCache(ctx, Counters));
}
+
+ if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen())
+ return;
+
SkipVersionCheck = AppData(ctx)->PQConfig.GetMetaCacheSkipVersionCheck();
+
PathPrefix = TopicPrefix(ctx);
TopicsQuery = TStringBuilder() << "--!syntax_v1\n"
<< "DECLARE $Path as Utf8; DECLARE $Cluster as Utf8; "
@@ -80,157 +74,117 @@ public:
VersionQuery = TStringBuilder() << "--!syntax_v1\nSELECT version FROM `" << PathPrefix << "Config/V2/Versions` "
<< "WHERE name = 'Topics';";
PathPrefixParts = NKikimr::SplitPath(PathPrefix);
- Become(&TPersQueueMetaCacheActor::StateFunc);
- if (!SchemeCacheId) {
- SchemeCacheId = Register(CreateSchemeCache(GetActorSystem(), Counters));
- }
- ActorSystem = GetActorSystem();
Reset(ctx);
}
~TPersQueueMetaCacheActor() {
- Generation->Inc();
- if (ClientWrapper)
- ClientWrapper->Stop();
}
private:
- template<class TEventType, class TFutureType, class... TArgs>
- void SubscribeEvent(const TFutureType& future, TArgs... args) {
- std::weak_ptr<TAtomicCounter> weakGeneration(Generation);
- future.Subscribe(
- [
- id = SelfId(),
- originalGen = Generation->Val(),
- weakGen = weakGeneration,
- as = ActorSystem,
- ... args = std::forward<TArgs>(args)
- ](const auto&) mutable {
- auto currentGen = weakGen.lock();
- if (currentGen && originalGen == currentGen->Val()) {
- as->Send(id, new TEventType(args...));
- }
- }
- );
- }
- void Reset(const TActorContext& ctx) {
+ void Reset(const TActorContext& ctx, bool error = true) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Metacache: reset");
- Y_VERIFY(ClientWrapper);
- ClientWrapper->Reset();
Generation->Inc();
- YdbSession = Nothing();
- PreparedTopicsQuery = Nothing();
LastTopicKey = {};
- ctx.Schedule(QueryRetryInterval, new NActors::TEvents::TEvWakeup());
+ Type = EQueryType::ECheckVersion;
+ ctx.Schedule(error ? QueryRetryInterval : VersionCheckInterval, new NActors::TEvents::TEvWakeup());
}
- void StartSession(const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Start new session");
- SessionFuture = ClientWrapper->GetClient()->GetSession();
- SubscribeEvent<TEvPqNewMetaCache::TEvSessionStarted>(SessionFuture);
- }
+ void RunQuery(EQueryType type, const TActorContext& ctx) {
- void HandleSessionStarted(const TActorContext& ctx) {
- auto& value = SessionFuture.GetValue();
- if (!value.IsSuccess()) {
- LOG_ERROR_S(ctx, NKikimrServices::PQ_METACACHE, "Session start failed: " << value.GetIssues().ToString());
- return Reset(ctx);
- }
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Session started");
- YdbSession = value.GetSession();
- PrepareTopicsQuery(ctx);
- }
+ auto req = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
- void PrepareTopicsQuery(const TActorContext& ctx) {
- Y_UNUSED(ctx);
- TopicsQueryFuture = YdbSession->PrepareDataQuery(TopicsQuery);
- SubscribeEvent<TEvPqNewMetaCache::TEvQueryPrepared>(TopicsQueryFuture);
- }
+ req->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ req->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_DML);
+ req->Record.MutableRequest()->SetKeepSession(false);
+ req->Record.MutableRequest()->SetDatabase(NKikimr::NPQ::GetDatabaseFromConfig(AppData(ctx)->PQConfig));
- void HandleTopicsQueryPrepared(const TActorContext& ctx) {
- auto& value = TopicsQueryFuture.GetValue();
- if (!value.IsSuccess()) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Topics query prepare failed: " << value.GetIssues().ToString());
- return Reset(ctx);
- }
- PreparedTopicsQuery = value.GetQuery();
- if (NewTopicsVersion > CurrentTopicsVersion) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Start topics rescan");
- RunQuery(EQueryType::EGetTopics, ctx);
- } else {
- Y_VERIFY(NewTopicsVersion == CurrentTopicsVersion);
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Check version");
- RunQuery(EQueryType::ECheckVersion, ctx);
- }
- }
+ req->Record.MutableRequest()->MutableQueryCachePolicy()->set_keep_in_cache(true);
+ req->Record.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ req->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
+
+ Type = type;
- void RunQuery(EQueryType type, const TActorContext& ctx) {
- Y_UNUSED(ctx);
- Y_VERIFY(YdbSession);
if (type == EQueryType::ECheckVersion) {
- AsyncQueryResult = YdbSession->ExecuteDataQuery(VersionQuery, TTxControl::BeginTx().CommitTx());
+ req->Record.MutableRequest()->SetQuery(VersionQuery);
} else {
- Y_VERIFY(PreparedTopicsQuery);
- auto builder = PreparedTopicsQuery->GetParamsBuilder();
- {
- auto& param = builder.AddParam("$Path");
- param.Utf8(LastTopicKey.Path);
- param.Build();
- }
- {
- auto& param = builder.AddParam("$Cluster");
- param.Utf8(LastTopicKey.Cluster);
- param.Build();
- }
- AsyncQueryResult = PreparedTopicsQuery->Execute(TTxControl::BeginTx().CommitTx(), builder.Build());
+ req->Record.MutableRequest()->SetQuery(TopicsQuery);
+ NClient::TParameters params;
+ params["$Path"] = LastTopicKey.Path;
+ params["$Cluster"] = LastTopicKey.Cluster;
+ req->Record.MutableRequest()->MutableParameters()->Swap(&params);
}
- SubscribeEvent<TEvPqNewMetaCache::TEvQueryComplete>(AsyncQueryResult, type);
+ Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), req.Release(), 0, Generation->Val());
}
- void HandleCheckVersionResult(const TActorContext& ctx) {
- auto result = AsyncQueryResult.GetValue();
- if (!result.IsSuccess()) {
+ void HandleQueryResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ if (ev->Cookie != (ui64)Generation->Val()) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "stale response with generation " << ev->Cookie << ", actual is " << Generation->Val());
+ return;
+ }
+ const auto& record = ev->Get()->Record.GetRef();
+
+ if (record.GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
LOG_ERROR_S(ctx, NKikimrServices::PQ_METACACHE,
- "Got error trying to update config version: " << result.GetIssues().ToString());
- ctx.Schedule(QueryRetryInterval, new TEvPqNewMetaCache::TEvRestartQuery(EQueryType::ECheckVersion));
+ "Got error trying to perform request: " << record);
+ Reset(ctx);
return;
}
- Y_VERIFY(result.GetResultSets().size() == 1);
- ui64 newVersion = 0;
- {
- auto versionQueryResult = result.GetResultSetParser(0);
- while (versionQueryResult.TryNextRow()) {
- newVersion = versionQueryResult.ColumnParser("version").GetOptionalInt64().GetOrElse(0);
- }
+
+ switch (Type) {
+ case EQueryType::ECheckVersion:
+ return HandleCheckVersionResult(ev, ctx);
+ case EQueryType::EGetTopics:
+ return HandleGetTopicsResult(ev, ctx);
+ default:
+ Y_FAIL();
}
+ }
+
+ void HandleQueryResponse(NKqp::TEvKqp::TEvProcessResponse::TPtr& ev, const TActorContext& ctx) {
+ const auto& record = ev->Get()->Record;
+ LOG_ERROR_S(ctx, NKikimrServices::PQ_METACACHE, "failed to list topics: " << record);
+
+ Reset(ctx);
+ }
+
+ void HandleCheckVersionResult(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+
+ const auto& record = ev->Get()->Record.GetRef();
+
+ Y_VERIFY(record.GetResponse().GetResults().size() == 1);
+ const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
+ ui64 newVersion = rr.ListSize() == 0 ? 0 : rr.GetList(0).GetStruct(0).GetOptional().GetInt64();
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Got config version: " << newVersion);
+
LastVersionUpdate = ctx.Now();
if (newVersion > CurrentTopicsVersion || CurrentTopicsVersion == 0 || SkipVersionCheck) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Got config version: " << newVersion);
NewTopicsVersion = newVersion;
RunQuery(EQueryType::EGetTopics, ctx);
} else {
- ctx.Schedule(VersionCheckInterval, new TEvPqNewMetaCache::TEvRestartQuery(EQueryType::ECheckVersion));
+ Reset(ctx, false);
}
}
- void HandleGetTopicsResult(const TActorContext& ctx) {
- auto result = AsyncQueryResult.GetValue();
- if (!result.IsSuccess()) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_METACACHE,
- "Got error trying to get topics: " << result.GetIssues().ToString());
- return Reset(ctx);
- }
- Y_VERIFY(result.GetResultSets().size() == 1);
- auto versionQueryResult = result.GetResultSetParser(0);
+ void HandleGetTopicsResult(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+
+ const auto& record = ev->Get()->Record.GetRef();
+
+ Y_VERIFY(record.GetResponse().GetResults().size() == 1);
+
TString path, dc;
- while (versionQueryResult.TryNextRow()) {
- path = *versionQueryResult.ColumnParser("path").GetOptionalUtf8();
- dc = *versionQueryResult.ColumnParser("dc").GetOptionalUtf8();
+ const auto& rr = record.GetResponse().GetResults(0).GetValue().GetStruct(0);
+ for (const auto& row : rr.GetList()) {
+
+ path = row.GetStruct(0).GetOptional().GetText();
+ dc = row.GetStruct(1).GetOptional().GetText();
+
NewTopics.emplace_back(NPersQueue::BuildFullTopicName(path, dc));
}
- if (result.GetResultSet(0).Truncated()) {
+ if (rr.ListSize() > 0) {
LastTopicKey = {path, dc};
return RunQuery(EQueryType::EGetTopics, ctx);
} else {
@@ -248,25 +202,10 @@ private:
}
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE,
"Updated topics list with : " << CurrentTopics.size() << " topics");
- ctx.Schedule(VersionCheckInterval, new TEvPqNewMetaCache::TEvRestartQuery(EQueryType::ECheckVersion));
+ Reset(ctx, false);
}
}
- void HandleQueryComplete(TEvPqNewMetaCache::TEvQueryComplete::TPtr& ev, const TActorContext& ctx) {
- switch (ev->Get()->Type) {
- case EQueryType::ECheckVersion:
- return HandleCheckVersionResult(ctx);
- case EQueryType::EGetTopics:
- return HandleGetTopicsResult(ctx);
- default:
- Y_FAIL();
- }
- }
-
- void HandleRestartQuery(TEvPqNewMetaCache::TEvRestartQuery::TPtr& ev, const TActorContext& ctx) {
- Y_VERIFY(ev->Get()->Type == EQueryType::ECheckVersion);
- RunQuery(ev->Get()->Type, ctx);
- }
void HandleGetVersion(TEvPqNewMetaCache::TEvGetVersionRequest::TPtr& ev, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send version response: " << CurrentTopicsVersion);
@@ -286,13 +225,15 @@ private:
}
void HandleDescribeAllTopics(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe all topics");
- if (ev->Get()->PathPrefix && ev->Get()->PathPrefix != PathPrefix) {
+ if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() || ev->Get()->PathPrefix && ev->Get()->PathPrefix != PathPrefix) {
auto* response = new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse(ev->Get()->PathPrefix);
response->Success = false;
ctx.Send(ev->Sender, response);
return;
}
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe all topics");
+
if (!EverGotTopics) {
ListTopicsWaiters.push(ev->Sender);
return;
@@ -398,27 +339,30 @@ private:
}
}
- static NActors::TActorSystem* GetActorSystem() {
- return TActivationContext::ActorSystem();
+ void StartQuery(const TActorContext& ctx) {
+ if (NewTopicsVersion > CurrentTopicsVersion) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Start topics rescan");
+ RunQuery(EQueryType::EGetTopics, ctx);
+ } else {
+ Y_VERIFY(NewTopicsVersion == CurrentTopicsVersion);
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Check version rescan");
+ RunQuery(EQueryType::ECheckVersion, ctx);
+ }
}
public:
void Die(const TActorContext& ctx) {
- Generation->Inc();
TBase::Die(ctx);
}
STRICT_STFUNC(StateFunc,
- SFunc(NActors::TEvents::TEvWakeup, StartSession)
- SFunc(TEvPqNewMetaCache::TEvSessionStarted, HandleSessionStarted)
- SFunc(TEvPqNewMetaCache::TEvQueryPrepared, HandleTopicsQueryPrepared)
- HFunc(TEvPqNewMetaCache::TEvQueryComplete, HandleQueryComplete)
- HFunc(TEvPqNewMetaCache::TEvRestartQuery, HandleRestartQuery)
-
- HFunc(TEvPqNewMetaCache::TEvGetVersionRequest, HandleGetVersion)
- HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics)
- HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics)
- HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse)
+ SFunc(NActors::TEvents::TEvWakeup, StartQuery)
+ HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleQueryResponse);
+ HFunc(NKqp::TEvKqp::TEvProcessResponse, HandleQueryResponse);
+ HFunc(TEvPqNewMetaCache::TEvGetVersionRequest, HandleGetVersion)
+ HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics)
+ HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics)
+ HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse)
)
private:
@@ -439,68 +383,14 @@ private:
};
- class TClientWrapper {
- public:
- TClientWrapper(ui64 driverPort)
- : DriverPort(driverPort)
- {}
-
- TClientWrapper(NYdb::TDriver* driver)
- : Driver(driver)
- {}
-
- TClientWrapper(const TAtomicSharedPtr<TTableClient>& tableClient)
- : TableClient(tableClient)
- {}
-
- void Reset() {
- if (DriverPort.Defined()) { // Own driver
- if (DriverHolder == nullptr) {
- TString endpoint = TStringBuilder() << "localhost:" << *DriverPort;
- auto driverConfig = NYdb::TDriverConfig().SetEndpoint(endpoint);
- DriverHolder.Reset(new NYdb::TDriver(driverConfig));
- Driver = DriverHolder.Get();
- TableClient.Reset(new NYdb::NTable::TTableClient(*Driver));
- }
- } else if (Driver != nullptr) {
- TableClient = MakeAtomicShared<NYdb::NTable::TTableClient>(*Driver);
- }
- }
- NYdb::NTable::TTableClient* GetClient() {
- Y_VERIFY(TableClient);
- return TableClient.Get();
- }
- void Stop() {
- if (DriverHolder != nullptr) {
- TableClient->Stop();
- DriverHolder->Stop();
- } else if (Driver != nullptr) {
- TableClient->Stop();
- }
- }
-
- private:
- THolder<NYdb::TDriver> DriverHolder;
- NYdb::TDriver* Driver = nullptr;
- TAtomicSharedPtr<NYdb::NTable::TTableClient> TableClient;
- TMaybe<ui64> DriverPort;
-
- };
-
NMonitoring::TDynamicCounterPtr Counters;
- NActors::TActorSystem* ActorSystem;
TString VersionQuery;
TString TopicsQuery;
- std::unique_ptr<TClientWrapper> ClientWrapper;
- TAsyncCreateSessionResult SessionFuture;
- TMaybe<TSession> YdbSession;
- TAsyncPrepareQueryResult TopicsQueryFuture;
- TMaybe<TDataQuery> PreparedTopicsQuery;
- TAsyncDataQueryResult AsyncQueryResult;
ui64 CurrentTopicsVersion = 0;
ui64 NewTopicsVersion = 0;
TTopicKey LastTopicKey = TTopicKey{};
+ EQueryType Type = EQueryType::ECheckVersion;
TVector<TString> NewTopics;
TVector<TString> CurrentTopics;
bool EverGotTopics = false;
@@ -526,15 +416,11 @@ IActor* CreatePQMetaCache(const NMonitoring::TDynamicCounterPtr& counters, const
return new TPersQueueMetaCacheActor(counters, versionCheckInterval);
}
-IActor* CreatePQMetaCache(ui64 grpcPort, const NMonitoring::TDynamicCounterPtr& counters, const TDuration& versionCheckInterval) {
- return new TPersQueueMetaCacheActor(grpcPort, counters, versionCheckInterval);
-}
-
IActor* CreatePQMetaCache(
- const TAtomicSharedPtr<NYdb::NTable::TTableClient>& tableClient, const NActors::TActorId& schemeBoardCacheId,
+ const NActors::TActorId& schemeBoardCacheId,
const TDuration& versionCheckInterval
) {
- return new TPersQueueMetaCacheActor(tableClient, schemeBoardCacheId, versionCheckInterval);
+ return new TPersQueueMetaCacheActor(schemeBoardCacheId, versionCheckInterval);
}
} // namespace NPqMetaCacheV2
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.h b/ydb/core/client/server/msgbus_server_pq_metacache.h
index a125e7b81ac..5c1d6d64713 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.h
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.h
@@ -38,10 +38,6 @@ using TMetaCacheRequest = TVector<TTopicMetaRequest>;
struct TEvPqNewMetaCache {
enum EEv {
EvWakeup = EventSpaceBegin(TKikimrEvents::ES_PQ_META_CACHE),
- EvSessionStarted,
- EvQueryPrepared,
- EvQueryComplete,
- EvRestartQuery,
EvGetVersionRequest,
EvGetVersionResponse,
EvDescribeTopicsRequest,
@@ -54,25 +50,6 @@ struct TEvPqNewMetaCache {
static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_META_CACHE),
"expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_PQ_META_CACHE)");
- struct TEvSessionStarted : public TEventLocal<TEvSessionStarted, EvSessionStarted> {
- };
-
- struct TEvQueryPrepared : public TEventLocal<TEvQueryPrepared, EvQueryPrepared> {
- };
-
- struct TEvQueryComplete : public TEventLocal<TEvQueryComplete, EvQueryComplete> {
- EQueryType Type;
-
- explicit TEvQueryComplete(EQueryType type)
- : Type(type) {}
- };
-
- struct TEvRestartQuery : public TEventLocal<TEvRestartQuery, EvRestartQuery> {
- EQueryType Type;
-
- explicit TEvRestartQuery(EQueryType type)
- : Type(type) {}
- };
struct TEvGetVersionRequest : public TEventLocal<TEvGetVersionRequest, EvGetVersionRequest> {
};
@@ -134,14 +111,9 @@ struct TEvPqNewMetaCache {
};
};
IActor* CreatePQMetaCache(const NMonitoring::TDynamicCounterPtr& counters,
- const TDuration& versionCheckInterval);
-
-IActor* CreatePQMetaCache(ui64 grpcPort,
- const NMonitoring::TDynamicCounterPtr& counters,
const TDuration& versionCheckInterval = TDuration::Seconds(1));
-IActor* CreatePQMetaCache(const TAtomicSharedPtr<NYdb::NTable::TTableClient>& tableClient,
- const NActors::TActorId& schemeBoardCacheId,
+IActor* CreatePQMetaCache(const NActors::TActorId& schemeBoardCacheId,
const TDuration& versionCheckInterval = TDuration::Seconds(1));
diff --git a/ydb/core/client/server/pq_metacache_v2_ut.cpp b/ydb/core/client/server/pq_metacache_v2_ut.cpp
index 39383a462b1..2b928f8fee5 100644
--- a/ydb/core/client/server/pq_metacache_v2_ut.cpp
+++ b/ydb/core/client/server/pq_metacache_v2_ut.cpp
@@ -65,7 +65,7 @@ class TPqMetaCacheV2Test: public TTestBase {
tableDesc = TTableBuilder()
.AddNullableColumn("name", EPrimitiveType::String)
.AddNullableColumn("version", EPrimitiveType::Int64)
- .SetPrimaryKeyColumns({"name", "version"})
+ .SetPrimaryKeyColumns({"name"})
.Build();
CheckYdbResult(session.CreateTable("/Root/PQ/Config/V2/Versions", std::move(tableDesc)));
@@ -79,7 +79,7 @@ class TPqMetaCacheV2Test: public TTestBase {
}
SchemeCacheId = runtime->Register(CreateSchemeBoardSchemeCache(config.Get()));
MetaCacheId = runtime->Register(
- NPqMetaCacheV2::CreatePQMetaCache(GrpcServerPort, config->Counters, TDuration::MilliSeconds(50))
+ NPqMetaCacheV2::CreatePQMetaCache(config->Counters, TDuration::MilliSeconds(50))
);
runtime->EnableScheduleForActor(SchemeCacheId, true);
runtime->EnableScheduleForActor(MetaCacheId, true);
@@ -124,6 +124,7 @@ class TPqMetaCacheV2Test: public TTestBase {
param.Build();
}
CheckYdbResult(versionPrepared.Execute(txControl, builder.Build()));
+ Cerr << "TOPIC VERSION SHIFTED TO " << Version << "\n";
}
CheckYdbResult(tx.Commit());
for (auto& status : createTopicStatus) {
@@ -195,7 +196,7 @@ class TPqMetaCacheV2Test: public TTestBase {
TActorId MakeEdgeTargetedMetaCache() {
auto anotherMetaCacheId = Server->GetRuntime()->Register(
- NPqMetaCacheV2::CreatePQMetaCache(TableClient, EdgeActorId)
+ NPqMetaCacheV2::CreatePQMetaCache(EdgeActorId)
);
return anotherMetaCacheId;