diff options
author | alexnick <alexnick@yandex-team.ru> | 2022-03-25 23:41:01 +0300 |
---|---|---|
committer | alexnick <alexnick@yandex-team.ru> | 2022-03-25 23:41:01 +0300 |
commit | d2fb88bc290f27a26b3ed7eb1932c5fb94cfd813 (patch) | |
tree | 6d87d1c8ab1dbce4c7606d2718019ff81e68055c | |
parent | 4d69101bb995dd4c7a9afc5ae460a35af6f51644 (diff) | |
download | ydb-d2fb88bc290f27a26b3ed7eb1932c5fb94cfd813.tar.gz |
refactor read sessions actor in pqv0
ref:1630ee7f08876e125edb62cd8a45bf3ac0130ab6
-rw-r--r-- | ydb/core/client/server/msgbus_server_pq_metacache.cpp | 8 | ||||
-rw-r--r-- | ydb/core/client/server/msgbus_server_pq_metacache.h | 7 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_actor.h | 3 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_read_actor.cpp | 9 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_write.cpp | 10 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_write.h | 5 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/grpc_pq_write_actor.cpp | 77 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/persqueue.cpp | 2 |
8 files changed, 33 insertions, 88 deletions
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp index 45211485f5..6b752a60e8 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp @@ -216,7 +216,7 @@ private: void HandleDescribeTopics(TEvPqNewMetaCache::TEvDescribeTopicsRequest::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe topics"); - SendSchemeCacheRequest(ev->Get()->Topics, !ev->Get()->PathPrefix.empty(), false, ctx); + SendSchemeCacheRequest(ev->Get()->Topics, !ev->Get()->PathPrefix.empty(), false, ev->Get()->SyncVersion, ctx); auto inserted = DescribeTopicsWaiters.insert(std::make_pair( RequestId, TWaiter{ev->Sender, std::move(ev->Get()->Topics)} @@ -252,7 +252,7 @@ private: } if (DescribeAllTopicsWaiters.empty()) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Make full list SC request"); - SendSchemeCacheRequest(CurrentTopics, true, true, ctx); + SendSchemeCacheRequest(CurrentTopics, true, true, false, ctx); } LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Store waiter"); DescribeAllTopicsWaiters.push(waiter); @@ -298,7 +298,7 @@ private: void SendSchemeCacheRequest( - const TVector<TString>& topics, bool addDefaultPathPrefix, bool isFullListingRequest, const TActorContext& ctx + const TVector<TString>& topics, bool addDefaultPathPrefix, bool isFullListingRequest, bool syncVersion, const TActorContext& ctx ) { auto instant = isFullListingRequest ? 0 : ++RequestId; auto schemeCacheRequest = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(instant); @@ -310,7 +310,7 @@ private: } entry.Path.insert(entry.Path.end(), split.begin(), split.end()); - entry.SyncVersion = !isFullListingRequest; + entry.SyncVersion = syncVersion; entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTopic; schemeCacheRequest->ResultSet.emplace_back(std::move(entry)); } diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.h b/ydb/core/client/server/msgbus_server_pq_metacache.h index 5c1d6d6471..f1d8248ef3 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.h +++ b/ydb/core/client/server/msgbus_server_pq_metacache.h @@ -65,16 +65,19 @@ struct TEvPqNewMetaCache { struct TEvDescribeTopicsRequest : public TEventLocal<TEvDescribeTopicsRequest, EvDescribeTopicsRequest> { TString PathPrefix; TVector<TString> Topics; + bool SyncVersion; TEvDescribeTopicsRequest() = default; - explicit TEvDescribeTopicsRequest(const TVector<TString>& topics) + explicit TEvDescribeTopicsRequest(const TVector<TString>& topics, bool syncVersion = true) : Topics(topics) + , SyncVersion(syncVersion) {} - TEvDescribeTopicsRequest(const TVector<TString>& topics, const TString& pathPrefix) + TEvDescribeTopicsRequest(const TVector<TString>& topics, const TString& pathPrefix, bool syncVersion = true) : PathPrefix(pathPrefix) , Topics(topics) + , SyncVersion(syncVersion) {} }; diff --git a/ydb/services/persqueue_v1/grpc_pq_actor.h b/ydb/services/persqueue_v1/grpc_pq_actor.h index afc4d8f69e..7c11732d8e 100644 --- a/ydb/services/persqueue_v1/grpc_pq_actor.h +++ b/ydb/services/persqueue_v1/grpc_pq_actor.h @@ -457,7 +457,7 @@ using TEvDescribeTopicsRequest = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache public: TWriteSessionActor(NKikimr::NGRpcService::TEvStreamPQWriteRequest* request, const ui64 cookie, - const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache, + const NActors::TActorId& schemeCache, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const TMaybe<TString> clientDC, const NPersQueue::TTopicsListController& topicsController); ~TWriteSessionActor(); @@ -573,7 +573,6 @@ private: EState State; TActorId SchemeCache; - TActorId NewSchemeCache; TActorId Writer; TString PeerName; diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp index 9f50b6511b..d7209ef46b 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp @@ -288,9 +288,6 @@ void TReadSessionActor::Bootstrap(const TActorContext& ctx) { Die(ctx); return; } - //auto& pqConfig = AppData(ctx)->PQConfig; - //bool noDcMode = !pqConfig.GetTopicsAreFirstClassCitizen(); // ToDo[migration] - add multicluster mode - //ConverterFactory = MakeHolder<NPersQueue::TTopicNamesConverterFactory>(noDcMode, pqConfig.GetRoot()); StartTime = ctx.Now(); Become(&TThis::StateFunc); @@ -738,6 +735,10 @@ void TReadSessionActor::Handle(TEvPQProxy::TEvReadInit::TPtr& ev, const TActorCo auto converter = TopicsHandler.GetConverterFactory()->MakeTopicNameConverter( topic.topic(), TString(), Request->GetDatabaseName().GetOrElse(TString()) ); + if (!converter->IsValid()) { + CloseSession(TStringBuilder() << "invalid topic '" << topic.topic() << "' in init request", PersQueue::ErrorCode::BAD_REQUEST, ctx); + return; + } const auto topicName = converter->GetModernName(); if (topicName.empty()) { CloseSession("empty topic in init request", PersQueue::ErrorCode::BAD_REQUEST, ctx); @@ -2646,7 +2647,7 @@ void TReadInitAndAuthActor::Bootstrap(const TActorContext &ctx) { topicNames.emplace_back(topic.second.TopicNameConverter->GetPrimaryPath()); } DoCheckACL = AppData(ctx)->PQConfig.GetCheckACL() && Token; - ctx.Send(MetaCacheId, new TEvDescribeTopicsRequest(topicNames)); + ctx.Send(MetaCacheId, new TEvDescribeTopicsRequest(topicNames, true)); } void TReadInitAndAuthActor::Die(const TActorContext& ctx) { diff --git a/ydb/services/persqueue_v1/grpc_pq_write.cpp b/ydb/services/persqueue_v1/grpc_pq_write.cpp index 5f17a2f3ab..d49c579077 100644 --- a/ydb/services/persqueue_v1/grpc_pq_write.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_write.cpp @@ -18,17 +18,16 @@ using namespace PersQueue::V1; /////////////////////////////////////////////////////////////////////////////// -IActor* CreatePQWriteService(const TActorId& schemeCache, const TActorId& newSchemeCache, +IActor* CreatePQWriteService(const TActorId& schemeCache, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const ui32 maxSessions) { - return new TPQWriteService(schemeCache, newSchemeCache, counters, maxSessions); + return new TPQWriteService(schemeCache, counters, maxSessions); } -TPQWriteService::TPQWriteService(const TActorId& schemeCache, const TActorId& newSchemeCache, +TPQWriteService::TPQWriteService(const TActorId& schemeCache, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const ui32 maxSessions) : SchemeCache(schemeCache) - , NewSchemeCache(newSchemeCache) , Counters(counters) , MaxSessions(maxSessions) , Enabled(false) @@ -176,7 +175,6 @@ void TPQWriteService::Handle(NKikimr::NGRpcService::TEvStreamPQWriteRequest::TPt return; } - TString localCluster = AvailableLocalCluster(ctx); if (HaveClusters && localCluster.empty()) { ev->Get()->GetStreamCtx()->Attach(ctx.SelfID); @@ -198,7 +196,7 @@ void TPQWriteService::Handle(NKikimr::NGRpcService::TEvStreamPQWriteRequest::TPt auto ip = ev->Get()->GetStreamCtx()->GetPeerName(); TActorId worker = ctx.Register(new TWriteSessionActor( - ev->Release().Release(), cookie, SchemeCache, NewSchemeCache, Counters, + ev->Release().Release(), cookie, SchemeCache, Counters, DatacenterClassifier ? DatacenterClassifier->ClassifyAddress(NAddressClassifier::ExtractAddress(ip)) : "unknown", *TopicsHandler )); diff --git a/ydb/services/persqueue_v1/grpc_pq_write.h b/ydb/services/persqueue_v1/grpc_pq_write.h index 73148388b2..5f29df94ba 100644 --- a/ydb/services/persqueue_v1/grpc_pq_write.h +++ b/ydb/services/persqueue_v1/grpc_pq_write.h @@ -21,12 +21,12 @@ inline TActorId GetPQWriteServiceActorID() { return TActorId(0, "PQWriteSvc"); } -IActor* CreatePQWriteService(const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache, +IActor* CreatePQWriteService(const NActors::TActorId& schemeCache, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const ui32 maxSessions); class TPQWriteService : public NActors::TActorBootstrapped<TPQWriteService> { public: - TPQWriteService(const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache, + TPQWriteService(const NActors::TActorId& schemeCache, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const ui32 maxSessions); ~TPQWriteService() @@ -60,7 +60,6 @@ private: void Handle(TEvPQProxy::TEvSessionDead::TPtr& ev, const TActorContext& ctx); NActors::TActorId SchemeCache; - NActors::TActorId NewSchemeCache; TAtomic LastCookie = 0; diff --git a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp index 6531afc7d3..c40ced868e 100644 --- a/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_write_actor.cpp @@ -89,14 +89,13 @@ static const TDuration SOURCEID_UPDATE_PERIOD = TDuration::Hours(1); TWriteSessionActor::TWriteSessionActor( NKikimr::NGRpcService::TEvStreamPQWriteRequest* request, const ui64 cookie, - const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache, + const NActors::TActorId& schemeCache, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const TMaybe<TString> clientDC, const NPersQueue::TTopicsListController& topicsController ) : Request(request) , State(ES_CREATED) , SchemeCache(schemeCache) - , NewSchemeCache(newSchemeCache) , PeerName("") , Cookie(cookie) , TopicsController(topicsController) @@ -112,7 +111,7 @@ TWriteSessionActor::TWriteSessionActor( , Token(nullptr) , UpdateTokenInProgress(false) , UpdateTokenAuthenticated(false) - , ACLCheckInProgress(false) + , ACLCheckInProgress(true) , FirstACLCheck(true) , RequestNotChecked(false) , LastACLCheckTimestamp(TInstant::Zero()) @@ -341,6 +340,10 @@ void TWriteSessionActor::Handle(TEvPQProxy::TEvWriteInit::TPtr& ev, const TActor void TWriteSessionActor::SetupCounters() { + if (SessionsCreated) { + return; + } + //now topic is checked, can create group for real topic, not garbage auto subGroup = GetServiceCounters(Counters, "pqproxy|writeSession"); TVector<NPQ::TLabelsInfo> aggr = NKikimr::NPQ::GetLabels(TopicConverter->GetClientsideName()); @@ -357,6 +360,10 @@ void TWriteSessionActor::SetupCounters() void TWriteSessionActor::SetupCounters(const TString& cloudId, const TString& dbId, const TString& folderId) { + if (SessionsCreated) { + return; + } + //now topic is checked, can create group for real topic, not garbage auto subGroup = NKikimr::NPQ::GetCountersForStream(Counters); TVector<NPQ::TLabelsInfo> aggr = NKikimr::NPQ::GetLabelsForStream(TopicConverter->GetClientsideName(), cloudId, dbId, folderId); @@ -415,7 +422,7 @@ void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActo SetupCounters(); } - Y_VERIFY (entry.SecurityObject); + Y_VERIFY(entry.SecurityObject); ACL.Reset(new TAclWrapper(entry.SecurityObject)); LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " describe result for acl check"); @@ -437,68 +444,6 @@ void TWriteSessionActor::Handle(TEvDescribeTopicsResponse::TPtr& ev, const TActo } } -void TWriteSessionActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { - TEvTxProxySchemeCache::TEvNavigateKeySetResult* msg = ev->Get(); - const NSchemeCache::TSchemeCacheNavigate* navigate = msg->Request.Get(); - Y_VERIFY(navigate->ResultSet.size() == 1); - if (navigate->ErrorCount > 0) { - const NSchemeCache::TSchemeCacheNavigate::EStatus status = navigate->ResultSet.front().Status; - return CloseSession( - TStringBuilder() << "Failed to read ACL for '" << TopicConverter->GetClientsideName() - << "' Scheme cache error : " << status, - PersQueue::ErrorCode::ERROR, ctx - ); - } - if (!navigate->ResultSet.front().PQGroupInfo) { - return CloseSession( - TStringBuilder() << "topic '" << TopicConverter->GetClientsideName() << "' describe error" - << ", reason: could not retrieve topic description", - PersQueue::ErrorCode::ERROR, ctx - ); - } - - const auto& pqDescription = navigate->ResultSet.front().PQGroupInfo->Description; - - Y_VERIFY(pqDescription.PartitionsSize() > 0); - Y_VERIFY(pqDescription.HasPQTabletConfig()); - InitialPQTabletConfig = pqDescription.GetPQTabletConfig(); - - if (!pqDescription.HasBalancerTabletID()) { - TString errorReason = Sprintf("topic '%s' has no balancer, Marker# PQ93", TopicConverter->GetClientsideName().c_str()); - CloseSession(errorReason, PersQueue::ErrorCode::UNKNOWN_TOPIC, ctx); - return; - } - - BalancerTabletId = pqDescription.GetBalancerTabletID(); - - for (ui32 i = 0; i < pqDescription.PartitionsSize(); ++i) { - const auto& pi = pqDescription.GetPartitions(i); - PartitionToTablet[pi.GetPartitionId()] = pi.GetTabletId(); - } - - if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { - const auto& tabletConfig = pqDescription.GetPQTabletConfig(); - SetupCounters(tabletConfig.GetYcCloudId(), tabletConfig.GetYdbDatabaseId(), - tabletConfig.GetYcFolderId()); - } else { - SetupCounters(); - } - - Y_VERIFY(!navigate->ResultSet.empty()); - ACL.Reset(new TAclWrapper(navigate->ResultSet.front().SecurityObject)); - - if (Request->GetInternalToken().empty()) { // session without auth - // We've already checked authentication flag in init request. Here we should finish it - FirstACLCheck = false; - DiscoverPartition(ctx); - } else { - Y_VERIFY(Request->GetYdbToken()); - Auth = *Request->GetYdbToken(); - Token = new NACLib::TUserToken(Request->GetInternalToken()); - CheckACL(ctx); - } -} - void TWriteSessionActor::DiscoverPartition(const NActors::TActorContext& ctx) { if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { // ToDo[migration] - separate flag for having config tables diff --git a/ydb/services/persqueue_v1/persqueue.cpp b/ydb/services/persqueue_v1/persqueue.cpp index ccacc4d43a..047fff117d 100644 --- a/ydb/services/persqueue_v1/persqueue.cpp +++ b/ydb/services/persqueue_v1/persqueue.cpp @@ -30,7 +30,7 @@ void TGRpcPersQueueService::InitService(grpc::ServerCompletionQueue *cq, NGrpc:: if (ActorSystem->AppData<TAppData>()->PQConfig.GetEnabled()) { - IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, NewSchemeCache,Counters, PersQueueWriteSessionsMaxCount); + IActor* writeSvc = NGRpcProxy::V1::CreatePQWriteService(SchemeCache, Counters, PersQueueWriteSessionsMaxCount); TActorId actorId = ActorSystem->Register(writeSvc, TMailboxType::HTSwap, ActorSystem->AppData<TAppData>()->UserPoolId); ActorSystem->RegisterLocalService(NGRpcProxy::V1::GetPQWriteServiceActorID(), actorId); |