aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@yandex-team.ru>2022-03-14 18:58:57 +0300
committerkomels <komels@yandex-team.ru>2022-03-14 18:58:57 +0300
commiteed629ecd3803fa1423a5b83195610fd4703f27a (patch)
tree26e25ee8d350b55b5a2d22f50a6f53b46e69263c
parent61e7c7a08a105da2e4c779cd6a9ed2fb0f0ca1ad (diff)
downloadydb-eed629ecd3803fa1423a5b83195610fd4703f27a.tar.gz
Refactor metacache in trunk and enable tests
ref:64500761c99ed6016f451306eb2bd6b719501902
-rw-r--r--ydb/core/client/server/msgbus_server_persqueue.cpp4
-rw-r--r--ydb/core/client/server/msgbus_server_persqueue.h2
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.cpp192
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.h24
-rw-r--r--ydb/core/client/server/pq_metacache_v2_ut.cpp157
-rw-r--r--ydb/core/client/server/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/client/server/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/core/client/server/ut/ya.make1
-rw-r--r--ydb/core/testlib/mock_pq_metacache.h2
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_write_actor.cpp2
10 files changed, 243 insertions, 143 deletions
diff --git a/ydb/core/client/server/msgbus_server_persqueue.cpp b/ydb/core/client/server/msgbus_server_persqueue.cpp
index d14fd75ebef..347471e787b 100644
--- a/ydb/core/client/server/msgbus_server_persqueue.cpp
+++ b/ydb/core/client/server/msgbus_server_persqueue.cpp
@@ -33,7 +33,7 @@ const TString& TopicPrefix(const TActorContext& ctx) {
}
TProcessingResult ProcessMetaCacheAllTopicsResponse(TEvPqNewMetaCache::TEvDescribeAllTopicsResponse::TPtr& ev) {
- auto *res = ev->Get()->Result.Get();
+ auto& res = ev->Get()->Result;
const TString& path = ev->Get()->Path;
TProcessingResult result;
if (!ev->Get()->Success) {
@@ -874,7 +874,7 @@ public:
void Handle(TEvAllTopicsDescribeResponse::TPtr& ev, const TActorContext& ctx) {
--DescribeRequests;
- auto* res = ev->Get()->Result.Get();
+ auto& res = ev->Get()->Result;
auto processResult = ProcessMetaCacheAllTopicsResponse(ev);
if (processResult.IsFatal) {
ErrorReason = processResult.Reason;
diff --git a/ydb/core/client/server/msgbus_server_persqueue.h b/ydb/core/client/server/msgbus_server_persqueue.h
index 52c67832e4e..1a915080e0c 100644
--- a/ydb/core/client/server/msgbus_server_persqueue.h
+++ b/ydb/core/client/server/msgbus_server_persqueue.h
@@ -145,7 +145,7 @@ protected:
const TActorId PqMetaCache;
THashMap<TActorId, THolder<TPerTopicInfo>> Children;
size_t ChildrenAnswered = 0;
- THolder<NSchemeCache::TSchemeCacheNavigate> SchemeCacheResponse;
+ std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> SchemeCacheResponse;
// Nodes info
const bool ListNodes;
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
index 5fb9cf55e13..39f253e75a7 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
@@ -33,7 +33,7 @@ public:
const NMonitoring::TDynamicCounterPtr& counters,
const TDuration& versionCheckInterval)
: Counters(counters)
- , ClientWrapper(std::move(std::make_unique<TClientWrapper>(grpcPort)))
+ , ClientWrapper(std::make_unique<TClientWrapper>(grpcPort))
, VersionCheckInterval(versionCheckInterval)
, Generation(std::make_shared<TAtomicCounter>())
{
@@ -47,6 +47,16 @@ public:
{
}
+ TPersQueueMetaCacheActor(
+ const TAtomicSharedPtr<NYdb::NTable::TTableClient>& tableClient, const NActors::TActorId& schemeBoardCacheId,
+ const TDuration& versionCheckInterval
+ )
+ : ClientWrapper(std::make_unique<TClientWrapper>(tableClient))
+ , VersionCheckInterval(versionCheckInterval)
+ , SchemeCacheId(schemeBoardCacheId)
+ , Generation(std::make_shared<TAtomicCounter>())
+ {
+ }
void Bootstrap(const TActorContext& ctx) {
if (ClientWrapper == nullptr) {
auto* driver = AppData(ctx)->YdbDriver;
@@ -60,7 +70,7 @@ public:
}
ClientWrapper.reset(new TClientWrapper(driver));
}
- SkipVersionCheck = !AppData(ctx)->PQConfig.GetMetaCacheSkipVersionCheck();
+ SkipVersionCheck = AppData(ctx)->PQConfig.GetMetaCacheSkipVersionCheck();
PathPrefix = TopicPrefix(ctx);
TopicsQuery = TStringBuilder() << "--!syntax_v1\n"
<< "DECLARE $Path as Utf8; DECLARE $Cluster as Utf8; "
@@ -208,8 +218,8 @@ private:
void HandleGetTopicsResult(const TActorContext& ctx) {
auto result = AsyncQueryResult.GetValue();
if (!result.IsSuccess()) {
- LOG_ERROR_S(ctx, NKikimrServices::PQ_METACACHE,
- "Got error trying to get topics: " << result.GetIssues().ToString());
+ LOG_INFO_S(ctx, NKikimrServices::PQ_METACACHE,
+ "Got error trying to get topics: " << result.GetIssues().ToString());
return Reset(ctx);
}
Y_VERIFY(result.GetResultSets().size() == 1);
@@ -233,18 +243,8 @@ private:
FullTopicsCache = nullptr;
while (!ListTopicsWaiters.empty()) {
auto& waiter = ListTopicsWaiters.front();
- switch (waiter.Type) {
- case EWaiterType::ListTopics:
- SendListTopicsResponse(waiter.WaiterId, ctx);
- break;
- case EWaiterType::DescribeAllTopics:
- ProcessDescribeAllTopics(waiter.WaiterId, ctx);
- break;
- default:
- Y_FAIL();
- break;
- }
- ListTopicsWaiters.pop_front();
+ ProcessDescribeAllTopics(waiter, ctx);
+ ListTopicsWaiters.pop();
}
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE,
"Updated topics list with : " << CurrentTopics.size() << " topics");
@@ -268,26 +268,25 @@ private:
RunQuery(ev->Get()->Type, ctx);
}
- void HandleListTopics(TEvPqNewMetaCache::TEvListTopicsRequest::TPtr& ev, const TActorContext& ctx) {
- if (!LastVersionUpdate || !EverGotTopics) {
- ListTopicsWaiters.emplace_back(TWaiter{ev->Sender, {}, EWaiterType::ListTopics});
- } else {
- SendListTopicsResponse(ev->Sender, ctx);
- }
+ void HandleGetVersion(TEvPqNewMetaCache::TEvGetVersionRequest::TPtr& ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send version response: " << CurrentTopicsVersion);
+
+ ctx.Send(ev->Sender, new TEvPqNewMetaCache::TEvGetVersionResponse{CurrentTopicsVersion});
}
void HandleDescribeTopics(TEvPqNewMetaCache::TEvDescribeTopicsRequest::TPtr& ev, const TActorContext& ctx) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe topics with prefix: " << ev->Get()->PathPrefix);
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe topics");
- SendSchemeCacheRequest(ev->Get()->Topics, !ev->Get()->PathPrefix.empty(), ctx);
+ SendSchemeCacheRequest(ev->Get()->Topics, !ev->Get()->PathPrefix.empty(), false, ctx);
auto inserted = DescribeTopicsWaiters.insert(std::make_pair(
RequestId,
- TWaiter{ev->Sender, std::move(ev->Get()->Topics), EWaiterType::DescribeCustomTopics}
+ TWaiter{ev->Sender, std::move(ev->Get()->Topics)}
)).second;
Y_VERIFY(inserted);
}
void HandleDescribeAllTopics(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest::TPtr& ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe all topics");
if (ev->Get()->PathPrefix && ev->Get()->PathPrefix != PathPrefix) {
auto* response = new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse(ev->Get()->PathPrefix);
response->Success = false;
@@ -295,40 +294,34 @@ private:
return;
}
if (!EverGotTopics) {
- ListTopicsWaiters.emplace_back(TWaiter{ev->Sender, {}, EWaiterType::DescribeAllTopics});
+ ListTopicsWaiters.push(ev->Sender);
return;
}
return ProcessDescribeAllTopics(ev->Sender, ctx);
}
- void ProcessDescribeAllTopics(const TActorId& waiterId, const TActorContext& ctx) {
+ void ProcessDescribeAllTopics(const TActorId& waiter, const TActorContext& ctx) {
if (EverGotTopics && CurrentTopics.empty()) {
- SendDescribeAllTopicsResponse(waiterId, ctx, true);
+ SendDescribeAllTopicsResponse(waiter, ctx, true);
return;
}
if (FullTopicsCache && !FullTopicsCacheOutdated) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Respond from cache");
-
- return SendDescribeAllTopicsResponse(waiterId, ctx);
+ return SendDescribeAllTopicsResponse(waiter, ctx);
+ }
+ if (DescribeAllTopicsWaiters.empty()) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Make full list SC request");
+ SendSchemeCacheRequest(CurrentTopics, true, true, ctx);
}
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Store waiter");
- SendSchemeCacheRequest(CurrentTopics, true, ctx);
- auto inserted = DescribeTopicsWaiters.insert(std::make_pair(
- RequestId,
- TWaiter{waiterId, {}, EWaiterType::DescribeAllTopics}
- )).second;
- Y_VERIFY(inserted);
+ DescribeAllTopicsWaiters.push(waiter);
FullTopicsCacheOutdated = false;
FullTopicsCache = nullptr;
}
void HandleSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
- auto* result = ev->Get()->Request.Release();
- auto waiterIter = DescribeTopicsWaiters.find(result->Instant);
- Y_VERIFY(!waiterIter.IsEnd());
- auto& waiter = waiterIter->second;
-
- if (waiter.Type == EWaiterType::DescribeAllTopics) {
+ auto& result = ev->Get()->Request;
+ if (result->Instant == 0) {
for (const auto& entry : result->ResultSet) {
if (!entry.PQGroupInfo) {
continue;
@@ -342,23 +335,32 @@ private:
FullTopicsCacheOutdated = true;
}
- FullTopicsCache.Reset(result);
+ FullTopicsCache.reset(result.Release());
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Updated topics cache with " << FullTopicsCache->ResultSet.size());
- SendDescribeAllTopicsResponse(waiter.WaiterId, ctx);
+ while (!DescribeAllTopicsWaiters.empty()) {
+ SendDescribeAllTopicsResponse(DescribeAllTopicsWaiters.front(), ctx);
+ DescribeAllTopicsWaiters.pop();
+ }
} else {
- Y_VERIFY(waiter.Type == EWaiterType::DescribeCustomTopics);
+ auto waiterIter = DescribeTopicsWaiters.find(result->Instant);
+ Y_VERIFY(!waiterIter.IsEnd());
+ auto& waiter = waiterIter->second;
+
Y_VERIFY(waiter.Topics.size() == result->ResultSet.size());
auto *response = new TEvPqNewMetaCache::TEvDescribeTopicsResponse{
- std::move(waiter.Topics), result
+ std::move(waiter.Topics), result.Release()
};
ctx.Send(waiter.WaiterId, response);
+ DescribeTopicsWaiters.erase(waiterIter);
}
- DescribeTopicsWaiters.erase(waiterIter);
}
- void SendSchemeCacheRequest(const TVector<TString>& topics, bool addDefaultPathPrefix, const TActorContext& ctx) {
- auto schemeCacheRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(++RequestId);
+ void SendSchemeCacheRequest(
+ const TVector<TString>& topics, bool addDefaultPathPrefix, bool isFullListingRequest, const TActorContext& ctx
+ ) {
+ auto instant = isFullListingRequest ? 0 : ++RequestId;
+ auto schemeCacheRequest = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(instant);
for (const auto& path : topics) {
auto split = NKikimr::SplitPath(path);
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
@@ -367,32 +369,33 @@ private:
}
entry.Path.insert(entry.Path.end(), split.begin(), split.end());
- entry.SyncVersion = true;
+ entry.SyncVersion = !isFullListingRequest;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTopic;
schemeCacheRequest->ResultSet.emplace_back(std::move(entry));
}
- ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.release()));
- }
-
- void SendListTopicsResponse(const TActorId& recipient, const TActorContext& ctx) {
- auto* response = new TEvPqNewMetaCache::TEvListTopicsResponse();
- response->Topics = CurrentTopics;
- response->TopicsVersion = CurrentTopicsVersion;
- ctx.Send(recipient, response);
+ ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.Release()));
}
void SendDescribeAllTopicsResponse(const TActorId& recipient, const TActorContext& ctx, bool empty = false) {
- NSchemeCache::TSchemeCacheNavigate* scResponse;
if (empty) {
- scResponse = new NSchemeCache::TSchemeCacheNavigate();
+ ctx.Send(
+ recipient,
+ new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse(
+ PathPrefix, std::make_shared<NSchemeCache::TSchemeCacheNavigate>()
+ )
+ );
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send empty describe all topics response");
+ return;
} else {
- scResponse = new NSchemeCache::TSchemeCacheNavigate(*FullTopicsCache);
+ ctx.Send(
+ recipient,
+ new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse(
+ PathPrefix, FullTopicsCache
+ )
+ );
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send describe all topics response with " << FullTopicsCache->ResultSet.size() << " topics");
+
}
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Send describe all topics response with " << scResponse->ResultSet.size() << " topics");
- auto* response = new TEvPqNewMetaCache::TEvDescribeAllTopicsResponse(
- PathPrefix, scResponse
- );
- ctx.Send(recipient, response);
}
static NActors::TActorSystem* GetActorSystem() {
@@ -406,16 +409,16 @@ public:
}
STRICT_STFUNC(StateFunc,
- SFunc(NActors::TEvents::TEvWakeup, StartSession)
- SFunc(TEvPqNewMetaCache::TEvSessionStarted, HandleSessionStarted)
- SFunc(TEvPqNewMetaCache::TEvQueryPrepared, HandleTopicsQueryPrepared)
- HFunc(TEvPqNewMetaCache::TEvQueryComplete, HandleQueryComplete)
- HFunc(TEvPqNewMetaCache::TEvRestartQuery, HandleRestartQuery)
-
- HFunc(TEvPqNewMetaCache::TEvListTopicsRequest, HandleListTopics)
- HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics)
- HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics)
- HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse)
+ SFunc(NActors::TEvents::TEvWakeup, StartSession)
+ SFunc(TEvPqNewMetaCache::TEvSessionStarted, HandleSessionStarted)
+ SFunc(TEvPqNewMetaCache::TEvQueryPrepared, HandleTopicsQueryPrepared)
+ HFunc(TEvPqNewMetaCache::TEvQueryComplete, HandleQueryComplete)
+ HFunc(TEvPqNewMetaCache::TEvRestartQuery, HandleRestartQuery)
+
+ HFunc(TEvPqNewMetaCache::TEvGetVersionRequest, HandleGetVersion)
+ HFunc(TEvPqNewMetaCache::TEvDescribeTopicsRequest, HandleDescribeTopics)
+ HFunc(TEvPqNewMetaCache::TEvDescribeAllTopicsRequest, HandleDescribeAllTopics)
+ HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleSchemeCacheResponse)
)
private:
@@ -428,11 +431,6 @@ private:
struct TWaiter {
TActorId WaiterId;
TVector<TString> Topics;
- EWaiterType Type;
-
- static TWaiter AllTopicsWaiter(const TActorId& waiterId) {
- return TWaiter{waiterId, {}, EWaiterType::DescribeAllTopics};
- }
};
struct TTopicKey {
@@ -443,11 +441,6 @@ private:
class TClientWrapper {
public:
- TClientWrapper(const TClientWrapper&) = delete;
- TClientWrapper& operator=(const TClientWrapper&) = delete;
- TClientWrapper(TClientWrapper&&) = default;
- TClientWrapper& operator=(TClientWrapper&&) = default;
-
TClientWrapper(ui64 driverPort)
: DriverPort(driverPort)
{}
@@ -456,6 +449,10 @@ private:
: Driver(driver)
{}
+ TClientWrapper(const TAtomicSharedPtr<TTableClient>& tableClient)
+ : TableClient(tableClient)
+ {}
+
void Reset() {
if (DriverPort.Defined()) { // Own driver
if (DriverHolder == nullptr) {
@@ -463,18 +460,16 @@ private:
auto driverConfig = NYdb::TDriverConfig().SetEndpoint(endpoint);
DriverHolder.Reset(new NYdb::TDriver(driverConfig));
Driver = DriverHolder.Get();
- TableClient.reset(new NYdb::NTable::TTableClient(*Driver));
+ TableClient.Reset(new NYdb::NTable::TTableClient(*Driver));
}
} else if (Driver != nullptr) {
- TableClient.reset(new NYdb::NTable::TTableClient(*Driver));
+ TableClient = MakeAtomicShared<NYdb::NTable::TTableClient>(*Driver);
}
}
-
NYdb::NTable::TTableClient* GetClient() {
Y_VERIFY(TableClient);
- return TableClient.get();
+ return TableClient.Get();
}
-
void Stop() {
if (DriverHolder != nullptr) {
TableClient->Stop();
@@ -487,8 +482,9 @@ private:
private:
THolder<NYdb::TDriver> DriverHolder;
NYdb::TDriver* Driver = nullptr;
+ TAtomicSharedPtr<NYdb::NTable::TTableClient> TableClient;
TMaybe<ui64> DriverPort;
- std::unique_ptr<NYdb::NTable::TTableClient> TableClient;
+
};
NMonitoring::TDynamicCounterPtr Counters;
@@ -512,11 +508,12 @@ private:
TDuration VersionCheckInterval = TDuration::Seconds(1);
TInstant LastVersionUpdate = TInstant::Zero();
- TDeque<TWaiter> ListTopicsWaiters;
+ TQueue<TActorId> ListTopicsWaiters;
+ TQueue<TActorId> DescribeAllTopicsWaiters;
THashMap<ui64, TWaiter> DescribeTopicsWaiters;
ui64 RequestId = 1;
- THolder<NSchemeCache::TSchemeCacheNavigate> FullTopicsCache;
+ std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> FullTopicsCache;
bool FullTopicsCacheOutdated = false;
NActors::TActorId SchemeCacheId;
TString PathPrefix;
@@ -533,6 +530,13 @@ IActor* CreatePQMetaCache(ui64 grpcPort, const NMonitoring::TDynamicCounterPtr&
return new TPersQueueMetaCacheActor(grpcPort, counters, versionCheckInterval);
}
+IActor* CreatePQMetaCache(
+ const TAtomicSharedPtr<NYdb::NTable::TTableClient>& tableClient, const NActors::TActorId& schemeBoardCacheId,
+ const TDuration& versionCheckInterval
+) {
+ return new TPersQueueMetaCacheActor(tableClient, schemeBoardCacheId, versionCheckInterval);
+}
+
} // namespace NPqMetaCacheV2
} // namespace NKikimr::NMsgBusProxy
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.h b/ydb/core/client/server/msgbus_server_pq_metacache.h
index 6c12b76b041..a125e7b81ac 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.h
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.h
@@ -42,8 +42,8 @@ struct TEvPqNewMetaCache {
EvQueryPrepared,
EvQueryComplete,
EvRestartQuery,
- EvListTopicsRequest,
- EvListTopicsResponse,
+ EvGetVersionRequest,
+ EvGetVersionResponse,
EvDescribeTopicsRequest,
EvDescribeTopicsResponse,
EvDescribeAllTopicsRequest,
@@ -74,11 +74,14 @@ struct TEvPqNewMetaCache {
: Type(type) {}
};
- struct TEvListTopicsRequest : public TEventLocal<TEvListTopicsRequest, EvListTopicsRequest> {
+ struct TEvGetVersionRequest : public TEventLocal<TEvGetVersionRequest, EvGetVersionRequest> {
};
- struct TEvListTopicsResponse : public TEventLocal<TEvListTopicsResponse, EvListTopicsResponse> {
- TVector<TString> Topics;
+ struct TEvGetVersionResponse : public TEventLocal<TEvGetVersionResponse, EvGetVersionResponse> {
+ TEvGetVersionResponse(ui64 version)
+ : TopicsVersion(version)
+ {}
+
ui64 TopicsVersion;
};
@@ -100,7 +103,7 @@ struct TEvPqNewMetaCache {
struct TEvDescribeTopicsResponse : public TEventLocal<TEvDescribeTopicsResponse, EvDescribeTopicsResponse> {
TVector<TString> TopicsRequested;
- THolder<NSchemeCache::TSchemeCacheNavigate> Result;
+ std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> Result;
explicit TEvDescribeTopicsResponse(const TVector<TString>& topics,
NSchemeCache::TSchemeCacheNavigate* result)
@@ -120,11 +123,11 @@ struct TEvPqNewMetaCache {
struct TEvDescribeAllTopicsResponse : public TEventLocal<TEvDescribeAllTopicsResponse, EvDescribeAllTopicsResponse> {
bool Success = true;
TString Path;
- THolder<NSchemeCache::TSchemeCacheNavigate> Result;
+ std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> Result;
explicit TEvDescribeAllTopicsResponse(const TString& path)
: Path(path)
{}
- TEvDescribeAllTopicsResponse(const TString& path, NSchemeCache::TSchemeCacheNavigate* result)
+ TEvDescribeAllTopicsResponse(const TString& path, const std::shared_ptr<NSchemeCache::TSchemeCacheNavigate>& result)
: Path(path)
, Result(result)
{}
@@ -137,6 +140,11 @@ IActor* CreatePQMetaCache(ui64 grpcPort,
const NMonitoring::TDynamicCounterPtr& counters,
const TDuration& versionCheckInterval = TDuration::Seconds(1));
+IActor* CreatePQMetaCache(const TAtomicSharedPtr<NYdb::NTable::TTableClient>& tableClient,
+ const NActors::TActorId& schemeBoardCacheId,
+ const TDuration& versionCheckInterval = TDuration::Seconds(1));
+
+
} // namespace NPqMetaCacheV2
} //namespace NKikimr::NMsgBusProxy
diff --git a/ydb/core/client/server/pq_metacache_v2_ut.cpp b/ydb/core/client/server/pq_metacache_v2_ut.cpp
index f02fd420c9a..39383a462b1 100644
--- a/ydb/core/client/server/pq_metacache_v2_ut.cpp
+++ b/ydb/core/client/server/pq_metacache_v2_ut.cpp
@@ -27,7 +27,8 @@ class TPqMetaCacheV2Test: public TTestBase {
settings.SetDomainName("Root");
settings.SetDomain(0);
settings.SetUseRealThreads(true);
- Server = new NKikimr::Tests::TServer(settings);
+ //settings.PQConfig.SetMetaCacheSkipVersionCheck(true);
+ Server = MakeHolder<NKikimr::Tests::TServer>(settings);
Server->EnableGRpc(NGrpc::TServerOptions().SetHost("localhost").SetPort(GrpcServerPort));
auto* runtime = Server->GetRuntime();
@@ -38,15 +39,15 @@ class TPqMetaCacheV2Test: public TTestBase {
runtime->SetLogPriority(NKikimrServices::PQ_WRITE_PROXY, NActors::NLog::PRI_EMERG);
runtime->SetLogPriority(NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, NActors::NLog::PRI_EMERG);
- Client = MakeHolder<NKikimr::NPersQueueTests::TFlatMsgBusPQClient>(settings, GrpcServerPort);
+ Client = std::make_shared<NKikimr::NPersQueueTests::TFlatMsgBusPQClient>(settings, GrpcServerPort);
Client->InitRootScheme();
NYdb::TDriverConfig driverCfg;
driverCfg.SetEndpoint(TStringBuilder() << "localhost:" << GrpcServerPort);
- YdbDriver = new NYdb::TDriver(driverCfg);
- TableClient = new NYdb::NTable::TTableClient(*YdbDriver);
- PQClient = new NYdb::NPersQueue::TPersQueueClient(*YdbDriver);
+ YdbDriver = std::make_shared<NYdb::TDriver>(driverCfg);
+ TableClient.Reset(new NYdb::NTable::TTableClient(*YdbDriver));
+ PQClient = std::make_shared<NYdb::NPersQueue::TPersQueueClient>(*YdbDriver);
Client->MkDir("/Root", "PQ");
Client->MkDir("/Root/PQ", "Config");
@@ -63,7 +64,7 @@ class TPqMetaCacheV2Test: public TTestBase {
tableDesc = TTableBuilder()
.AddNullableColumn("name", EPrimitiveType::String)
- .AddNullableColumn("version", EPrimitiveType::Uint64)
+ .AddNullableColumn("version", EPrimitiveType::Int64)
.SetPrimaryKeyColumns({"name", "version"})
.Build();
CheckYdbResult(session.CreateTable("/Root/PQ/Config/V2/Versions", std::move(tableDesc)));
@@ -78,7 +79,7 @@ class TPqMetaCacheV2Test: public TTestBase {
}
SchemeCacheId = runtime->Register(CreateSchemeBoardSchemeCache(config.Get()));
MetaCacheId = runtime->Register(
- NPqMetaCacheV2::CreatePQMetaCache(config->Counters, TDuration::MilliSeconds(50))
+ NPqMetaCacheV2::CreatePQMetaCache(GrpcServerPort, config->Counters, TDuration::MilliSeconds(50))
);
runtime->EnableScheduleForActor(SchemeCacheId, true);
runtime->EnableScheduleForActor(MetaCacheId, true);
@@ -119,7 +120,7 @@ class TPqMetaCacheV2Test: public TTestBase {
auto builder = versionPrepared.GetParamsBuilder();
{
auto &param = builder.AddParam("$Version");
- param.Uint64(++Version);
+ param.Int64(++Version);
param.Build();
}
CheckYdbResult(versionPrepared.Execute(txControl, builder.Build()));
@@ -130,24 +131,33 @@ class TPqMetaCacheV2Test: public TTestBase {
}
}
- THolder<TEvPqNewMetaCache::TEvDescribeTopicsResponse> DoMetaCacheRequest(const TVector<TTopicInfo>& topicList) {
+ template<class TEvType>
+ THolder<TEvType> DoMetaCacheRequest(const TVector<TTopicInfo>& topicList = {}) {
IEventBase* ev;
if (topicList.empty()) {
ev = new TEvPqNewMetaCache::TEvDescribeAllTopicsRequest();
} else {
TVector<TString> topicNames;
for (const auto &topic : topicList) {
- topicNames.emplace_back(std::move(::NPersQueue::BuildFullTopicName(topic.Path, topic.Cluster)));
+ topicNames.emplace_back(TString("/Root/PQ/") + ::NPersQueue::BuildFullTopicName(topic.Path, topic.Cluster));
}
ev = new TEvPqNewMetaCache::TEvDescribeTopicsRequest(topicNames);
}
auto handle = new IEventHandle(MetaCacheId, EdgeActorId, ev);
Server->GetRuntime()->Send(handle);
- auto response = Server->GetRuntime()->GrabEdgeEvent<TEvPqNewMetaCache::TEvDescribeTopicsResponse>();
+ auto response = Server->GetRuntime()->GrabEdgeEvent<TEvType>();
return std::move(response);
}
- void CheckTopicInfo(const TVector<TTopicInfo>& expected, TSchemeCacheNavigate* result) {
+ THolder<TEvPqNewMetaCache::TEvDescribeTopicsResponse> DescribeTopics(const TVector<TTopicInfo>& topicList) {
+ return DoMetaCacheRequest<TEvPqNewMetaCache::TEvDescribeTopicsResponse>(topicList);
+ }
+
+ THolder<TEvPqNewMetaCache::TEvDescribeAllTopicsResponse> DescribeAllTopics() {
+ return DoMetaCacheRequest<TEvPqNewMetaCache::TEvDescribeAllTopicsResponse>();
+ }
+
+ void CheckTopicInfo(const TVector<TTopicInfo>& expected, std::shared_ptr<NSchemeCache::TSchemeCacheNavigate> result) {
ui64 i = 0;
Cerr << "=== Got cache navigate response: \n" << result->ToString(NKikimr::NScheme::TTypeRegistry()) << Endl;
Cerr << "=== Expect to have " << expected.size() << " records, got: " << result->ResultSet.size() << " records" << Endl;
@@ -170,9 +180,9 @@ class TPqMetaCacheV2Test: public TTestBase {
auto endTime = TInstant::Now() + timeout;
auto* runtime = Server->GetRuntime();
while (endTime > TInstant::Now()) {
- auto handle = new IEventHandle(MetaCacheId, EdgeActorId, new TEvPqNewMetaCache::TEvListTopicsRequest());
+ auto handle = new IEventHandle(MetaCacheId, EdgeActorId, new TEvPqNewMetaCache::TEvGetVersionRequest());
runtime->Send(handle);
- auto response = runtime->GrabEdgeEvent<TEvPqNewMetaCache::TEvListTopicsResponse>();
+ auto response = runtime->GrabEdgeEvent<TEvPqNewMetaCache::TEvGetVersionResponse>();
currentVersion = response->TopicsVersion;
if (currentVersion >= version) {
Cerr << "=== Got current topics version: " << currentVersion << Endl;
@@ -183,51 +193,126 @@ class TPqMetaCacheV2Test: public TTestBase {
UNIT_FAIL("Wait for topics version timed out");
}
+ TActorId MakeEdgeTargetedMetaCache() {
+ auto anotherMetaCacheId = Server->GetRuntime()->Register(
+ NPqMetaCacheV2::CreatePQMetaCache(TableClient, EdgeActorId)
+
+ );
+ return anotherMetaCacheId;
+ }
+
void TestDescribeTopics() {
auto topics = TVector<TTopicInfo>({
- {"topic1", "man", true},
- {"topic2", "man", true},
- {"topic3", "man", false}
- });
+ {"topic1", "man", true},
+ {"topic2", "man", true},
+ {"topic3", "man", false}
+ });
AddTopics(topics);
- auto ev = DoMetaCacheRequest(topics);
- CheckTopicInfo(topics, ev->Result.Get());
+ auto ev = DescribeTopics(topics);
+ CheckTopicInfo(topics, ev->Result);
UNIT_ASSERT(ev);
}
+
+ void TestDescribeManyTopics() {
+ auto topics = TVector<TTopicInfo>({
+ {"topic1", "man", true},
+ {"topic2", "man", false},
+ {"topic3", "man", false}
+ });
+
+ AddTopics(topics);
+
+ Cerr << "===Wait base response\n";
+ auto baseResponse = DescribeTopics(topics);
+ Cerr << "===Got base response\n";
+ auto baseEntry = baseResponse->Result->ResultSet[0];
+
+ TVector<TTopicInfo> lotOfTopics;
+ for (auto i = 0u; i < 10000u; i++) {
+ lotOfTopics.emplace_back(TTopicInfo{TString("topic0") + ToString(i), "man", false});
+ }
+ AddTopics(lotOfTopics);
+ ui64 totalTopics = topics.size() + lotOfTopics.size();
+ auto secondMetaCache = MakeEdgeTargetedMetaCache();
+ Cerr << "===Registered secondary meta cache: " << secondMetaCache.ToString() << Endl;
+ auto* runtime = Server->GetRuntime();
+
+ auto sendRequest = [&]() {
+ auto* ev = new TEvPqNewMetaCache::TEvDescribeAllTopicsRequest();
+ auto handle = new IEventHandle(secondMetaCache, EdgeActorId, ev);
+ runtime->Send(handle);
+ };
+ Cerr << "===Send first request\n";
+ sendRequest();
+
+ auto scRequest = runtime->GrabEdgeEvent<TEvTxProxySchemeCache::TEvNavigateKeySet>();
+ Cerr << "===Got SC request\n";
+ auto& entries = scRequest->Request->ResultSet;
+ UNIT_ASSERT_VALUES_EQUAL(entries.size(), totalTopics);
+ for (auto i = 0u; i < entries.size(); i++) {
+ auto initialPath = entries[i].Path;
+ entries[i] = baseEntry;
+ entries[i].Path = initialPath;
+ }
+ auto* scResponse = new TEvTxProxySchemeCache::TEvNavigateKeySetResult(scRequest->Request);
+
+ auto handle = new IEventHandle(secondMetaCache, EdgeActorId, scResponse);
+ Cerr << "===Send fake SC response\n";
+ runtime->Send(handle);
+
+ Cerr << "===Wait secondary metacache response\n";
+ auto response = runtime->GrabEdgeEvent<TEvPqNewMetaCache::TEvDescribeAllTopicsResponse>();
+ Cerr << "===Got secondary metacache response\n";
+
+ // Preparation done, now can send many requests
+ Cerr << "===Started pushing requests at: " << TInstant::Now() << Endl;
+ for (auto i = 0u; i < 1000; i++) {
+ sendRequest();
+ }
+ Cerr << "===Done pushing requests at: " << TInstant::Now() << Endl;
+ for (auto i = 0u; i < 1000; i++) {
+ response = runtime->GrabEdgeEvent<TEvPqNewMetaCache::TEvDescribeAllTopicsResponse>();
+ if (!i) {
+ Cerr << "===Done gathering first response at: " << TInstant::Now() << Endl;
+ }
+ }
+ Cerr << "===Done gathering responses at: " << TInstant::Now() << Endl;
+ }
+
void TestDescribeAllTopics() {
auto topics = TVector<TTopicInfo>({
- {"topic1", "man", true},
- {"topic2", "man", true},
- {"topic3", "man", false}
+ {"topic1", "man", true},
+ {"topic2", "man", true},
+ {"topic3", "man", false}
});
AddTopics(topics);
- auto ev = DoMetaCacheRequest({});
- CheckTopicInfo(topics, ev->Result.Get());
+ auto ev = DescribeAllTopics();
+ CheckTopicInfo(topics, ev->Result);
UNIT_ASSERT(ev);
}
void TestTopicsUpdate() {
auto topics = TVector<TTopicInfo>({
- {"topic1", "man", true},
- {"topic2", "man", true},
- {"topic3", "man", false}
- });
+ {"topic1", "man", true},
+ {"topic2", "man", true},
+ {"topic3", "man", false}
+ });
AddTopics(topics);
- CheckTopicInfo(topics, DoMetaCacheRequest(topics)->Result.Get());
+ CheckTopicInfo(topics, DescribeTopics(topics)->Result);
WaitForVersion(Version);
TTopicInfo topic{"topic1", "sas", true};
AddTopics({topic}, false);
- CheckTopicInfo(topics, DoMetaCacheRequest({})->Result.Get());
+ CheckTopicInfo(topics, DescribeAllTopics()->Result);
AddTopics({}, true);
- Cerr << "===Wait for version: " << Version << Endl;
WaitForVersion(Version);
topics.insert(topics.end() - 1, topic);
- CheckTopicInfo(topics, DoMetaCacheRequest({})->Result.Get());
+ CheckTopicInfo(topics, DescribeAllTopics()->Result);
}
+
template<class T>
T CheckYdbResult(NThreading::TFuture<T>&& asyncResult) {
auto res = asyncResult.GetValueSync();
@@ -259,7 +344,7 @@ private:
std::shared_ptr<NKikimr::NPersQueueTests::TFlatMsgBusPQClient> Client;
std::shared_ptr<NYdb::TDriver> YdbDriver;
- std::shared_ptr<NYdb::NTable::TTableClient> TableClient;
+ TAtomicSharedPtr<NYdb::NTable::TTableClient> TableClient;
std::shared_ptr<NYdb::NPersQueue::TPersQueueClient> PQClient;
TString UpsertTopicQuery = TStringBuilder()
<< "--!syntax_v1\n"
@@ -268,8 +353,8 @@ private:
<< "(path, dc) VALUES ($Path, $Cluster);";
TString UpdateVersionQuery = TStringBuilder()
<< "--!syntax_v1\n"
- << "DECLARE $Version as Uint64; "
- << "UPSERT INTO `/Root/PQ/Config/V2/Version`"
+ << "DECLARE $Version as Int64; "
+ << "UPSERT INTO `/Root/PQ/Config/V2/Versions`"
<< "(name, version) VALUES ('Topics', $Version);";
diff --git a/ydb/core/client/server/ut/CMakeLists.darwin.txt b/ydb/core/client/server/ut/CMakeLists.darwin.txt
index 15ec30f11a2..b993911e557 100644
--- a/ydb/core/client/server/ut/CMakeLists.darwin.txt
+++ b/ydb/core/client/server/ut/CMakeLists.darwin.txt
@@ -27,6 +27,7 @@ target_link_libraries(ydb-core-client-server-ut PUBLIC
core-testlib-actors
)
target_sources(ydb-core-client-server-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/client/server/pq_metacache_v2_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp
)
add_test(
diff --git a/ydb/core/client/server/ut/CMakeLists.linux.txt b/ydb/core/client/server/ut/CMakeLists.linux.txt
index f4b6f53ae04..8e7efa1338f 100644
--- a/ydb/core/client/server/ut/CMakeLists.linux.txt
+++ b/ydb/core/client/server/ut/CMakeLists.linux.txt
@@ -28,6 +28,7 @@ target_link_libraries(ydb-core-client-server-ut PUBLIC
core-testlib-actors
)
target_sources(ydb-core-client-server-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/client/server/pq_metacache_v2_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/client/server/msgbus_server_pq_metarequest_ut.cpp
)
add_test(
diff --git a/ydb/core/client/server/ut/ya.make b/ydb/core/client/server/ut/ya.make
index 409979461df..493afd12e85 100644
--- a/ydb/core/client/server/ut/ya.make
+++ b/ydb/core/client/server/ut/ya.make
@@ -21,6 +21,7 @@ PEERDIR(
YQL_LAST_ABI_VERSION()
SRCS(
+ pq_metacache_v2_ut.cpp
msgbus_server_pq_metarequest_ut.cpp
)
diff --git a/ydb/core/testlib/mock_pq_metacache.h b/ydb/core/testlib/mock_pq_metacache.h
index 1280f877792..e132cd5f9ac 100644
--- a/ydb/core/testlib/mock_pq_metacache.h
+++ b/ydb/core/testlib/mock_pq_metacache.h
@@ -69,7 +69,7 @@ public:
response->Success = success;
auto* result = new NSchemeCache::TSchemeCacheNavigate();
result->ResultSet = resultSet;
- response->Result.Reset(result);
+ response->Result.reset(result);
ctx.Send(ev->Sender, std::move(response));
};
diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
index f208728706a..6531afc7d3b 100644
--- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
@@ -384,7 +384,7 @@ void TWriteSessionActor::InitCheckSchema(const TActorContext& ctx, bool needWait
}
void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx) {
- auto* res = ev->Get()->Result.Get();
+ auto& res = ev->Get()->Result;
Y_VERIFY(res->ResultSet.size() == 1);
auto& entry = res->ResultSet[0];