aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@yandex-team.ru>2022-03-25 23:41:01 +0300
committeralexnick <alexnick@yandex-team.ru>2022-03-25 23:41:01 +0300
commitd2fb88bc290f27a26b3ed7eb1932c5fb94cfd813 (patch)
tree6d87d1c8ab1dbce4c7606d2718019ff81e68055c
parent4d69101bb995dd4c7a9afc5ae460a35af6f51644 (diff)
downloadydb-d2fb88bc290f27a26b3ed7eb1932c5fb94cfd813.tar.gz
refactor read sessions actor in pqv0
ref:1630ee7f08876e125edb62cd8a45bf3ac0130ab6
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.cpp8
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.h7
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_actor.h3
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read_actor.cpp9
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_write.cpp10
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_write.h5
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_write_actor.cpp77
-rw-r--r--ydb/services/persqueue_v1/persqueue.cpp2
8 files changed, 33 insertions, 88 deletions
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
index 45211485f5..6b752a60e8 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
@@ -216,7 +216,7 @@ private:
void HandleDescribeTopics(TEvPqNewMetaCache::TEvDescribeTopicsRequest::TPtr& ev, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe topics");
- SendSchemeCacheRequest(ev->Get()->Topics, !ev->Get()->PathPrefix.empty(), false, ctx);
+ SendSchemeCacheRequest(ev->Get()->Topics, !ev->Get()->PathPrefix.empty(), false, ev->Get()->SyncVersion, ctx);
auto inserted = DescribeTopicsWaiters.insert(std::make_pair(
RequestId,
TWaiter{ev->Sender, std::move(ev->Get()->Topics)}
@@ -252,7 +252,7 @@ private:
}
if (DescribeAllTopicsWaiters.empty()) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Make full list SC request");
- SendSchemeCacheRequest(CurrentTopics, true, true, ctx);
+ SendSchemeCacheRequest(CurrentTopics, true, true, false, ctx);
}
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Store waiter");
DescribeAllTopicsWaiters.push(waiter);
@@ -298,7 +298,7 @@ private:
void SendSchemeCacheRequest(
- const TVector<TString>& topics, bool addDefaultPathPrefix, bool isFullListingRequest, const TActorContext& ctx
+ const TVector<TString>& topics, bool addDefaultPathPrefix, bool isFullListingRequest, bool syncVersion, const TActorContext& ctx
) {
auto instant = isFullListingRequest ? 0 : ++RequestId;
auto schemeCacheRequest = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(instant);
@@ -310,7 +310,7 @@ private:
}
entry.Path.insert(entry.Path.end(), split.begin(), split.end());
- entry.SyncVersion = !isFullListingRequest;
+ entry.SyncVersion = syncVersion;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTopic;
schemeCacheRequest->ResultSet.emplace_back(std::move(entry));
}
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.h b/ydb/core/client/server/msgbus_server_pq_metacache.h
index 5c1d6d6471..f1d8248ef3 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.h
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.h
@@ -65,16 +65,19 @@ struct TEvPqNewMetaCache {
struct TEvDescribeTopicsRequest : public TEventLocal<TEvDescribeTopicsRequest, EvDescribeTopicsRequest> {
TString PathPrefix;
TVector<TString> Topics;
+ bool SyncVersion;
TEvDescribeTopicsRequest() = default;
- explicit TEvDescribeTopicsRequest(const TVector<TString>& topics)
+ explicit TEvDescribeTopicsRequest(const TVector<TString>& topics, bool syncVersion = true)
: Topics(topics)
+ , SyncVersion(syncVersion)
{}
- TEvDescribeTopicsRequest(const TVector<TString>& topics, const TString& pathPrefix)
+ TEvDescribeTopicsRequest(const TVector<TString>& topics, const TString& pathPrefix, bool syncVersion = true)
: PathPrefix(pathPrefix)
, Topics(topics)
+ , SyncVersion(syncVersion)
{}
};
diff --git a/ydb/services/persqueue_v1/grpc_pq_actor.h b/ydb/services/persqueue_v1/grpc_pq_actor.h
index afc4d8f69e..7c11732d8e 100644
--- a/ydb/services/persqueue_v1/grpc_pq_actor.h
+++ b/ydb/services/persqueue_v1/grpc_pq_actor.h
@@ -457,7 +457,7 @@ using TEvDescribeTopicsRequest = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache
public:
TWriteSessionActor(NKikimr::NGRpcService::TEvStreamPQWriteRequest* request, const ui64 cookie,
- const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache,
+ const NActors::TActorId& schemeCache,
TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const TMaybe<TString> clientDC,
const NPersQueue::TTopicsListController& topicsController);
~TWriteSessionActor();
@@ -573,7 +573,6 @@ private:
EState State;
TActorId SchemeCache;
- TActorId NewSchemeCache;
TActorId Writer;
TString PeerName;
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
index 9f50b6511b..d7209ef46b 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
@@ -288,9 +288,6 @@ void TReadSessionActor::Bootstrap(const TActorContext& ctx) {
Die(ctx);
return;
}
- //auto& pqConfig = AppData(ctx)->PQConfig;
- //bool noDcMode = !pqConfig.GetTopicsAreFirstClassCitizen(); // ToDo[migration] - add multicluster mode
- //ConverterFactory = MakeHolder<NPersQueue::TTopicNamesConverterFactory>(noDcMode, pqConfig.GetRoot());
StartTime = ctx.Now();
Become(&TThis::StateFunc);
@@ -738,6 +735,10 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadInit::TPtr& ev, const TActorCo
auto converter = TopicsHandler.GetConverterFactory()->MakeTopicNameConverter(
topic.topic(), TString(), Request->GetDatabaseName().GetOrElse(TString())
);
+ if (!converter->IsValid()) {
+ CloseSession(TStringBuilder() << "invalid topic '" << topic.topic() << "' in init request", PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ return;
+ }
const auto topicName = converter->GetModernName();
if (topicName.empty()) {
CloseSession("empty topic in init request", PersQueue::ErrorCode::BAD_REQUEST, ctx);
@@ -2646,7 +2647,7 @@ void TReadInitAndAuthActor::Bootstrap(const TActorContext &ctx) {
topicNames.emplace_back(topic.second.TopicNameConverter->GetPrimaryPath());
}
DoCheckACL = AppData(ctx)->PQConfig.GetCheckACL() && Token;
- ctx.Send(MetaCacheId, new TEvDescribeTopicsRequest(topicNames));
+ ctx.Send(MetaCacheId, new TEvDescribeTopicsRequest(topicNames, true));
}
void TReadInitAndAuthActor::Die(const TActorContext& ctx) {
diff --git a/ydb/services/persqueue_v1/grpc_pq_write.cpp b/ydb/services/persqueue_v1/grpc_pq_write.cpp
index 5f17a2f3ab..d49c579077 100644
--- a/ydb/services/persqueue_v1/grpc_pq_write.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_write.cpp
@@ -18,17 +18,16 @@ using namespace PersQueue::V1;
///////////////////////////////////////////////////////////////////////////////
-IActor* CreatePQWriteService(const TActorId& schemeCache, const TActorId& newSchemeCache,
+IActor* CreatePQWriteService(const TActorId& schemeCache,
TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const ui32 maxSessions) {
- return new TPQWriteService(schemeCache, newSchemeCache, counters, maxSessions);
+ return new TPQWriteService(schemeCache, counters, maxSessions);
}
-TPQWriteService::TPQWriteService(const TActorId& schemeCache, const TActorId& newSchemeCache,
+TPQWriteService::TPQWriteService(const TActorId& schemeCache,
TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const ui32 maxSessions)
: SchemeCache(schemeCache)
- , NewSchemeCache(newSchemeCache)
, Counters(counters)
, MaxSessions(maxSessions)
, Enabled(false)
@@ -176,7 +175,6 @@ void TPQWriteService::Handle(NKikimr::NGRpcService::TEvStreamPQWriteRequest::TPt
return;
}
-
TString localCluster = AvailableLocalCluster(ctx);
if (HaveClusters && localCluster.empty()) {
ev->Get()->GetStreamCtx()->Attach(ctx.SelfID);
@@ -198,7 +196,7 @@ void TPQWriteService::Handle(NKikimr::NGRpcService::TEvStreamPQWriteRequest::TPt
auto ip = ev->Get()->GetStreamCtx()->GetPeerName();
TActorId worker = ctx.Register(new TWriteSessionActor(
- ev->Release().Release(), cookie, SchemeCache, NewSchemeCache, Counters,
+ ev->Release().Release(), cookie, SchemeCache, Counters,
DatacenterClassifier ? DatacenterClassifier->ClassifyAddress(NAddressClassifier::ExtractAddress(ip)) : "unknown",
*TopicsHandler
));
diff --git a/ydb/services/persqueue_v1/grpc_pq_write.h b/ydb/services/persqueue_v1/grpc_pq_write.h
index 73148388b2..5f29df94ba 100644
--- a/ydb/services/persqueue_v1/grpc_pq_write.h
+++ b/ydb/services/persqueue_v1/grpc_pq_write.h
@@ -21,12 +21,12 @@ inline TActorId GetPQWriteServiceActorID() {
return TActorId(0, "PQWriteSvc");
}
-IActor* CreatePQWriteService(const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache,
+IActor* CreatePQWriteService(const NActors::TActorId& schemeCache,
TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const ui32 maxSessions);
class TPQWriteService : public NActors::TActorBootstrapped<TPQWriteService> {
public:
- TPQWriteService(const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache,
+ TPQWriteService(const NActors::TActorId& schemeCache,
TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const ui32 maxSessions);
~TPQWriteService()
@@ -60,7 +60,6 @@ private:
void Handle(TEvPQProxy::TEvSessionDead::TPtr& ev, const TActorContext& ctx);
NActors::TActorId SchemeCache;
- NActors::TActorId NewSchemeCache;
TAtomic LastCookie = 0;
diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
index 6531afc7d3..c40ced868e 100644
--- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp
@@ -89,14 +89,13 @@ static const TDuration SOURCEID_UPDATE_PERIOD = TDuration::Hours(1);
TWriteSessionActor::TWriteSessionActor(
NKikimr::NGRpcService::TEvStreamPQWriteRequest* request, const ui64 cookie,
- const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache,
+ const NActors::TActorId& schemeCache,
TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const TMaybe<TString> clientDC,
const NPersQueue::TTopicsListController& topicsController
)
: Request(request)
, State(ES_CREATED)
, SchemeCache(schemeCache)
- , NewSchemeCache(newSchemeCache)
, PeerName("")
, Cookie(cookie)
, TopicsController(topicsController)
@@ -112,7 +111,7 @@ TWriteSessionActor::TWriteSessionActor(
, Token(nullptr)
, UpdateTokenInProgress(false)
, UpdateTokenAuthenticated(false)
- , ACLCheckInProgress(false)
+ , ACLCheckInProgress(true)
, FirstACLCheck(true)
, RequestNotChecked(false)
, LastACLCheckTimestamp(TInstant::Zero())
@@ -341,6 +340,10 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor
void TWriteSessionActor::SetupCounters()
{
+ if (SessionsCreated) {
+ return;
+ }
+
//now topic is checked, can create group for real topic, not garbage
auto subGroup = GetServiceCounters(Counters, "pqproxy|writeSession");
TVector<NPQ::TLabelsInfo> aggr = NKikimr::NPQ::GetLabels(TopicConverter->GetClientsideName());
@@ -357,6 +360,10 @@ void TWriteSessionActor::SetupCounters()
void TWriteSessionActor::SetupCounters(const TString& cloudId, const TString& dbId, const TString& folderId)
{
+ if (SessionsCreated) {
+ return;
+ }
+
//now topic is checked, can create group for real topic, not garbage
auto subGroup = NKikimr::NPQ::GetCountersForStream(Counters);
TVector<NPQ::TLabelsInfo> aggr = NKikimr::NPQ::GetLabelsForStream(TopicConverter->GetClientsideName(), cloudId, dbId, folderId);
@@ -415,7 +422,7 @@ void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActo
SetupCounters();
}
- Y_VERIFY (entry.SecurityObject);
+ Y_VERIFY(entry.SecurityObject);
ACL.Reset(new TAclWrapper(entry.SecurityObject));
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " describe result for acl check");
@@ -437,68 +444,6 @@ void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActo
}
}
-void TWriteSessionActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
- TEvTxProxySchemeCache::TEvNavigateKeySetResult* msg = ev->Get();
- const NSchemeCache::TSchemeCacheNavigate* navigate = msg->Request.Get();
- Y_VERIFY(navigate->ResultSet.size() == 1);
- if (navigate->ErrorCount > 0) {
- const NSchemeCache::TSchemeCacheNavigate::EStatus status = navigate->ResultSet.front().Status;
- return CloseSession(
- TStringBuilder() << "Failed to read ACL for '" << TopicConverter->GetClientsideName()
- << "' Scheme cache error : " << status,
- PersQueue::ErrorCode::ERROR, ctx
- );
- }
- if (!navigate->ResultSet.front().PQGroupInfo) {
- return CloseSession(
- TStringBuilder() << "topic '" << TopicConverter->GetClientsideName() << "' describe error"
- << ", reason: could not retrieve topic description",
- PersQueue::ErrorCode::ERROR, ctx
- );
- }
-
- const auto& pqDescription = navigate->ResultSet.front().PQGroupInfo->Description;
-
- Y_VERIFY(pqDescription.PartitionsSize() > 0);
- Y_VERIFY(pqDescription.HasPQTabletConfig());
- InitialPQTabletConfig = pqDescription.GetPQTabletConfig();
-
- if (!pqDescription.HasBalancerTabletID()) {
- TString errorReason = Sprintf("topic '%s' has no balancer, Marker# PQ93", TopicConverter->GetClientsideName().c_str());
- CloseSession(errorReason, PersQueue::ErrorCode::UNKNOWN_TOPIC, ctx);
- return;
- }
-
- BalancerTabletId = pqDescription.GetBalancerTabletID();
-
- for (ui32 i = 0; i < pqDescription.PartitionsSize(); ++i) {
- const auto& pi = pqDescription.GetPartitions(i);
- PartitionToTablet[pi.GetPartitionId()] = pi.GetTabletId();
- }
-
- if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
- const auto& tabletConfig = pqDescription.GetPQTabletConfig();
- SetupCounters(tabletConfig.GetYcCloudId(), tabletConfig.GetYdbDatabaseId(),
- tabletConfig.GetYcFolderId());
- } else {
- SetupCounters();
- }
-
- Y_VERIFY(!navigate->ResultSet.empty());
- ACL.Reset(new TAclWrapper(navigate->ResultSet.front().SecurityObject));
-
- if (Request->GetInternalToken().empty()) { // session without auth
- // We've already checked authentication flag in init request. Here we should finish it
- FirstACLCheck = false;
- DiscoverPartition(ctx);
- } else {
- Y_VERIFY(Request->GetYdbToken());
- Auth = *Request->GetYdbToken();
- Token = new NACLib::TUserToken(Request->GetInternalToken());
- CheckACL(ctx);
- }
-}
-
void TWriteSessionActor::DiscoverPartition(const NActors::TActorContext& ctx) {
if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { // ToDo[migration] - separate flag for having config tables
diff --git a/ydb/services/persqueue_v1/persqueue.cpp b/ydb/services/persqueue_v1/persqueue.cpp
index ccacc4d43a..047fff117d 100644
--- a/ydb/services/persqueue_v1/persqueue.cpp
+++ b/ydb/services/persqueue_v1/persqueue.cpp
@@ -30,7 +30,7 @@ void TGRpcPersQueueService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::
if (ActorSystem->AppData<TAppData>()->PQConfig.GetEnabled()) {
- IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, NewSchemeCache,Counters, PersQueueWriteSessionsMaxCount);
+ IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters, PersQueueWriteSessionsMaxCount);
TActorId actorId = ActorSystem->Register(writeSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId);
ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId);