author    | ilnaz <[email protected]> | 2022-09-14 13:00:41 +0300
committer | ilnaz <[email protected]> | 2022-09-14 13:00:41 +0300
commit    | d6e8190159c9b88668658afdd7c796133f8f69ba (patch)
tree      | 818f561cfa415c0938253d559e342e2fd8748d8c
parent    | a1b046d8ad132a2052989ac30585ab2c5a0e7f6f (diff)
Read session refactoring
-rw-r--r-- | ydb/services/lib/actors/type_definitions.h              |   80
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.h   |  313
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.ipp | 1580
3 files changed, 965 insertions, 1008 deletions
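One visible change in the type_definitions.h hunk below is that TTopicHolder drops its hand-written constructor in favour of default member initializers plus a FromTopicInfo factory built on designated initializers. The following is a minimal standalone sketch of that pattern; the types are simplified stand-ins, not the real YDB definitions:

```cpp
// Sketch only: stand-in types that mirror the shape of the refactored
// TTopicHolder/TTopicInitInfo pair; the real definitions live in
// ydb/services/lib/actors/type_definitions.h.
#include <cstdint>
#include <string>

struct TTopicInitInfo {
    std::string CloudId;
    std::string DbId;
    std::string FolderId;
    std::uint64_t TabletID = 0;
};

struct TTopicHolder {
    // Default member initializers replace the old constructor body.
    std::uint64_t TabletID = 0;
    bool ACLRequestInfly = false;
    std::string CloudId;
    std::string DbId;
    std::string FolderId;

    // Factory using C++20 designated initializers, as in the new code.
    static TTopicHolder FromTopicInfo(const TTopicInitInfo& info) {
        return TTopicHolder{
            .TabletID = info.TabletID,
            .ACLRequestInfly = false,
            .CloudId = info.CloudId,
            .DbId = info.DbId,
            .FolderId = info.FolderId,
        };
    }
};

int main() {
    const TTopicInitInfo info{.CloudId = "cloud", .DbId = "db", .FolderId = "folder", .TabletID = 42};
    const TTopicHolder holder = TTopicHolder::FromTopicInfo(info);
    return holder.TabletID == 42 ? 0 : 1;
}
```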
diff --git a/ydb/services/lib/actors/type_definitions.h b/ydb/services/lib/actors/type_definitions.h index 89e0168a71e..fbd7622b852 100644 --- a/ydb/services/lib/actors/type_definitions.h +++ b/ydb/services/lib/actors/type_definitions.h @@ -1,43 +1,53 @@ #pragma once #include <ydb/library/persqueue/topic_parser/topic_parser.h> + #include <library/cpp/actors/core/actor.h> -#include <library/cpp/actors/core/event_local.h> +#include <util/generic/hash.h> +#include <util/generic/map.h> +#include <util/generic/maybe.h> +#include <util/generic/vector.h> namespace NKikimr::NGRpcProxy { - struct TTopicHolder { - ui64 TabletID; - TActorId PipeClient; - bool ACLRequestInfly; - TString CloudId; - TString DbId; - TString FolderId; - NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode; - NPersQueue::TDiscoveryConverterPtr DiscoveryConverter; - NPersQueue::TTopicConverterPtr FullConverter; - TMaybe<TString> CdcStreamPath; - - TVector<ui32> Groups; - TMap<ui64, ui64> Partitions; - - TTopicHolder() - : TabletID(0) - , PipeClient() - , ACLRequestInfly(false) - {} - }; - - struct TTopicInitInfo { - NPersQueue::TTopicConverterPtr TopicNameConverter; - ui64 TabletID; - TString CloudId; - TString DbId; - TString FolderId; - NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode; - }; - - using TTopicInitInfoMap = THashMap<TString, TTopicInitInfo>; - -} // namespace NKikimr::NGRpcProxy +struct TTopicInitInfo { + NPersQueue::TTopicConverterPtr TopicNameConverter; + ui64 TabletID; + TString CloudId; + TString DbId; + TString FolderId; + NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode; +}; + +using TTopicInitInfoMap = THashMap<TString, TTopicInitInfo>; + +struct TTopicHolder { + ui64 TabletID = 0; + TActorId PipeClient; + bool ACLRequestInfly = false; + TString CloudId; + TString DbId; + TString FolderId; + NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode; + NPersQueue::TDiscoveryConverterPtr DiscoveryConverter; + NPersQueue::TTopicConverterPtr FullConverter; + TMaybe<TString> CdcStreamPath; + + TVector<ui32> Groups; + TMap<ui64, ui64> Partitions; + + inline static TTopicHolder FromTopicInfo(const TTopicInitInfo& info) { + return TTopicHolder{ + .TabletID = info.TabletID, + .ACLRequestInfly = false, + .CloudId = info.CloudId, + .DbId = info.DbId, + .FolderId = info.FolderId, + .MeteringMode = info.MeteringMode, + .FullConverter = info.TopicNameConverter, + }; + } +}; + +} // namespace NKikimr::NGRpcProxy diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.h b/ydb/services/persqueue_v1/actors/read_session_actor.h index d9e048394de..857a83d6c7e 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.h +++ b/ydb/services/persqueue_v1/actors/read_session_actor.h @@ -4,20 +4,19 @@ #include "partition_actor.h" #include "persqueue_utils.h" -#include <library/cpp/actors/core/actor_bootstrapped.h> -#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h> - #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/grpc_services/grpc_request_proxy.h> #include <ydb/core/persqueue/events/global.h> #include <ydb/services/lib/actors/pq_rl_helpers.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h> + #include <util/generic/guid.h> #include <util/system/compiler.h> #include <type_traits> - namespace NKikimr::NGRpcProxy::V1 { inline TActorId GetPQReadServiceActorID() { @@ -25,8 +24,9 @@ inline TActorId GetPQReadServiceActorID() { } struct TPartitionActorInfo { - TActorId Actor; + 
const TActorId Actor; const TPartitionId Partition; + NPersQueue::TTopicConverterPtr Topic; std::deque<ui64> Commits; bool Reading; bool Releasing; @@ -38,17 +38,18 @@ struct TPartitionActorInfo { ui64 ReadIdCommitted; TSet<ui64> NextCommits; TDisjointIntervalTree<ui64> NextRanges; - ui64 Offset; TInstant AssignTimestamp; - NPersQueue::TTopicConverterPtr Topic; - - TPartitionActorInfo(const TActorId& actor, const TPartitionId& partition, - const NPersQueue::TTopicConverterPtr& topic, const TActorContext& ctx) + explicit TPartitionActorInfo( + const TActorId& actor, + const TPartitionId& partition, + const NPersQueue::TTopicConverterPtr& topic, + const TInstant& timestamp) : Actor(actor) , Partition(partition) + , Topic(topic) , Reading(false) , Releasing(false) , Released(false) @@ -57,11 +58,9 @@ struct TPartitionActorInfo { , ReadIdToResponse(1) , ReadIdCommitted(0) , Offset(0) - , AssignTimestamp(ctx.Now()) - , Topic(topic) - { } - - void MakeCommit(const TActorContext& ctx); + , AssignTimestamp(timestamp) + { + } }; struct TPartitionInfo { @@ -69,6 +68,15 @@ struct TPartitionInfo { ui64 WTime; ui64 SizeLag; ui64 MsgLag; + + explicit TPartitionInfo(ui64 assignId, ui64 wTime, ui64 sizeLag, ui64 msgLag) + : AssignId(assignId) + , WTime(wTime) + , SizeLag(sizeLag) + , MsgLag(msgLag) + { + } + bool operator < (const TPartitionInfo& rhs) const { return std::tie(WTime, AssignId) < std::tie(rhs.WTime, rhs.AssignId); } @@ -94,15 +102,15 @@ struct TFormedReadResponse: public TSimpleRefCount<TFormedReadResponse<TServerMe i64 ByteSizeBeforeFiltering = 0; ui64 RequiredQuota = 0; - //returns byteSize diff + // returns byteSize diff i64 ApplyResponse(TServerMessage&& resp); THashSet<TActorId> PartitionsTookPartInRead; TSet<TPartitionId> PartitionsTookPartInControlMessages; - TSet<TPartitionInfo> PartitionsBecameAvailable; // Partitions that became available during this read request execution. - - // These partitions are bringed back to AvailablePartitions after reply to this read request. + // Partitions that became available during this read request execution. + // These partitions are bringed back to AvailablePartitions after reply to this read request. 
+ TSet<TPartitionInfo> PartitionsBecameAvailable; const TString Guid; TInstant Start; @@ -110,23 +118,37 @@ struct TFormedReadResponse: public TSimpleRefCount<TFormedReadResponse<TServerMe TDuration WaitQuotaTime; }; - -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> // Migration protocol is "pqv1" class TReadSessionActor : public TActorBootstrapped<TReadSessionActor<UseMigrationProtocol>> , private TRlHelpers { - using TClientMessage = typename std::conditional_t<UseMigrationProtocol, PersQueue::V1::MigrationStreamingReadClientMessage, Topic::StreamReadMessage::FromClient>; - using TServerMessage = typename std::conditional_t<UseMigrationProtocol, PersQueue::V1::MigrationStreamingReadServerMessage, Topic::StreamReadMessage::FromServer>; + using TClientMessage = typename std::conditional_t<UseMigrationProtocol, + PersQueue::V1::MigrationStreamingReadClientMessage, + Topic::StreamReadMessage::FromClient>; + + using TServerMessage = typename std::conditional_t<UseMigrationProtocol, + PersQueue::V1::MigrationStreamingReadServerMessage, + Topic::StreamReadMessage::FromServer>; + + using TEvReadInit = typename std::conditional_t<UseMigrationProtocol, + TEvPQProxy::TEvMigrationReadInit, + TEvPQProxy::TEvReadInit>; + + using TEvReadResponse = typename std::conditional_t<UseMigrationProtocol, + TEvPQProxy::TEvMigrationReadResponse, + TEvPQProxy::TEvReadResponse>; + + using TEvStreamReadRequest = typename std::conditional_t<UseMigrationProtocol, + NGRpcService::TEvStreamPQMigrationReadRequest, + NGRpcService::TEvStreamTopicReadRequest>; using IContext = NGRpcServer::IGRpcStreamingContext<TClientMessage, TServerMessage>; - using TEvReadInit = typename std::conditional_t<UseMigrationProtocol, TEvPQProxy::TEvMigrationReadInit, TEvPQProxy::TEvReadInit>; - using TEvReadResponse = typename std::conditional_t<UseMigrationProtocol, TEvPQProxy::TEvMigrationReadResponse, TEvPQProxy::TEvReadResponse>; - using TEvStreamPQReadRequest = typename std::conditional_t<UseMigrationProtocol, NKikimr::NGRpcService::TEvStreamPQMigrationReadRequest, NKikimr::NGRpcService::TEvStreamTopicReadRequest>; + using TPartitionsMap = THashMap<ui64, TPartitionActorInfo>; private: - //11 tries = 10,23 seconds, then each try for 5 seconds , so 21 retries will take near 1 min + // 11 tries = 10,23 seconds, then each try for 5 seconds , so 21 retries will take near 1 min static constexpr NTabletPipe::TClientRetryPolicy RetryPolicyForPipes = { .RetryLimitCount = 21, .MinRetryTime = TDuration::MilliSeconds(10), @@ -138,145 +160,143 @@ private: static constexpr ui64 MAX_INFLY_BYTES = 25_MB; static constexpr ui32 MAX_INFLY_READS = 10; - static constexpr ui64 MAX_READ_SIZE = 100 << 20; //100mb; + static constexpr ui64 MAX_READ_SIZE = 100_MB; static constexpr ui64 READ_BLOCK_SIZE = 8_KB; // metering - static constexpr double LAG_GROW_MULTIPLIER = 1.2; //assume that 20% more data arrived to partitions + static constexpr double LAG_GROW_MULTIPLIER = 1.2; // assume that 20% more data arrived to partitions public: - TReadSessionActor(TEvStreamPQReadRequest* request, const ui64 cookie, - const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const TMaybe<TString> clientDC, - const NPersQueue::TTopicsListController& topicsHandler); - ~TReadSessionActor(); - - void Bootstrap(const NActors::TActorContext& ctx); + TReadSessionActor(TEvStreamReadRequest* request, const ui64 cookie, + const TActorId& schemeCache, const TActorId& newSchemeCache, 
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TMaybe<TString> clientDC, + const NPersQueue::TTopicsListController& topicsHandler); - void Die(const NActors::TActorContext& ctx) override; + void Bootstrap(const TActorContext& ctx); - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::FRONT_PQ_READ; } + void Die(const TActorContext& ctx) override; + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::FRONT_PQ_READ; + } private: STFUNC(StateFunc) { switch (ev->GetTypeRewrite()) { - HFunc(TEvents::TEvWakeup, Handle); - + // grpc events HFunc(IContext::TEvReadFinished, Handle); HFunc(IContext::TEvWriteFinished, Handle); - CFunc(IContext::TEvNotifiedWhenDone::EventType, HandleDone); + HFunc(IContext::TEvNotifiedWhenDone, Handle) HFunc(NGRpcService::TGRpcRequestProxy::TEvRefreshTokenResponse, Handle); + // proxy events HFunc(TEvPQProxy::TEvAuthResultOk, Handle); // form auth actor - - HFunc(TEvPQProxy::TEvDieCommand, HandlePoison) - - HFunc(/* type alias */ TEvReadInit, Handle) //from gRPC - HFunc(TEvPQProxy::TEvReadSessionStatus, Handle) // from read sessions info builder proxy - HFunc(TEvPQProxy::TEvRead, Handle) //from gRPC - HFunc(TEvPQProxy::TEvDone, Handle) //from gRPC - HFunc(TEvPQProxy::TEvCloseSession, Handle) //from partitionActor - HFunc(TEvPQProxy::TEvPartitionReady, Handle) //from partitionActor - HFunc(TEvPQProxy::TEvPartitionReleased, Handle) //from partitionActor - - HFunc(/* type alias */ TEvReadResponse, Handle) //from partitionActor - HFunc(TEvPQProxy::TEvCommitCookie, Handle) //from gRPC - HFunc(TEvPQProxy::TEvCommitRange, Handle) //from gRPC - HFunc(TEvPQProxy::TEvStartRead, Handle) //from gRPC - HFunc(TEvPQProxy::TEvReleased, Handle) //from gRPC - HFunc(TEvPQProxy::TEvGetStatus, Handle) //from gRPC - HFunc(TEvPQProxy::TEvAuth, Handle) //from gRPC - - HFunc(TEvPQProxy::TEvCommitDone, Handle) //from PartitionActor - HFunc(TEvPQProxy::TEvPartitionStatus, Handle) //from partitionActor - - HFunc(TEvPersQueue::TEvLockPartition, Handle) //from Balancer - HFunc(TEvPersQueue::TEvReleasePartition, Handle) //from Balancer - HFunc(TEvPersQueue::TEvError, Handle) //from Balancer - - HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + HFunc(/* type alias */ TEvReadInit, Handle); // from gRPC + HFunc(TEvPQProxy::TEvReadSessionStatus, Handle); // from read sessions info builder proxy + HFunc(TEvPQProxy::TEvRead, Handle); // from gRPC + HFunc(/* type alias */ TEvReadResponse, Handle); // from partitionActor + HFunc(TEvPQProxy::TEvDone, Handle); // from gRPC + HFunc(TEvPQProxy::TEvCloseSession, Handle); // from partitionActor + HFunc(TEvPQProxy::TEvDieCommand, Handle); + HFunc(TEvPQProxy::TEvPartitionReady, Handle); // from partitionActor + HFunc(TEvPQProxy::TEvPartitionReleased, Handle); // from partitionActor + HFunc(TEvPQProxy::TEvCommitCookie, Handle); // from gRPC + HFunc(TEvPQProxy::TEvCommitRange, Handle); // from gRPC + HFunc(TEvPQProxy::TEvStartRead, Handle); // from gRPC + HFunc(TEvPQProxy::TEvReleased, Handle); // from gRPC + HFunc(TEvPQProxy::TEvGetStatus, Handle); // from gRPC + HFunc(TEvPQProxy::TEvAuth, Handle); // from gRPC + HFunc(TEvPQProxy::TEvCommitDone, Handle); // from PartitionActor + HFunc(TEvPQProxy::TEvPartitionStatus, Handle); // from partitionActor + + // Balancer events + HFunc(TEvPersQueue::TEvLockPartition, Handle); + HFunc(TEvPersQueue::TEvReleasePartition, Handle); + HFunc(TEvPersQueue::TEvError, Handle); + + // pipe events 
HFunc(TEvTabletPipe::TEvClientConnected, Handle); + HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + + // system events + HFunc(TEvents::TEvWakeup, Handle); default: break; - }; + } } - ui64 PrepareResponse(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse); // returns estimated response's size - bool WriteResponse(TServerMessage&& response, bool finish = false); + bool ReadFromStreamOrDie(const TActorContext& ctx); + bool WriteToStreamOrDie(const TActorContext& ctx, TServerMessage&& response, bool finish = false); + bool SendControlMessage(TPartitionId id, TServerMessage&& message, const TActorContext& ctx); + // grpc events void Handle(typename IContext::TEvReadFinished::TPtr& ev, const TActorContext &ctx); void Handle(typename IContext::TEvWriteFinished::TPtr& ev, const TActorContext &ctx); - void HandleDone(const TActorContext &ctx); - + void Handle(typename IContext::TEvNotifiedWhenDone::TPtr& ev, const TActorContext &ctx); void Handle(NGRpcService::TGRpcRequestProxy::TEvRefreshTokenResponse::TPtr& ev, const TActorContext &ctx); - - void Handle(typename TEvReadInit::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPQProxy::TEvReadSessionStatus::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPQProxy::TEvRead::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(typename TEvReadResponse::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPQProxy::TEvDone::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPQProxy::TEvCloseSession::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPQProxy::TEvPartitionReady::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPQProxy::TEvPartitionReleased::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPQProxy::TEvCommitCookie::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPQProxy::TEvCommitRange::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPQProxy::TEvStartRead::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPQProxy::TEvReleased::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPQProxy::TEvGetStatus::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPQProxy::TEvAuth::TPtr& ev, const NActors::TActorContext& ctx); - void ProcessAuth(const TString& auth, const TActorContext& ctx); - void Handle(TEvPQProxy::TEvCommitDone::TPtr& ev, const NActors::TActorContext& ctx); - - void Handle(TEvPQProxy::TEvPartitionStatus::TPtr& ev, const NActors::TActorContext& ctx); - - void Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPersQueue::TEvReleasePartition::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvPersQueue::TEvError::TPtr& ev, const NActors::TActorContext& ctx); - - void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx); - void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext& ctx); - [[nodiscard]] bool ProcessBalancerDead(const ui64 tabletId, const NActors::TActorContext& ctx); // returns false if actor died - - void HandlePoison(TEvPQProxy::TEvDieCommand::TPtr& ev, const NActors::TActorContext& ctx); + // proxy events + void Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx); + void Handle(typename TEvReadInit::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvReadSessionStatus::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext& ctx); + 
void Handle(typename TEvReadResponse::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvDone::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvDieCommand::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvPartitionReady::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvPartitionReleased::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvCommitCookie::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvCommitRange::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvStartRead::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvReleased::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvGetStatus::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvAuth::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvCommitDone::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPQProxy::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx); + + // Balancer events + void Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPersQueue::TEvReleasePartition::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPersQueue::TEvError::TPtr& ev, const TActorContext& ctx); + + // pipe events + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx); + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx); + + // system events void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx); - void Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const NActors::TActorContext& ctx); - void RecheckACL(const TActorContext& ctx); + TActorId CreatePipeClient(ui64 tabletId, const TActorContext& ctx); + void ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx); + + void RunAuthActor(const TActorContext& ctx); + void RecheckACL(const TActorContext& ctx); void InitSession(const TActorContext& ctx); - void CloseSession(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode, - const NActors::TActorContext& ctx); + void RegisterSession(const TString& topic, const TActorId& pipe, const TVector<ui32>& groups, const TActorContext& ctx); + void CloseSession(PersQueue::ErrorCode::ErrorCode code, const TString& reason, const TActorContext& ctx); void SetupCounters(); void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic); - void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic, const TString& cloudId, const TString& dbId, - const TString& folderId); - - void ProcessReads(const NActors::TActorContext& ctx); // returns false if actor died - void ProcessAnswer(const NActors::TActorContext& ctx, typename TFormedReadResponse<TServerMessage>::TPtr formedResponse); // returns false if actor died - - void RegisterSessions(const NActors::TActorContext& ctx); - void RegisterSession(const TActorId& pipe, const TString& topic, const TVector<ui32>& groups, const TActorContext& ctx); - - void DropPartition(typename THashMap<ui64, TPartitionActorInfo>::iterator it, const TActorContext& ctx); + void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic, + const TString& cloudId, const TString& dbId, const TString& folderId); - bool ActualPartitionActor(const TActorId& part); - void ReleasePartition(const typename THashMap<ui64, TPartitionActorInfo>::iterator& it, - bool couldBeReads, const TActorContext& ctx); // returns false if actor 
died + void ProcessReads(const TActorContext& ctx); + ui64 PrepareResponse(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse); + void ProcessAnswer(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse, const TActorContext& ctx); - void SendReleaseSignalToClient(const typename THashMap<ui64, TPartitionActorInfo>::iterator& it, bool kill, const TActorContext& ctx); - - void InformBalancerAboutRelease(const typename THashMap<ui64, TPartitionActorInfo>::iterator& it, const TActorContext& ctx); + void DropPartition(typename TPartitionsMap::iterator it, const TActorContext& ctx); + void ReleasePartition(typename TPartitionsMap::iterator it, bool couldBeReads, const TActorContext& ctx); + void SendReleaseSignal(typename TPartitionsMap::iterator it, bool kill, const TActorContext& ctx); + void InformBalancerAboutRelease(typename TPartitionsMap::iterator it, const TActorContext& ctx); static ui32 NormalizeMaxReadMessagesCount(ui32 sourceValue); static ui32 NormalizeMaxReadSize(ui32 sourceValue); private: - std::unique_ptr</* type alias */ TEvStreamPQReadRequest> Request; - + std::unique_ptr</* type alias */ TEvStreamReadRequest> Request; const TString ClientDC; - const TInstant StartTimestamp; TActorId SchemeCache; @@ -293,7 +313,7 @@ private: bool CommitsDisabled; bool InitDone; - bool RangesMode = false; + bool RangesMode; ui32 MaxReadMessagesCount; ui32 MaxReadSize; @@ -310,7 +330,7 @@ private: THashSet<TActorId> ActualPartitionActors; THashMap<ui64, std::pair<ui32, ui64>> BalancerGeneration; ui64 NextAssignId; - THashMap<ui64, TPartitionActorInfo> Partitions; //assignId -> info + TPartitionsMap Partitions; // assignId -> info THashMap<TString, TTopicHolder> Topics; // topic -> info THashMap<TString, NPersQueue::TTopicConverterPtr> FullPathToConverter; // PrimaryFullPath -> Converter, for balancer replies matching @@ -324,10 +344,15 @@ private: TSet<TPartitionInfo> AvailablePartitions; - THashMap<TActorId, typename TFormedReadResponse<TServerMessage>::TPtr> PartitionToReadResponse; // Partition actor -> TFormedReadResponse answer that has this partition. - // PartitionsTookPartInRead in formed read response contain this actor id. - typename TFormedReadResponse<TServerMessage>::TPtr PendingQuota; // response that currenly pending quota - std::deque<typename TFormedReadResponse<TServerMessage>::TPtr> WaitingQuota; // responses that will be quoted next + // Partition actor -> TFormedReadResponse answer that has this partition. + // PartitionsTookPartInRead in formed read response contain this actor id. 
+ THashMap<TActorId, typename TFormedReadResponse<TServerMessage>::TPtr> PartitionToReadResponse; + + // Response that currenly pending quota + typename TFormedReadResponse<TServerMessage>::TPtr PendingQuota; + + // Responses that will be quoted next + std::deque<typename TFormedReadResponse<TServerMessage>::TPtr> WaitingQuota; struct TControlMessages { TVector<TServerMessage> ControlMessages; @@ -336,7 +361,6 @@ private: TMap<TPartitionId, TControlMessages> PartitionToControlMessages; - std::deque<THolder<TEvPQProxy::TEvRead>> Reads; ui64 Cookie; @@ -346,7 +370,7 @@ private: ui32 Partitions; }; - TMap<ui64, TCommitInfo> Commits; //readid->TCommitInfo + TMap<ui64, TCommitInfo> Commits; // readid -> TCommitInfo TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; @@ -361,22 +385,22 @@ private: ui32 ReadsInfly; std::queue<ui64> ActiveWrites; - NKikimr::NPQ::TPercentileCounter PartsPerSession; + NPQ::TPercentileCounter PartsPerSession; THashMap<TString, TTopicCounters> TopicCounters; THashMap<TString, ui32> NumPartitionsFromTopic; TVector<NPersQueue::TPQLabelsInfo> Aggr; - NKikimr::NPQ::TMultiCounter SLITotal; - NKikimr::NPQ::TMultiCounter SLIErrors; + NPQ::TMultiCounter SLITotal; + NPQ::TMultiCounter SLIErrors; TInstant StartTime; - NKikimr::NPQ::TPercentileCounter InitLatency; - NKikimr::NPQ::TPercentileCounter ReadLatency; - NKikimr::NPQ::TPercentileCounter ReadLatencyFromDisk; - NKikimr::NPQ::TPercentileCounter CommitLatency; - NKikimr::NPQ::TMultiCounter SLIBigLatency; - NKikimr::NPQ::TMultiCounter SLIBigReadLatency; - NKikimr::NPQ::TMultiCounter ReadsTotal; + NPQ::TPercentileCounter InitLatency; + NPQ::TPercentileCounter ReadLatency; + NPQ::TPercentileCounter ReadLatencyFromDisk; + NPQ::TPercentileCounter CommitLatency; + NPQ::TMultiCounter SLIBigLatency; + NPQ::TMultiCounter SLIBigReadLatency; + NPQ::TMultiCounter ReadsTotal; NPersQueue::TTopicsListController TopicsHandler; NPersQueue::TTopicsToConverter TopicsList; @@ -384,8 +408,7 @@ private: } -///////////////////////////////////////// // Implementation #define READ_SESSION_ACTOR_IMPL -#include "read_session_actor.ipp" + #include "read_session_actor.ipp" #undef READ_SESSION_ACTOR_IMPL diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index 4a5da33c3a3..5b114cf8829 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -1,9 +1,9 @@ #ifndef READ_SESSION_ACTOR_IMPL -#error "Do not include this file directly" + #error "Do not include this file directly" #endif -#include "read_init_auth_actor.h" #include "helpers.h" +#include "read_init_auth_actor.h" #include <ydb/library/persqueue/topic_parser/counters.h> @@ -13,41 +13,33 @@ #include <util/string/join.h> #include <util/string/strip.h> -#include <util/charset/utf8.h> #include <utility> -using namespace NActors; -using namespace NKikimrClient; - -namespace NKikimr { +namespace NKikimr::NGRpcProxy::V1 { +using namespace NKikimrClient; using namespace NMsgBusProxy; - -namespace NGRpcProxy::V1 { - using namespace PersQueue::V1; -//TODO: add here tracking of bytes in/out +// TODO: add here tracking of bytes in/out template <bool UseMigrationProtocol> -TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(TEvStreamPQReadRequest* request, const ui64 cookie, - const TActorId& schemeCache, const TActorId& newSchemeCache, - TIntrusivePtr<NMonitoring::TDynamicCounters> counters, - const TMaybe<TString> clientDC, - const 
NPersQueue::TTopicsListController& topicsHandler) +TReadSessionActor<UseMigrationProtocol>::TReadSessionActor( + TEvStreamReadRequest* request, const ui64 cookie, + const TActorId& schemeCache, const TActorId& newSchemeCache, + TIntrusivePtr<NMonitoring::TDynamicCounters> counters, + const TMaybe<TString> clientDC, + const NPersQueue::TTopicsListController& topicsHandler) : TRlHelpers(request, READ_BLOCK_SIZE, TDuration::Minutes(1)) , Request(request) - , ClientDC(clientDC ? *clientDC : "other") + , ClientDC(clientDC.GetOrElse("other")) , StartTimestamp(TInstant::Now()) , SchemeCache(schemeCache) , NewSchemeCache(newSchemeCache) - , AuthInitActor() - , ClientId() - , ClientPath() - , Session() , CommitsDisabled(false) , InitDone(false) + , RangesMode(false) , MaxReadMessagesCount(0) , MaxReadSize(0) , MaxTimeLagMs(0) @@ -63,57 +55,60 @@ TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(TEvStreamPQReadReques , BytesInflight_(0) , RequestedBytes(0) , ReadsInfly(0) - , TopicsHandler(topicsHandler) { + , TopicsHandler(topicsHandler) +{ Y_ASSERT(Request); } -template<bool UseMigrationProtocol> -TReadSessionActor<UseMigrationProtocol>::~TReadSessionActor() = default; - - -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Bootstrap(const TActorContext& ctx) { - Y_VERIFY(Request); if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { ++(*GetServiceCounters(Counters, "pqproxy|readSession") ->GetNamedCounter("sensor", "SessionsCreatedTotal", true)); } Request->GetStreamCtx()->Attach(ctx.SelfID); - if (!Request->GetStreamCtx()->Read()) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); - Die(ctx); + if (!ReadFromStreamOrDie(ctx)) { return; } - StartTime = ctx.Now(); - TReadSessionActor<UseMigrationProtocol>::Become(&TReadSessionActor<UseMigrationProtocol>::TThis::StateFunc); + StartTime = ctx.Now(); + this->Become(&TReadSessionActor<UseMigrationProtocol>::TThis::StateFunc); } -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::HandleDone(const TActorContext& ctx) { - +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvNotifiedWhenDone::TPtr&, const TActorContext& ctx) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc closed"); Die(ctx); } +template <bool UseMigrationProtocol> +bool TReadSessionActor<UseMigrationProtocol>::ReadFromStreamOrDie(const TActorContext& ctx) { + if (!Request->GetStreamCtx()->Read()) { + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); + Die(ctx); + return false; + } -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadFinished::TPtr& ev, const TActorContext& ctx) { + return true; +} +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadFinished::TPtr& ev, const TActorContext& ctx) { auto& request = ev->Get()->Record; if constexpr (UseMigrationProtocol) { - auto token = request.token(); + const auto token = request.token(); request.set_token(""); - if (!token.empty()) { //TODO refreshtoken here + if (!token.empty()) { // TODO: refresh token here ctx.Send(ctx.SelfID, new TEvPQProxy::TEvAuth(token)); } } - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, - PQ_LOG_PREFIX << " grpc read done: success: " << ev->Get()->Success << " data: " << request); + 
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read done" + << ": success# " << ev->Get()->Success + << ", data# " << request); if (!ev->Get()->Success) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed"); @@ -121,7 +116,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF return; } - auto GetAssignId = [](auto& request) { + auto getAssignId = [](auto& request) { if constexpr (UseMigrationProtocol) { return request.assign_id(); } else { @@ -132,33 +127,23 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF if constexpr (UseMigrationProtocol) { switch (request.request_case()) { case TClientMessage::kInitRequest: { - ctx.Send(ctx.SelfID, new TEvReadInit(request, Request->GetStreamCtx()->GetPeerName())); - break; + return (void)ctx.Send(ctx.SelfID, new TEvReadInit(request, Request->GetStreamCtx()->GetPeerName())); } - case TClientMessage::kStatus: { - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(GetAssignId(request.status()))); - if (!Request->GetStreamCtx()->Read()) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); - Die(ctx); - return; - } - break; + case TClientMessage::kStatus: { + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(getAssignId(request.status()))); + return (void)ReadFromStreamOrDie(ctx); } + case TClientMessage::kRead: { - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvRead()); // Proto read message have no parameters - break; + return (void)ctx.Send(ctx.SelfID, new TEvPQProxy::TEvRead()); } - case TClientMessage::kReleased: { - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(GetAssignId(request.released()))); - if (!Request->GetStreamCtx()->Read()) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); - Die(ctx); - return; - } - break; + case TClientMessage::kReleased: { + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(getAssignId(request.released()))); + return (void)ReadFromStreamOrDie(ctx); } + case TClientMessage::kStartRead: { const auto& req = request.start_read(); @@ -166,188 +151,186 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF const ui64 commitOffset = req.commit_offset(); const bool verifyReadOffset = req.verify_read_offset(); - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(GetAssignId(request.start_read()), readOffset, commitOffset, verifyReadOffset)); - if (!Request->GetStreamCtx()->Read()) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); - Die(ctx); - return; - } - break; + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(getAssignId(request.start_read()), readOffset, commitOffset, verifyReadOffset)); + return (void)ReadFromStreamOrDie(ctx); } + case TClientMessage::kCommit: { const auto& req = request.commit(); if (!req.cookies_size() && !RangesMode) { - CloseSession(TStringBuilder() << "can't commit without cookies", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "can't commit without cookies", ctx); } - if (RangesMode && !req.offset_ranges_size()) { - CloseSession(TStringBuilder() << "can't commit without offsets", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + if (RangesMode && !req.offset_ranges_size()) { + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "can't commit without offsets", ctx); } THashMap<ui64, TEvPQProxy::TCommitCookie> commitCookie; THashMap<ui64, 
TEvPQProxy::TCommitRange> commitRange; - for (auto& c: req.cookies()) { + for (const auto& c : req.cookies()) { commitCookie[c.assign_id()].Cookies.push_back(c.partition_cookie()); } - for (auto& c: req.offset_ranges()) { - commitRange[c.assign_id()].Ranges.push_back(std::make_pair(c.start_offset(), c.end_offset())); - } - for (auto& c : commitCookie) { - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvCommitCookie(c.first, std::move(c.second))); + for (const auto& c : req.offset_ranges()) { + commitRange[c.assign_id()].Ranges.emplace_back(c.start_offset(), c.end_offset()); } - for (auto& c : commitRange) { - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvCommitRange(c.first, std::move(c.second))); + for (auto& [id, cookies] : commitCookie) { + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvCommitCookie(id, std::move(cookies))); } - if (!Request->GetStreamCtx()->Read()) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); - Die(ctx); - return; + for (auto& [id, range] : commitRange) { + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvCommitRange(id, std::move(range))); } - break; + + return (void)ReadFromStreamOrDie(ctx); } default: { - CloseSession(TStringBuilder() << "unsupported request", PersQueue::ErrorCode::BAD_REQUEST, ctx); - break; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "unsupported request", ctx); } } } else { - switch(request.client_message_case()) { + switch (request.client_message_case()) { case TClientMessage::kInitRequest: { - ctx.Send(ctx.SelfID, new TEvReadInit(request, Request->GetStreamCtx()->GetPeerName())); - break; + return (void)ctx.Send(ctx.SelfID, new TEvReadInit(request, Request->GetStreamCtx()->GetPeerName())); } + case TClientMessage::kReadRequest: { - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvRead(request.read_request().bytes_size())); - break; + return (void)ctx.Send(ctx.SelfID, new TEvPQProxy::TEvRead(request.read_request().bytes_size())); } - case TClientMessage::kPartitionSessionStatusRequest: { - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(GetAssignId(request.partition_session_status_request()))); - if (!Request->GetStreamCtx()->Read()) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); - Die(ctx); - return; - } - break; + case TClientMessage::kPartitionSessionStatusRequest: { + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(getAssignId(request.partition_session_status_request()))); + return (void)ReadFromStreamOrDie(ctx); } - case TClientMessage::kStopPartitionSessionResponse: { - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(GetAssignId(request.stop_partition_session_response()))); - if (!Request->GetStreamCtx()->Read()) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); - Die(ctx); - return; - } - break; + case TClientMessage::kStopPartitionSessionResponse: { + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(getAssignId(request.stop_partition_session_response()))); + return (void)ReadFromStreamOrDie(ctx); } + case TClientMessage::kStartPartitionSessionResponse: { const auto& req = request.start_partition_session_response(); const ui64 readOffset = req.read_offset(); const ui64 commitOffset = req.commit_offset(); - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(GetAssignId(req), readOffset, commitOffset, req.has_read_offset())); - if (!Request->GetStreamCtx()->Read()) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); - Die(ctx); - return; - } - break; + 
ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(getAssignId(req), readOffset, commitOffset, req.has_read_offset())); + return (void)ReadFromStreamOrDie(ctx); } + case TClientMessage::kCommitOffsetRequest: { const auto& req = request.commit_offset_request(); if (!RangesMode || !req.commit_offsets_size()) { - CloseSession(TStringBuilder() << "can't commit without offsets", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "can't commit without offsets", ctx); } THashMap<ui64, TEvPQProxy::TCommitRange> commitRange; - for (auto& pc: req.commit_offsets()) { - auto id = pc.partition_session_id(); - for (auto& c: pc.offsets()) { - commitRange[id].Ranges.push_back(std::make_pair(c.start(), c.end())); + for (const auto& pc : req.commit_offsets()) { + for (const auto& c : pc.offsets()) { + commitRange[pc.partition_session_id()].Ranges.emplace_back(c.start(), c.end()); } } - for (auto& c : commitRange) { - ctx.Send(ctx.SelfID, new TEvPQProxy::TEvCommitRange(c.first, std::move(c.second))); + for (auto& [id, range] : commitRange) { + ctx.Send(ctx.SelfID, new TEvPQProxy::TEvCommitRange(id, std::move(range))); } - if (!Request->GetStreamCtx()->Read()) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); - Die(ctx); - return; - } - break; + return (void)ReadFromStreamOrDie(ctx); } + case TClientMessage::kUpdateTokenRequest: { - auto token = request.update_token_request().token(); - if (!token.empty()) { //TODO refreshtoken here + if (const auto token = request.update_token_request().token()) { // TODO: refresh token here ctx.Send(ctx.SelfID, new TEvPQProxy::TEvAuth(token)); } break; } default: { - CloseSession(TStringBuilder() << "unsupported request", PersQueue::ErrorCode::BAD_REQUEST, ctx); - break; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "unsupported request", ctx); } } } } +template <bool UseMigrationProtocol> +bool TReadSessionActor<UseMigrationProtocol>::WriteToStreamOrDie(const TActorContext& ctx, TServerMessage&& response, bool finish) { + const ui64 sz = response.ByteSize(); + ActiveWrites.push(sz); + + BytesInflight_ += sz; + if (BytesInflight) { + (*BytesInflight) += sz; + } + + bool res = false; + if (!finish) { + res = Request->GetStreamCtx()->Write(std::move(response)); + } else { + res = Request->GetStreamCtx()->WriteAndFinish(std::move(response), grpc::Status::OK); + } + + if (!res) { + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed at start"); + Die(ctx); + } + + return res; +} -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvWriteFinished::TPtr& ev, const TActorContext& ctx) { if (!ev->Get()->Success) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed"); - Die(ctx); + return Die(ctx); } + Y_VERIFY(!ActiveWrites.empty()); - ui64 sz = ActiveWrites.front(); + const auto sz = ActiveWrites.front(); ActiveWrites.pop(); + Y_VERIFY(BytesInflight_ >= sz); BytesInflight_ -= sz; - if (BytesInflight) (*BytesInflight) -= sz; + if (BytesInflight) { + (*BytesInflight) -= sz; + } ProcessReads(ctx); } - -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) { + if (AuthInitActor) { + ctx.Send(AuthInitActor, new TEvents::TEvPoisonPill()); + } - ctx.Send(AuthInitActor, new TEvents::TEvPoisonPill()); - - for (auto& p : 
Partitions) { - ctx.Send(p.second.Actor, new TEvents::TEvPoisonPill()); + for (const auto& [_, info] : Partitions) { + if (info.Actor) { + ctx.Send(info.Actor, new TEvents::TEvPoisonPill()); + } - if (!p.second.Released) { - // ToDo[counters] - auto it = TopicCounters.find(p.second.Topic->GetInternalName()); + if (!info.Released) { + // TODO: counters + auto it = TopicCounters.find(info.Topic->GetInternalName()); Y_VERIFY(it != TopicCounters.end()); it->second.PartitionsInfly.Dec(); it->second.PartitionsReleased.Inc(); - if (p.second.Releasing) + if (info.Releasing) { it->second.PartitionsToBeReleased.Dec(); + } } } - for (auto& t : Topics) { - if (t.second.PipeClient) - NTabletPipe::CloseClient(ctx, t.second.PipeClient); + for (const auto& [_, holder] : Topics) { + if (holder.PipeClient) { + NTabletPipe::CloseClient(ctx, holder.PipeClient); + } } - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " is DEAD"); if (BytesInflight) { (*BytesInflight) -= BytesInflight_; @@ -359,96 +342,114 @@ void TReadSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) { PartsPerSession.DecFor(Partitions.size(), 1); } + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " is DEAD"); ctx.Send(GetPQReadServiceActorID(), new TEvPQProxy::TEvSessionDead(Cookie)); TActorBootstrapped<TReadSessionActor>::Die(ctx); } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvDone::TPtr&, const TActorContext& ctx) { - CloseSession(TStringBuilder() << "Reads done signal - closing everything", PersQueue::ErrorCode::OK, ctx); + CloseSession(PersQueue::ErrorCode::OK, "reads done signal, closing everything", ctx); +} + +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx) { + CloseSession(ev->Get()->ErrorCode, ev->Get()->Reason, ctx); +} + +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvDieCommand::TPtr& ev, const TActorContext& ctx) { + CloseSession(ev->Get()->ErrorCode, ev->Get()->Reason, ctx); } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvCommitCookie::TPtr& ev, const TActorContext& ctx) { RequestNotChecked = true; if (CommitsDisabled) { - CloseSession("commits in session are disabled by client option", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "commits in session are disabled by client option", ctx); } - const ui64& assignId = ev->Get()->AssignId; - auto it = Partitions.find(assignId); - if (it == Partitions.end()) //stale commit - ignore it + + auto it = Partitions.find(ev->Get()->AssignId); + if (it == Partitions.end()) { // stale commit - ignore it return; + } - for (auto& c : ev->Get()->CommitInfo.Cookies) { - if(RangesMode) { - CloseSession("Commits cookies in ranges commit mode is illegal", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + for (const auto c : ev->Get()->CommitInfo.Cookies) { + if (RangesMode) { + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "commits cookies in ranges commit mode is prohibited", ctx); } + it->second.NextCommits.insert(c); } ctx.Send(it->second.Actor, new TEvPQProxy::TEvCommitCookie(ev->Get()->AssignId, std::move(ev->Get()->CommitInfo))); } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void 
TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvCommitRange::TPtr& ev, const TActorContext& ctx) { RequestNotChecked = true; if (CommitsDisabled) { - CloseSession("commits in session are disabled by client option", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "commits in session are disabled by client option", ctx); } - const ui64& assignId = ev->Get()->AssignId; - auto it = Partitions.find(assignId); - if (it == Partitions.end()) //stale commit - ignore it + + auto it = Partitions.find(ev->Get()->AssignId); + if (it == Partitions.end()) { // stale commit - ignore it return; + } - for (auto& c : ev->Get()->CommitInfo.Ranges) { - if(!RangesMode) { - CloseSession("Commits ranges in cookies commit mode is illegal", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + for (const auto& [b, e] : ev->Get()->CommitInfo.Ranges) { + if (!RangesMode) { + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "commits ranges in cookies commit mode is prohibited", ctx); } - if (c.first >= c.second || it->second.NextRanges.Intersects(c.first, c.second) || c.first < it->second.Offset) { - CloseSession(TStringBuilder() << "Offsets range [" << c.first << ", " << c.second << ") has already committed offsets, double committing is forbiden; or incorrect", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + if (b >= e || it->second.NextRanges.Intersects(b, e) || b < it->second.Offset) { + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder() + << "offsets range [" << b << ", " << e << ")" + << " has already committed offsets, double committing is forbiden or incorrect", ctx); } - it->second.NextRanges.InsertInterval(c.first, c.second); + + it->second.NextRanges.InsertInterval(b, e); } ctx.Send(it->second.Actor, new TEvPQProxy::TEvCommitRange(ev->Get()->AssignId, std::move(ev->Get()->CommitInfo))); } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuth::TPtr& ev, const TActorContext& ctx) { - ProcessAuth(ev->Get()->Auth, ctx); + const auto& auth = ev->Get()->Auth; + if (!auth.empty() && auth != Auth) { + Auth = auth; + Request->RefreshToken(auth, ctx, ctx.SelfID); + } } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvStartRead::TPtr& ev, const TActorContext& ctx) { RequestNotChecked = true; auto it = Partitions.find(ev->Get()->AssignId); if (it == Partitions.end() || it->second.Releasing) { - //do nothing - already released partition - LOG_WARN_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got NOTACTUAL StartRead from client for partition with assign id " << ev->Get()->AssignId - << " at offset " << ev->Get()->ReadOffset); + // do nothing - already released partition + LOG_WARN_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got irrelevant StartRead from client" + << ": partition# " << ev->Get()->AssignId + << ", offset# " << ev->Get()->ReadOffset); return; } - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got StartRead from client for " - << it->second.Partition << - " at readOffset " << ev->Get()->ReadOffset << - " commitOffset " << ev->Get()->CommitOffset); - //proxy request to partition - allow initing - //TODO: add here VerifyReadOffset too and check it againts Committed position + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got StartRead from client" + << ": 
partition# " << it->second.Partition + << ", readOffset# " << ev->Get()->ReadOffset + << ", commitOffset# " << ev->Get()->CommitOffset); + + // proxy request to partition - allow initing + // TODO: add here VerifyReadOffset too and check it againts Committed position ctx.Send(it->second.Actor, new TEvPQProxy::TEvLockPartition(ev->Get()->ReadOffset, ev->Get()->CommitOffset, ev->Get()->VerifyReadOffset, true)); } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvReleased::TPtr& ev, const TActorContext& ctx) { RequestNotChecked = true; @@ -456,167 +457,150 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvReleased::TP if (it == Partitions.end()) { return; } - if (!it->second.Releasing) { - CloseSession(TStringBuilder() << "Release of partition that is not requested for release is forbiden for " << it->second.Partition, PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + if (!it->second.Releasing) { + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder() + << "release of partition that is not requested for release is forbiden for " << it->second.Partition, ctx); } - Y_VERIFY(it->second.LockSent); - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got Released from client for partition " << it->second.Partition); + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got Released from client" + << ": partition# " << it->second.Partition); + + Y_VERIFY(it->second.LockSent); ReleasePartition(it, true, ctx); } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvGetStatus::TPtr& ev, const TActorContext& ctx) { auto it = Partitions.find(ev->Get()->AssignId); if (it == Partitions.end() || it->second.Releasing) { // Ignore request - client asking status after releasing of partition. 
return; } + ctx.Send(it->second.Actor, new TEvPQProxy::TEvGetStatus(ev->Get()->AssignId)); } -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::DropPartition(typename THashMap<ui64, TPartitionActorInfo>::iterator it, const TActorContext& ctx) { +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::DropPartition(typename TPartitionsMap::iterator it, const TActorContext& ctx) { ctx.Send(it->second.Actor, new TEvents::TEvPoisonPill()); + bool res = ActualPartitionActors.erase(it->second.Actor); Y_VERIFY(res); if (--NumPartitionsFromTopic[it->second.Topic->GetInternalName()] == 0) { - //ToDo[counters] - bool res_ = TopicCounters.erase(it->second.Topic->GetInternalName()); - Y_VERIFY(res_); + // TODO: counters + res = TopicCounters.erase(it->second.Topic->GetInternalName()); + Y_VERIFY(res); } if (SessionsActive) { PartsPerSession.DecFor(Partitions.size(), 1); } + BalancerGeneration.erase(it->first); Partitions.erase(it); + if (SessionsActive) { PartsPerSession.IncFor(Partitions.size(), 1); } } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvCommitDone::TPtr& ev, const TActorContext& ctx) { - Y_VERIFY(!CommitsDisabled); - if (!ActualPartitionActor(ev->Sender)) + if (!ActualPartitionActors.contains(ev->Sender)) { return; + } - ui64 assignId = ev->Get()->AssignId; - - auto it = Partitions.find(assignId); + auto it = Partitions.find(ev->Get()->AssignId); Y_VERIFY(it != Partitions.end()); Y_VERIFY(it->second.Offset < ev->Get()->Offset); it->second.NextRanges.EraseInterval(it->second.Offset, ev->Get()->Offset); - - if (ev->Get()->StartCookie == Max<ui64>()) //means commit at start + if (ev->Get()->StartCookie == Max<ui64>()) { // means commit at start return; + } TServerMessage result; result.set_status(Ydb::StatusIds::SUCCESS); + if (!RangesMode) { if constexpr (UseMigrationProtocol) { for (ui64 i = ev->Get()->StartCookie; i <= ev->Get()->LastCookie; ++i) { auto c = result.mutable_committed()->add_cookies(); c->set_partition_cookie(i); - c->set_assign_id(assignId); + c->set_assign_id(ev->Get()->AssignId); it->second.NextCommits.erase(i); it->second.ReadIdCommitted = i; } } else { // commit on cookies not supported in this case Y_VERIFY(false); } - } else { if constexpr (UseMigrationProtocol) { auto c = result.mutable_committed()->add_offset_ranges(); - c->set_assign_id(assignId); + c->set_assign_id(ev->Get()->AssignId); c->set_start_offset(it->second.Offset); c->set_end_offset(ev->Get()->Offset); - } else { auto c = result.mutable_commit_offset_response()->add_partitions_committed_offsets(); - c->set_partition_session_id(assignId); + c->set_partition_session_id(ev->Get()->AssignId); c->set_committed_offset(ev->Get()->Offset); } } it->second.Offset = ev->Get()->Offset; - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " replying for commits from assignId " << assignId << " from " << ev->Get()->StartCookie << " to " << ev->Get()->LastCookie << " to offset " << it->second.Offset); - if (!WriteResponse(std::move(result))) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed"); - Die(ctx); - return; - } + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " replying for commits" + << ": assignId# " << ev->Get()->AssignId + << ", from# " << ev->Get()->StartCookie + << ", to# " << ev->Get()->LastCookie + << ", offset# " << it->second.Offset); + WriteToStreamOrDie(ctx, 
std::move(result)); } - -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvReadSessionStatus::TPtr& ev, const TActorContext& ctx) { - THolder<TEvPQProxy::TEvReadSessionStatusResponse> result(new TEvPQProxy::TEvReadSessionStatusResponse()); - for (auto& p : Partitions) { + auto result = MakeHolder<TEvPQProxy::TEvReadSessionStatusResponse>(); + + for (const auto& [_, info] : Partitions) { auto part = result->Record.AddPartition(); - part->SetTopic(p.second.Partition.DiscoveryConverter->GetPrimaryPath()); - part->SetPartition(p.second.Partition.Partition); - part->SetAssignId(p.second.Partition.AssignId); - for (auto& c : p.second.NextCommits) { + part->SetTopic(info.Partition.DiscoveryConverter->GetPrimaryPath()); + part->SetPartition(info.Partition.Partition); + part->SetAssignId(info.Partition.AssignId); + part->SetReadIdCommitted(info.ReadIdCommitted); + part->SetLastReadId(info.ReadIdToResponse - 1); + part->SetTimestampMs(info.AssignTimestamp.MilliSeconds()); + + for (const auto c : info.NextCommits) { part->AddNextCommits(c); } - part->SetReadIdCommitted(p.second.ReadIdCommitted); - part->SetLastReadId(p.second.ReadIdToResponse - 1); - part->SetTimestampMs(p.second.AssignTimestamp.MilliSeconds()); } + result->Record.SetSession(Session); result->Record.SetTimestamp(StartTimestamp.MilliSeconds()); - result->Record.SetClientNode(PeerName); result->Record.SetProxyNodeId(ctx.SelfID.NodeId()); ctx.Send(ev->Sender, result.Release()); } -inline TString GetTopicSettingsPath(const PersQueue::V1::MigrationStreamingReadClientMessage::TopicReadSettings& settings) { - return settings.topic(); -} -inline TString GetTopicSettingsPath(const Topic::StreamReadMessage::InitRequest::TopicReadSettings& settings) { - return settings.path(); -} -inline i64 GetTopicSettingsReadFrom(const PersQueue::V1::MigrationStreamingReadClientMessage::TopicReadSettings& settings) { - return settings.start_from_written_at_ms(); -} -inline i64 GetTopicSettingsReadFrom(const Topic::StreamReadMessage::InitRequest::TopicReadSettings& settings) { - return ::google::protobuf::util::TimeUtil::TimestampToMilliseconds(settings.read_from()); -} - - -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr& ev, const TActorContext& ctx) { - - THolder<TEvReadInit> event(ev->Release()); - if (!Topics.empty()) { - //answer error - CloseSession("got second init request", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "got second init request", ctx); } - const auto& init = event->Request.init_request(); + const auto& init = ev->Get()->Request.init_request(); if (!init.topics_read_settings_size()) { - CloseSession("no topics in init request", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "no topics in init request", ctx); } if (init.consumer().empty()) { - CloseSession("no consumer in init request", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "no consumer in init request", ctx); } ClientId = NPersQueue::ConvertNewConsumerName(init.consumer(), ctx); @@ -626,9 +610,11 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr& ClientPath = NPersQueue::StripLeadSlash(NPersQueue::MakeConsumerPath(init.consumer())); } - TStringBuilder session; - session 
<< ClientPath << "_" << ctx.SelfID.NodeId() << "_" << Cookie << "_" << TAppData::RandomProvider->GenRand64() << "_v1"; - Session = session; + Session = TStringBuilder() << ClientPath + << "_" << ctx.SelfID.NodeId() + << "_" << Cookie + << "_" << TAppData::RandomProvider->GenRand64() + << "_" << "v1"; CommitsDisabled = false; if constexpr (UseMigrationProtocol) { @@ -646,107 +632,118 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr& ReadTimestampMs = 0; // read_from per topic only ReadOnlyLocal = true; } + if (MaxTimeLagMs < 0) { - CloseSession("max_lag_duration_ms must be nonnegative number", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "max_lag_duration_ms must be nonnegative number", ctx); } + if (ReadTimestampMs < 0) { - CloseSession("start_from_written_at_ms must be nonnegative number", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "start_from_written_at_ms must be nonnegative number", ctx); } - PeerName = event->PeerName; + PeerName = ev->Get()->PeerName; + + auto getTopicPath = [](const auto& settings) { + if constexpr (UseMigrationProtocol) { + return settings.topic(); + } else { + return settings.path(); + } + }; + + auto getReadFrom = [](const auto& settings) { + if constexpr (UseMigrationProtocol) { + return settings.start_from_written_at_ms(); + } else { + return ::google::protobuf::util::TimeUtil::TimestampToMilliseconds(settings.read_from()); + } + }; for (const auto& topic : init.topics_read_settings()) { - TString topic_path = GetTopicSettingsPath(topic); - if (topic_path.empty()) { - CloseSession("empty topic in init request", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + const TString path = getTopicPath(topic); + if (path.empty()) { + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "empty topic in init request", ctx); } - i64 read_from = GetTopicSettingsReadFrom(topic); + + const i64 read_from = getReadFrom(topic); if (read_from < 0) { - CloseSession("start_from_written_at_ms must be nonnegative number", PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "start_from_written_at_ms must be nonnegative number", ctx); } - TopicsToResolve.insert(topic_path); + + TopicsToResolve.insert(path); } if (Request->GetInternalToken().empty()) { if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { - CloseSession("Unauthenticated access is forbidden, please provide credentials", PersQueue::ErrorCode::ACCESS_DENIED, ctx); - return; + return CloseSession(PersQueue::ErrorCode::ACCESS_DENIED, + "unauthenticated access is forbidden, please provide credentials", ctx); } } else { Y_VERIFY(Request->GetYdbToken()); Auth = *(Request->GetYdbToken()); Token = new NACLib::TUserToken(Request->GetInternalToken()); } - TopicsList = TopicsHandler.GetReadTopicsList( - TopicsToResolve, ReadOnlyLocal, Request->GetDatabaseName().GetOrElse(TString()) - ); + + TopicsList = TopicsHandler.GetReadTopicsList(TopicsToResolve, ReadOnlyLocal, + Request->GetDatabaseName().GetOrElse(TString())); if (!TopicsList.IsValid) { - return CloseSession( - TopicsList.Reason, - PersQueue::ErrorCode::BAD_REQUEST, ctx - ); + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TopicsList.Reason, ctx); } for (const auto& topic : init.topics_read_settings()) { - auto topicIter = TopicsList.ClientTopics.find(GetTopicSettingsPath(topic)); - Y_VERIFY(!topicIter.IsEnd()); - for (const auto& 
converter: topicIter->second) { + auto it = TopicsList.ClientTopics.find(getTopicPath(topic)); + Y_VERIFY(it != TopicsList.ClientTopics.end()); + + for (const auto& converter : it->second) { const auto internalName = converter->GetOriginalPath(); if constexpr (UseMigrationProtocol) { - for (i64 pg: topic.partition_group_ids()) { + for (const i64 pg : topic.partition_group_ids()) { if (pg <= 0) { - CloseSession("partition group id must be positive number", PersQueue::ErrorCode::BAD_REQUEST, - ctx); - return; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, + "partition group id must be positive number", ctx); } + if (pg > Max<ui32>()) { - CloseSession( - TStringBuilder() << "partition group id is too big: " << pg << " > " << Max<ui32>(), - PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder() + << "partition group id is too big: " << pg << " > " << Max<ui32>(), ctx); } + TopicGroups[internalName].push_back(static_cast<ui32>(pg)); } + MaxLagByTopic[internalName] = MaxTimeLagMs; - ReadFromTimestamp[internalName] = GetTopicSettingsReadFrom(topic); + ReadFromTimestamp[internalName] = getReadFrom(topic); } else { - for (i64 p: topic.partition_ids()) { + for (const i64 p : topic.partition_ids()) { if (p < 0) { - CloseSession("partition id must be nonnegative number", PersQueue::ErrorCode::BAD_REQUEST, - ctx); - return; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, + "partition id must be nonnegative number", ctx); } + if (p + 1 > Max<ui32>()) { - CloseSession( - TStringBuilder() << "partition id is too big: " << p << " > " << Max<ui32>() - 1, - PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder() + << "partition id is too big: " << p << " > " << Max<ui32>() - 1, ctx); } + TopicGroups[internalName].push_back(static_cast<ui32>(p + 1)); } - MaxLagByTopic[internalName] = - ::google::protobuf::util::TimeUtil::DurationToMilliseconds(topic.max_lag());; - ReadFromTimestamp[internalName] = GetTopicSettingsReadFrom(topic); + + MaxLagByTopic[internalName] = ::google::protobuf::util::TimeUtil::DurationToMilliseconds(topic.max_lag());; + ReadFromTimestamp[internalName] = getReadFrom(topic); } } } - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " init: " << event->Request << " from " << PeerName); + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " read init" + << ": from# " << PeerName + << ", request# " << ev->Get()->Request); if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { SetupCounters(); } - AuthInitActor = ctx.Register(new TReadInitAndAuthActor( - ctx, ctx.SelfID, ClientId, Cookie, Session, SchemeCache, NewSchemeCache, Counters, Token, TopicsList, - TopicsHandler.GetLocalCluster() - )); - + RunAuthActor(ctx); auto subGroup = GetServiceCounters(Counters, "pqproxy|SLI"); Aggr = {{{{"Account", ClientPath.substr(0, ClientPath.find("/"))}}, {"total"}}}; @@ -756,41 +753,8 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr& SLITotal.Inc(); } - -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::RegisterSession(const TActorId& pipe, const TString& topic, const TVector<ui32>& groups, const TActorContext& ctx) -{ - - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " register session to " << topic); - THolder<TEvPersQueue::TEvRegisterReadSession> request; - request.Reset(new TEvPersQueue::TEvRegisterReadSession); - auto& req = 
request->Record; - req.SetSession(Session); - req.SetClientNode(PeerName); - ActorIdToProto(pipe, req.MutablePipeClient()); - req.SetClientId(ClientId); - - for (ui32 i = 0; i < groups.size(); ++i) { - req.AddGroups(groups[i]); - } - - NTabletPipe::SendData(ctx, pipe, request.Release()); -} - -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::RegisterSessions(const TActorContext& ctx) { - InitDone = true; - - for (auto& t : Topics) { - RegisterSession(t.second.PipeClient, t.second.FullConverter->GetInternalName(), t.second.Groups, ctx); - NumPartitionsFromTopic[t.second.FullConverter->GetInternalName()] = 0; - } -} - - -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::SetupCounters() -{ +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::SetupCounters() { if (SessionsCreated) { return; } @@ -815,74 +779,66 @@ void TReadSessionActor<UseMigrationProtocol>::SetupCounters() ++(*SessionsCreated); ++(*SessionsActive); - PartsPerSession.IncFor(Partitions.size(), 1); //for 0 + PartsPerSession.IncFor(Partitions.size(), 1); // for 0 } - -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic) -{ +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic) { auto& topicCounters = TopicCounters[topic->GetInternalName()]; auto subGroup = GetServiceCounters(Counters, "pqproxy|readSession"); -//client/consumerPath Account/Producer OriginDC Topic/TopicPath auto aggr = NPersQueue::GetLabels(topic); - TVector<std::pair<TString, TString>> cons = {{"Client", ClientId}, {"ConsumerPath", ClientPath}}; + const TVector<std::pair<TString, TString>> cons = {{"Client", ClientId}, {"ConsumerPath", ClientPath}}; - topicCounters.PartitionsLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsLocked"}, true); - topicCounters.PartitionsReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsReleased"}, true); - topicCounters.PartitionsToBeReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsToBeReleased"}, false); - topicCounters.PartitionsToBeLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsToBeLocked"}, false); - topicCounters.PartitionsInfly = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsInfly"}, false); - topicCounters.Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsErrors"}, true); - topicCounters.Commits = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"Commits"}, true); - topicCounters.WaitsForData = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"WaitsForData"}, true); + topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsLocked"}, true); + topicCounters.PartitionsReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsReleased"}, true); + topicCounters.PartitionsToBeReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsToBeReleased"}, false); + topicCounters.PartitionsToBeLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsToBeLocked"}, false); + topicCounters.PartitionsInfly = NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsInfly"}, false); + topicCounters.Errors = NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsErrors"}, true); + topicCounters.Commits = NPQ::TMultiCounter(subGroup, aggr, cons, {"Commits"}, true); + topicCounters.WaitsForData = 
NPQ::TMultiCounter(subGroup, aggr, cons, {"WaitsForData"}, true); topicCounters.CommitLatency = CommitLatency; topicCounters.SLIBigLatency = SLIBigLatency; topicCounters.SLITotal = SLITotal; } -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic, const TString& cloudId, - const TString& dbId, const TString& folderId) +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic, + const TString& cloudId, const TString& dbId, const TString& folderId) { auto& topicCounters = TopicCounters[topic->GetInternalName()]; auto subGroup = NPersQueue::GetCountersForStream(Counters); -//client/consumerPath Account/Producer OriginDC Topic/TopicPath auto aggr = NPersQueue::GetLabelsForStream(topic, cloudId, dbId, folderId); - TVector<std::pair<TString, TString>> cons{{"consumer", ClientPath}}; + const TVector<std::pair<TString, TString>> cons{{"consumer", ClientPath}}; - topicCounters.PartitionsLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked_per_second"}, true, "name"); - topicCounters.PartitionsReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_released_per_second"}, true, "name"); - topicCounters.PartitionsToBeReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_released"}, false, "name"); - topicCounters.PartitionsToBeLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_locked"}, false, "name"); - topicCounters.PartitionsInfly = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked"}, false, "name"); - topicCounters.Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_errors_per_second"}, true, "name"); - topicCounters.Commits = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.commits_per_second"}, true, "name"); - topicCounters.WaitsForData = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.waits_for_data"}, true, "name"); + topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked_per_second"}, true, "name"); + topicCounters.PartitionsReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_released_per_second"}, true, "name"); + topicCounters.PartitionsToBeReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_released"}, false, "name"); + topicCounters.PartitionsToBeLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_locked"}, false, "name"); + topicCounters.PartitionsInfly = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked"}, false, "name"); + topicCounters.Errors = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_errors_per_second"}, true, "name"); + topicCounters.Commits = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.commits_per_second"}, true, "name"); + topicCounters.WaitsForData = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.waits_for_data"}, true, "name"); topicCounters.CommitLatency = CommitLatency; topicCounters.SLIBigLatency = SLIBigLatency; topicCounters.SLITotal = SLITotal; } -template<bool UseMigrationProtocol> +template <bool 
UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx) { + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " auth ok" + << ": topics# " << ev->Get()->TopicAndTablets.size() + << ", initDone# " << InitDone); LastACLCheckTimestamp = ctx.Now(); - - LOG_INFO_S( - ctx, - NKikimrServices::PQ_READ_PROXY, - PQ_LOG_PREFIX << " auth ok, got " << ev->Get()->TopicAndTablets.size() << " topics, init done " << InitDone - ); - AuthInitActor = TActorId(); if (!InitDone) { - ui32 initBorder = AppData(ctx)->PQConfig.GetReadInitLatencyBigMs(); - ui32 readBorder = AppData(ctx)->PQConfig.GetReadLatencyBigMs(); - ui32 readBorderFromDisk = AppData(ctx)->PQConfig.GetReadLatencyFromDiskBigMs(); + const ui32 initBorder = AppData(ctx)->PQConfig.GetReadInitLatencyBigMs(); + const ui32 readBorder = AppData(ctx)->PQConfig.GetReadLatencyBigMs(); + const ui32 readBorderFromDisk = AppData(ctx)->PQConfig.GetReadLatencyFromDiskBigMs(); auto subGroup = GetServiceCounters(Counters, "pqproxy|SLI"); InitLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, Aggr, "ReadInit", initBorder, {100, 200, 500, 1000, 1500, 2000, 5000, 10000, 30000, 99999999}); @@ -893,47 +849,48 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk SLIBigReadLatency = NKikimr::NPQ::TMultiCounter(subGroup, Aggr, {}, {"ReadBigLatency"}, true, "sensor", false); ReadsTotal = NKikimr::NPQ::TMultiCounter(subGroup, Aggr, {}, {"ReadsTotal"}, true, "sensor", false); - ui32 initDurationMs = (ctx.Now() - StartTime).MilliSeconds(); + const ui32 initDurationMs = (ctx.Now() - StartTime).MilliSeconds(); InitLatency.IncFor(initDurationMs, 1); if (initDurationMs >= initBorder) { SLIBigLatency.Inc(); } - for (auto& [name, t] : ev->Get()->TopicAndTablets) { // ToDo - return something from Init and Auth Actor (Full Path - ?) + for (const auto& [name, t] : ev->Get()->TopicAndTablets) { // TODO: return something from Init and Auth Actor (Full Path - ?) 
auto internalName = t.TopicNameConverter->GetInternalName(); - auto topicGrIter = TopicGroups.find(name); - if (!topicGrIter.IsEnd()) { - auto value = std::move(topicGrIter->second); - TopicGroups.erase(topicGrIter); - TopicGroups.insert(std::make_pair(internalName, std::move(value))); + { + auto it = TopicGroups.find(name); + if (it != TopicGroups.end()) { + auto value = std::move(it->second); + TopicGroups.erase(it); + TopicGroups[internalName] = std::move(value); + } } - auto rtfsIter = ReadFromTimestamp.find(name); - if (!rtfsIter.IsEnd()) { - auto value = std::move(rtfsIter->second); - ReadFromTimestamp.erase(rtfsIter); - ReadFromTimestamp[internalName] = value; + { + auto it = ReadFromTimestamp.find(name); + if (it != ReadFromTimestamp.end()) { + auto value = std::move(it->second); + ReadFromTimestamp.erase(it); + ReadFromTimestamp[internalName] = std::move(value); + } } - auto lagIter = MaxLagByTopic.find(name); - if (!lagIter.IsEnd()) { - auto value = std::move(lagIter->second); - MaxLagByTopic.erase(lagIter); - MaxLagByTopic[internalName] = value; + { + auto it = MaxLagByTopic.find(name); + if (it != MaxLagByTopic.end()) { + auto value = std::move(it->second); + MaxLagByTopic.erase(it); + MaxLagByTopic[internalName] = std::move(value); + } } - auto& topicHolder = Topics[internalName]; - topicHolder.TabletID = t.TabletID; - topicHolder.FullConverter = t.TopicNameConverter; - topicHolder.CloudId = t.CloudId; - topicHolder.DbId = t.DbId; - topicHolder.FolderId = t.FolderId; - topicHolder.MeteringMode = t.MeteringMode; + + Topics[internalName] = TTopicHolder::FromTopicInfo(t); FullPathToConverter[t.TopicNameConverter->GetPrimaryPath()] = t.TopicNameConverter; FullPathToConverter[t.TopicNameConverter->GetSecondaryPath()] = t.TopicNameConverter; if (!GetMeteringMode()) { SetMeteringMode(t.MeteringMode); } else if (*GetMeteringMode() != t.MeteringMode) { - return CloseSession("Cannot read from topics with different metering modes", - PersQueue::ErrorCode::BAD_REQUEST, ctx); + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, + "cannot read from topics with different metering modes", ctx); } } @@ -943,158 +900,168 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk InitSession(ctx); } } else { - for (auto& [name, t] : ev->Get()->TopicAndTablets) { + for (const auto& [name, t] : ev->Get()->TopicAndTablets) { auto it = Topics.find(t.TopicNameConverter->GetInternalName()); if (it == Topics.end()) { - return CloseSession( - TStringBuilder() << "list of topics changed - new topic '" - << t.TopicNameConverter->GetPrintableString() << "' found", - PersQueue::ErrorCode::BAD_REQUEST, ctx - ); + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder() + << "list of topics changed, new topic found: " << t.TopicNameConverter->GetPrintableString(), ctx); } if (t.MeteringMode != *GetMeteringMode()) { - return CloseSession( - TStringBuilder() << "Metering mode of topic: " << name << " has been changed", - PersQueue::ErrorCode::OVERLOAD, ctx - ); + return CloseSession(PersQueue::ErrorCode::OVERLOAD, TStringBuilder() + << "metering mode of topic: " << name << " has been changed", ctx); } } } } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& ctx) { TServerMessage result; result.set_status(Ydb::StatusIds::SUCCESS); result.mutable_init_response()->set_session_id(Session); - if (!WriteResponse(std::move(result))) { - LOG_INFO_S(ctx, 
NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed"); - Die(ctx); + if (!WriteToStreamOrDie(ctx, std::move(result))) { return; } - if (!Request->GetStreamCtx()->Read()) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); - Die(ctx); + if (!ReadFromStreamOrDie(ctx)) { return; } - for (auto& t : Topics) { - NTabletPipe::TClientConfig clientConfig; - - clientConfig.CheckAliveness = false; - - clientConfig.RetryPolicy = RetryPolicyForPipes; - t.second.PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second.TabletID, clientConfig)); - - Y_VERIFY(t.second.FullConverter); - auto it = TopicGroups.find(t.second.FullConverter->GetInternalName()); + for (auto& [_, holder] : Topics) { + holder.PipeClient = CreatePipeClient(holder.TabletID, ctx); + Y_VERIFY(holder.FullConverter); + auto it = TopicGroups.find(holder.FullConverter->GetInternalName()); if (it != TopicGroups.end()) { - t.second.Groups = it->second; + holder.Groups = it->second; } } - RegisterSessions(ctx); + InitDone = true; + + for (const auto& [_, holder] : Topics) { + RegisterSession(holder.FullConverter->GetInternalName(), holder.PipeClient, holder.Groups, ctx); + NumPartitionsFromTopic[holder.FullConverter->GetInternalName()] = 0; + } ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl)); } -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext& ctx) { +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::RegisterSession(const TString& topic, const TActorId& pipe, const TVector<ui32>& groups, const TActorContext& ctx) { + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " register session" + << ": topic# " << topic); + + auto request = MakeHolder<TEvPersQueue::TEvRegisterReadSession>(); + + auto& req = request->Record; + req.SetSession(Session); + req.SetClientNode(PeerName); + ActorIdToProto(pipe, req.MutablePipeClient()); + req.SetClientId(ClientId); - auto& record = ev->Get()->Record; + for (ui32 i = 0; i < groups.size(); ++i) { + req.AddGroups(groups[i]); + } + + NTabletPipe::SendData(ctx, pipe, request.Release()); +} + +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->Record; Y_VERIFY(record.GetSession() == Session); Y_VERIFY(record.GetClientId() == ClientId); - TActorId pipe = ActorIdFromProto(record.GetPipeClient()); auto path = record.GetPath(); if (path.empty()) { path = record.GetTopic(); } - auto converterIter = FullPathToConverter.find(NPersQueue::NormalizeFullPath(path)); - if (converterIter.IsEnd()) { - LOG_DEBUG_S( - ctx, NKikimrServices::PQ_READ_PROXY, - PQ_LOG_PREFIX << " ignored ev lock for path = " << record.GetPath() << ", path not recognized" - ); - return; - } - //const auto& topic = converterIter->second->GetPrimaryPath(); - const auto& intName = converterIter->second->GetInternalName(); - auto jt = Topics.find(intName); - if (jt == Topics.end() || pipe != jt->second.PipeClient) { //this is message from old version of pipe - LOG_ALERT_S( - ctx, NKikimrServices::PQ_READ_PROXY, - PQ_LOG_PREFIX << " ignored ev lock for topic = " << intName - << " path recognized, but topic is unknown, this is unexpected" - ); + auto converterIter = FullPathToConverter.find(NPersQueue::NormalizeFullPath(path)); + if 
(converterIter == FullPathToConverter.end()) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " ignored ev lock" + << ": path# " << path + << ", reason# " << "path not recognized"); return; } - //ToDo[counters] - if (NumPartitionsFromTopic[converterIter->second->GetInternalName()]++ == 0) { - if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { - SetupTopicCounters(converterIter->second, jt->second.CloudId, jt->second.DbId, jt->second.FolderId); - } else { - SetupTopicCounters(converterIter->second); + const auto name = converterIter->second->GetInternalName(); + + { + auto it = Topics.find(name); + if (it == Topics.end() || it->second.PipeClient != ActorIdFromProto(record.GetPipeClient())) { + LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " ignored ev lock" + << ": path# " << name + << ", reason# " << "topic is unknown"); + return; + } + + // TODO: counters + if (NumPartitionsFromTopic[name]++ == 0) { + if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { + SetupTopicCounters(converterIter->second, it->second.CloudId, it->second.DbId, it->second.FolderId); + } else { + SetupTopicCounters(converterIter->second); + } } } - //ToDo[counters] - auto it = TopicCounters.find(converterIter->second->GetInternalName()); + // TODO: counters + auto it = TopicCounters.find(name); Y_VERIFY(it != TopicCounters.end()); - ui64 assignId = NextAssignId++; + Y_VERIFY(record.GetGeneration() > 0); + const ui64 assignId = NextAssignId++; BalancerGeneration[assignId] = {record.GetGeneration(), record.GetStep()}; - TPartitionId partitionId{converterIter->second, record.GetPartition(), assignId}; + const TPartitionId partitionId{converterIter->second, record.GetPartition(), assignId}; - IActor* partitionActor = new TPartitionActor( - ctx.SelfID, ClientId, ClientPath, Cookie, Session, partitionId, record.GetGeneration(), - record.GetStep(), record.GetTabletId(), it->second, CommitsDisabled, ClientDC, RangesMode, - converterIter->second, UseMigrationProtocol); + const TActorId actorId = ctx.Register(new TPartitionActor( + ctx.SelfID, ClientId, ClientPath, Cookie, Session, partitionId, record.GetGeneration(), + record.GetStep(), record.GetTabletId(), it->second, CommitsDisabled, ClientDC, RangesMode, + converterIter->second, UseMigrationProtocol)); - TActorId actorId = ctx.Register(partitionActor); if (SessionsActive) { PartsPerSession.DecFor(Partitions.size(), 1); } - Y_VERIFY(record.GetGeneration() > 0); - auto pp = Partitions.insert(std::make_pair(assignId, TPartitionActorInfo{actorId, partitionId, converterIter->second, ctx})); - Y_VERIFY(pp.second); + + bool res = Partitions.emplace(assignId, TPartitionActorInfo(actorId, partitionId, converterIter->second, ctx.Now())).second; + Y_VERIFY(res); + if (SessionsActive) { PartsPerSession.IncFor(Partitions.size(), 1); } - bool res = ActualPartitionActors.insert(actorId).second; + res = ActualPartitionActors.insert(actorId).second; Y_VERIFY(res); it->second.PartitionsLocked.Inc(); it->second.PartitionsInfly.Inc(); - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Assign: " << record); + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " assign" + << ": record# " << record); ctx.Send(actorId, new TEvPQProxy::TEvLockPartition(0, 0, false, false)); } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx) { - if 
(!ActualPartitionActor(ev->Sender)) + if (!ActualPartitionActors.contains(ev->Sender)) { return; + } auto it = Partitions.find(ev->Get()->Partition.AssignId); Y_VERIFY(it != Partitions.end()); Y_VERIFY(!it->second.Releasing); // if releasing and no lock sent yet - then server must already release partition + TServerMessage result; + result.set_status(Ydb::StatusIds::SUCCESS); + if (ev->Get()->Init) { Y_VERIFY(!it->second.LockSent); - it->second.LockSent = true; it->second.Offset = ev->Get()->Offset; - TServerMessage result; - result.set_status(Ydb::StatusIds::SUCCESS); - if constexpr (UseMigrationProtocol) { result.mutable_assigned()->mutable_topic()->set_path(it->second.Topic->GetFederationPath()); result.mutable_assigned()->set_cluster(it->second.Topic->GetCluster()); @@ -1103,9 +1070,8 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionSta result.mutable_assigned()->set_read_offset(ev->Get()->Offset); result.mutable_assigned()->set_end_offset(ev->Get()->EndOffset); - } else { - // TODO GetFederationPath() -> GetFederationPathWithDC() + // TODO: GetFederationPath() -> GetFederationPathWithDC() result.mutable_start_partition_session_request()->mutable_partition_session()->set_path(it->second.Topic->GetFederationPath()); result.mutable_start_partition_session_request()->mutable_partition_session()->set_partition_id(ev->Get()->Partition.Partition); result.mutable_start_partition_session_request()->mutable_partition_session()->set_partition_session_id(it->first); @@ -1114,29 +1080,10 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionSta result.mutable_start_partition_session_request()->mutable_partition_offsets()->set_start(ev->Get()->Offset); result.mutable_start_partition_session_request()->mutable_partition_offsets()->set_end(ev->Get()->EndOffset); } - - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " sending to client create partition stream event"); - - auto pp = it->second.Partition; - pp.AssignId = 0; - auto jt = PartitionToControlMessages.find(pp); - if (jt == PartitionToControlMessages.end()) { - if (!WriteResponse(std::move(result))) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed"); - Die(ctx); - return; - } - } else { - Y_VERIFY(jt->second.Infly); - jt->second.ControlMessages.push_back(result); - } } else { Y_VERIFY(it->second.LockSent); - TServerMessage result; - result.set_status(Ydb::StatusIds::SUCCESS); - - if constexpr(UseMigrationProtocol) { + if constexpr (UseMigrationProtocol) { result.mutable_partition_status()->mutable_topic()->set_path(it->second.Topic->GetFederationPath()); result.mutable_partition_status()->set_cluster(it->second.Topic->GetCluster()); result.mutable_partition_status()->set_partition(ev->Get()->Partition.Partition); @@ -1145,7 +1092,6 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionSta result.mutable_partition_status()->set_committed_offset(ev->Get()->Offset); result.mutable_partition_status()->set_end_offset(ev->Get()->EndOffset); result.mutable_partition_status()->set_write_watermark_ms(ev->Get()->WriteTimestampEstimateMs); - } else { result.mutable_partition_session_status_response()->set_partition_session_id(it->first); @@ -1155,169 +1101,157 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionSta *result.mutable_partition_session_status_response()->mutable_write_time_high_watermark() = 
::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(ev->Get()->WriteTimestampEstimateMs); } + } - auto pp = it->second.Partition; - pp.AssignId = 0; - auto jt = PartitionToControlMessages.find(pp); - if (jt == PartitionToControlMessages.end()) { - if (!WriteResponse(std::move(result))) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed"); - Die(ctx); - return; - } - } else { - Y_VERIFY(jt->second.Infly); - jt->second.ControlMessages.push_back(result); - } + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " sending to client create partition stream event"); + SendControlMessage(it->second.Partition, std::move(result), ctx); +} + +template <bool UseMigrationProtocol> +bool TReadSessionActor<UseMigrationProtocol>::SendControlMessage(TPartitionId id, TServerMessage&& message, const TActorContext& ctx) { + id.AssignId = 0; + + auto it = PartitionToControlMessages.find(id); + if (it == PartitionToControlMessages.end()) { + return WriteToStreamOrDie(ctx, std::move(message)); + } else { + Y_VERIFY(it->second.Infly); + it->second.ControlMessages.push_back(std::move(message)); } + + return true; } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvError::TPtr& ev, const TActorContext& ctx) { - CloseSession(ev->Get()->Record.GetDescription(), ConvertOldCode(ev->Get()->Record.GetCode()), ctx); + CloseSession(ConvertOldCode(ev->Get()->Record.GetCode()), ev->Get()->Record.GetDescription(), ctx); } - -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::SendReleaseSignalToClient(const typename THashMap<ui64, TPartitionActorInfo>::iterator& it, bool kill, const TActorContext& ctx) -{ +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::SendReleaseSignal(typename TPartitionsMap::iterator it, bool kill, const TActorContext& ctx) { TServerMessage result; result.set_status(Ydb::StatusIds::SUCCESS); - if constexpr(UseMigrationProtocol) { + if constexpr (UseMigrationProtocol) { result.mutable_release()->mutable_topic()->set_path(it->second.Topic->GetFederationPath()); result.mutable_release()->set_cluster(it->second.Topic->GetCluster()); result.mutable_release()->set_partition(it->second.Partition.Partition); result.mutable_release()->set_assign_id(it->second.Partition.AssignId); result.mutable_release()->set_forceful_release(kill); result.mutable_release()->set_commit_offset(it->second.Offset); - } else { result.mutable_stop_partition_session_request()->set_partition_session_id(it->second.Partition.AssignId); result.mutable_stop_partition_session_request()->set_graceful(!kill); result.mutable_stop_partition_session_request()->set_committed_offset(it->second.Offset); } - auto pp = it->second.Partition; - pp.AssignId = 0; - auto jt = PartitionToControlMessages.find(pp); - if (jt == PartitionToControlMessages.end()) { - if (!WriteResponse(std::move(result))) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed"); - Die(ctx); - return; - } - } else { - Y_VERIFY(jt->second.Infly); - jt->second.ControlMessages.push_back(result); + if (!SendControlMessage(it->second.Partition, std::move(result), ctx)) { + return; } + Y_VERIFY(it->second.LockSent); it->second.ReleaseSent = true; } - -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvReleasePartition::TPtr& ev, const 
TActorContext& ctx) { - auto& record = ev->Get()->Record; + const auto& record = ev->Get()->Record; Y_VERIFY(record.GetSession() == Session); Y_VERIFY(record.GetClientId() == ClientId); - TString topicPath = NPersQueue::NormalizeFullPath(record.GetPath()); - ui32 group = record.HasGroup() ? record.GetGroup() : 0; - auto pathIter = FullPathToConverter.find(topicPath); - Y_VERIFY(!pathIter.IsEnd()); - auto it = Topics.find(pathIter->second->GetInternalName()); - Y_VERIFY(!it.IsEnd()); - auto& converter = it->second.FullConverter; + const ui32 group = record.HasGroup() ? record.GetGroup() : 0; - TActorId pipe = ActorIdFromProto(record.GetPipeClient()); + auto pathIter = FullPathToConverter.find(NPersQueue::NormalizeFullPath(record.GetPath())); + Y_VERIFY(pathIter != FullPathToConverter.end()); - if (pipe != it->second.PipeClient) { //this is message from old version of pipe + auto it = Topics.find(pathIter->second->GetInternalName()); + Y_VERIFY(it != Topics.end()); + + if (it->second.PipeClient != ActorIdFromProto(record.GetPipeClient())) { return; } + auto& converter = it->second.FullConverter; + for (ui32 c = 0; c < record.GetCount(); ++c) { Y_VERIFY(!Partitions.empty()); - TActorId actorId = TActorId{}; + TActorId actorId; auto jt = Partitions.begin(); ui32 i = 0; + for (auto it = Partitions.begin(); it != Partitions.end(); ++it) { if (it->second.Topic->GetInternalName() == converter->GetInternalName() && !it->second.Releasing && (group == 0 || it->second.Partition.Partition + 1 == group) ) { ++i; - if (rand() % i == 0) { //will lead to 1/n probability for each of n partitions + if (rand() % i == 0) { // will lead to 1/n probability for each of n partitions actorId = it->second.Actor; jt = it; } } } + Y_VERIFY(actorId); - { - //ToDo[counters] - auto it = TopicCounters.find(converter->GetInternalName()); - Y_VERIFY(it != TopicCounters.end()); - it->second.PartitionsToBeReleased.Inc(); - } + // TODO: counters + auto it = TopicCounters.find(converter->GetInternalName()); + Y_VERIFY(it != TopicCounters.end()); + it->second.PartitionsToBeReleased.Inc(); - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " releasing " << jt->second.Partition); + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " releasing" + << ": partition# " << jt->second.Partition); jt->second.Releasing = true; - if (!jt->second.LockSent) { //no lock yet - can release silently + + if (!jt->second.LockSent) { // no lock yet - can release silently ReleasePartition(jt, true, ctx); } else { - SendReleaseSignalToClient(jt, false, ctx); + SendReleaseSignal(jt, false, ctx); } } } - -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionReleased::TPtr& ev, const TActorContext& ctx) { - if (!ActualPartitionActor(ev->Sender)) + if (!ActualPartitionActors.contains(ev->Sender)) { return; + } - const auto assignId = ev->Get()->Partition.AssignId; - - auto it = Partitions.find(assignId); + auto it = Partitions.find(ev->Get()->Partition.AssignId); Y_VERIFY(it != Partitions.end()); Y_VERIFY(it->second.Releasing); - ReleasePartition(it, false, ctx); //no reads could be here - this is release from partition + ReleasePartition(it, false, ctx); // no reads could be here - this is release from partition } -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::InformBalancerAboutRelease(const typename THashMap<ui64, TPartitionActorInfo>::iterator& it, const TActorContext& ctx) { - - 
THolder<TEvPersQueue::TEvPartitionReleased> request; - request.Reset(new TEvPersQueue::TEvPartitionReleased); - auto& req = request->Record; - +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::InformBalancerAboutRelease(typename TPartitionsMap::iterator it, const TActorContext& ctx) { const auto& converter = it->second.Topic; auto jt = Topics.find(converter->GetInternalName()); Y_VERIFY(jt != Topics.end()); + auto request = MakeHolder<TEvPersQueue::TEvPartitionReleased>(); + + auto& req = request->Record; req.SetSession(Session); ActorIdToProto(jt->second.PipeClient, req.MutablePipeClient()); req.SetClientId(ClientId); req.SetTopic(converter->GetPrimaryPath()); req.SetPartition(it->second.Partition.Partition); - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " released: " << it->second.Partition); - + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " released" + << ": partition# " << it->second.Partition); NTabletPipe::SendData(ctx, jt->second.PipeClient, request.Release()); } - -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::CloseSession(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode, const NActors::TActorContext& ctx) { - - if (errorCode != PersQueue::ErrorCode::OK) { - if (InternalErrorCode(errorCode)) { +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::CloseSession(PersQueue::ErrorCode::ErrorCode code, const TString& reason, const TActorContext& ctx) { + if (code != PersQueue::ErrorCode::OK) { + if (InternalErrorCode(code)) { SLIErrors.Inc(); } + if (Errors) { ++(*Errors); } else if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { @@ -1325,169 +1259,155 @@ void TReadSessionActor<UseMigrationProtocol>::CloseSession(const TString& errorR } TServerMessage result; - result.set_status(ConvertPersQueueInternalCodeToStatus(errorCode)); - - FillIssue(result.add_issues(), errorCode, errorReason); - - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " closed with error reason: " << errorReason); + result.set_status(ConvertPersQueueInternalCodeToStatus(code)); + FillIssue(result.add_issues(), code, reason); - if (!WriteResponse(std::move(result), true)) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed"); - Die(ctx); + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " closed with error" + << ": reason# " << reason); + if (!WriteToStreamOrDie(ctx, std::move(result), true)) { return; } } else { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " closed"); - if (!Request->GetStreamCtx()->Finish(std::move(grpc::Status::OK))) { + if (!Request->GetStreamCtx()->Finish(grpc::Status::OK)) { LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc double finish failed"); - Die(ctx); - return; } - } Die(ctx); } - -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { - TEvTabletPipe::TEvClientConnected *msg = ev->Get(); + const auto* msg = ev->Get(); + if (msg->Status != NKikimrProto::OK) { if (msg->Dead) { - CloseSession(TStringBuilder() << "one of topics is deleted, tablet " << msg->TabletId, PersQueue::ErrorCode::BAD_REQUEST, ctx); - return; + return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder() + << "one of topics is deleted, tablet " << msg->TabletId, ctx); } - 
//TODO: remove it - CloseSession(TStringBuilder() << "unable to connect to one of topics, tablet " << msg->TabletId, PersQueue::ErrorCode::ERROR, ctx); - return; + + // TODO: remove it + return CloseSession(PersQueue::ErrorCode::ERROR, TStringBuilder() + << "unable to connect to one of topics, tablet " << msg->TabletId, ctx); #if 0 - const bool isAlive = ProcessBalancerDead(msg->TabletId, ctx); // returns false if actor died - Y_UNUSED(isAlive); + ProcessBalancerDead(msg->TabletId, ctx); // returns false if actor died return; #endif } } -template<bool UseMigrationProtocol> -bool TReadSessionActor<UseMigrationProtocol>::ActualPartitionActor(const TActorId& part) { - return ActualPartitionActors.contains(part); +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { + ProcessBalancerDead(ev->Get()->TabletId, ctx); } +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::ReleasePartition(typename TPartitionsMap::iterator it, bool couldBeReads, const TActorContext& ctx) { + // TODO: counters + auto jt = TopicCounters.find(it->second.Topic->GetInternalName()); + Y_VERIFY(jt != TopicCounters.end()); + + jt->second.PartitionsReleased.Inc(); + jt->second.PartitionsInfly.Dec(); -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::ReleasePartition(const typename THashMap<ui64, TPartitionActorInfo>::iterator& it, - bool couldBeReads, const TActorContext& ctx) -{ - { - //ToDo[counters] - auto jt = TopicCounters.find(it->second.Topic->GetInternalName()); - Y_VERIFY(jt != TopicCounters.end()); - jt->second.PartitionsReleased.Inc(); - jt->second.PartitionsInfly.Dec(); - if (!it->second.Released && it->second.Releasing) { - jt->second.PartitionsToBeReleased.Dec(); - } + if (!it->second.Released && it->second.Releasing) { + jt->second.PartitionsToBeReleased.Dec(); } Y_VERIFY(couldBeReads || !it->second.Reading); - //process reads - typename TFormedReadResponse<TServerMessage>::TPtr formedResponseToAnswer; + typename TFormedReadResponse<TServerMessage>::TPtr response; + + // process reads if (it->second.Reading) { - const auto readIt = PartitionToReadResponse.find(it->second.Actor); + auto readIt = PartitionToReadResponse.find(it->second.Actor); Y_VERIFY(readIt != PartitionToReadResponse.end()); if (--readIt->second->RequestsInfly == 0) { - formedResponseToAnswer = readIt->second; + response = readIt->second; } } InformBalancerAboutRelease(it, ctx); - it->second.Released = true; //to force drop - DropPartition(it, ctx); //partition will be dropped + it->second.Released = true; // to force drop + DropPartition(it, ctx); // partition will be dropped - if (formedResponseToAnswer) { - if (const auto ru = CalcRuConsumption(PrepareResponse(formedResponseToAnswer))) { - formedResponseToAnswer->RequiredQuota = ru; + if (response) { + if (const auto ru = CalcRuConsumption(PrepareResponse(response))) { + response->RequiredQuota = ru; if (MaybeRequestQuota(ru, EWakeupTag::RlAllowed, ctx)) { Y_VERIFY(!PendingQuota); - PendingQuota = formedResponseToAnswer; + PendingQuota = response; } else { - WaitingQuota.push_back(formedResponseToAnswer); + WaitingQuota.push_back(response); } } else { - ProcessAnswer(ctx, formedResponseToAnswer); // returns false if actor died + ProcessAnswer(response, ctx); } } } -template<bool UseMigrationProtocol> -bool TReadSessionActor<UseMigrationProtocol>::ProcessBalancerDead(const ui64 tablet, const TActorContext& ctx) { 
+template <bool UseMigrationProtocol> +TActorId TReadSessionActor<UseMigrationProtocol>::CreatePipeClient(ui64 tabletId, const TActorContext& ctx) { + NTabletPipe::TClientConfig clientConfig; + clientConfig.CheckAliveness = false; + clientConfig.RetryPolicy = RetryPolicyForPipes; + return ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig)); +} + +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx) { for (auto& t : Topics) { - if (t.second.TabletID == tablet) { - LOG_INFO_S( - ctx, NKikimrServices::PQ_READ_PROXY, - PQ_LOG_PREFIX << " balancer for topic " << t.second.FullConverter->GetPrintableString() - << " is dead, restarting all from this topic" - ); - - //Drop all partitions from this topic + if (t.second.TabletID == tabletId) { + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " balancer dead, restarting all from topic" + << ": topic# " << t.second.FullConverter->GetPrintableString()); + + // Drop all partitions from this topic for (auto it = Partitions.begin(); it != Partitions.end();) { - if (it->second.Topic->GetInternalName() == t.first) { //partition from this topic + if (it->second.Topic->GetInternalName() == t.first) { // partition from this topic // kill actor auto jt = it; ++it; + if (jt->second.LockSent) { - SendReleaseSignalToClient(jt, true, ctx); + SendReleaseSignal(jt, true, ctx); } + ReleasePartition(jt, true, ctx); } else { ++it; } } - //reconnect pipe - NTabletPipe::TClientConfig clientConfig; - clientConfig.CheckAliveness = false; - clientConfig.RetryPolicy = RetryPolicyForPipes; - t.second.PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second.TabletID, clientConfig)); + t.second.PipeClient = CreatePipeClient(t.second.TabletID, ctx); + if (InitDone) { if (PipeReconnects) { ++(*PipeReconnects); } + if (Errors) { ++(*Errors); } - RegisterSession(t.second.PipeClient, t.first, t.second.Groups, ctx); + RegisterSession(t.first, t.second.PipeClient, t.second.Groups, ctx); } } } - return true; -} - - -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { - const bool isAlive = ProcessBalancerDead(ev->Get()->TabletId, ctx); // returns false if actor died - Y_UNUSED(isAlive); } -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::Handle(NGRpcService::TGRpcRequestProxy::TEvRefreshTokenResponse::TPtr &ev , const TActorContext& ctx) { +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::Handle(NGRpcService::TGRpcRequestProxy::TEvRefreshTokenResponse::TPtr& ev , const TActorContext& ctx) { if (ev->Get()->Authenticated && !ev->Get()->InternalToken.empty()) { Token = new NACLib::TUserToken(ev->Get()->InternalToken); ForceACLCheck = true; + if constexpr (!UseMigrationProtocol) { TServerMessage result; result.set_status(Ydb::StatusIds::SUCCESS); result.mutable_update_token_response(); - if (!WriteResponse(std::move(result))) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed"); - Die(ctx); - return; - } + WriteToStreamOrDie(ctx, std::move(result)); } } else { Request->ReplyUnauthenticated("refreshed token is invalid"); @@ -1495,46 +1415,33 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(NGRpcService::TGRpcRequestP } } -template<bool UseMigrationProtocol> -void 
TReadSessionActor<UseMigrationProtocol>::ProcessAuth(const TString& auth, const TActorContext& ctx) { - if (!auth.empty() && auth != Auth) { - Auth = auth; - Request->RefreshToken(auth, ctx, ctx.SelfID); - } -} - -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext& ctx) { RequestNotChecked = true; - THolder<TEvPQProxy::TEvRead> event(ev->Release()); - - if (!Request->GetStreamCtx()->Read()) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); - Die(ctx); + if (!ReadFromStreamOrDie(ctx)) { return; } - - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got read request with guid: " << event->Guid); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got read request" + << ": guid# " << ev->Get()->Guid); if constexpr (UseMigrationProtocol) { - Reads.emplace_back(event.Release()); + Reads.emplace_back(ev->Release()); } else { - ReadSizeBudget += event->MaxSize; + ReadSizeBudget += ev->Get()->MaxSize; } ProcessReads(ctx); } - -template<typename TServerMessage> +template <typename TServerMessage> i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) { constexpr bool UseMigrationProtocol = std::is_same_v<TServerMessage, PersQueue::V1::MigrationStreamingReadServerMessage>; + if constexpr (UseMigrationProtocol) { Y_VERIFY(resp.data_batch().partition_data_size() == 1); Response.mutable_data_batch()->add_partition_data()->Swap(resp.mutable_data_batch()->mutable_partition_data(0)); - } else { Y_VERIFY(resp.read_response().partition_data_size() == 1); Response.mutable_read_response()->add_partition_data()->Swap(resp.mutable_read_response()->mutable_partition_data(0)); @@ -1547,50 +1454,53 @@ i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) { return ByteSize - prev; } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadResponse::TPtr& ev, const TActorContext& ctx) { - TActorId sender = ev->Sender; - if (!ActualPartitionActor(sender)) + if (!ActualPartitionActors.contains(ev->Sender)) { return; + } - THolder<TEvReadResponse> event(ev->Release()); - + auto& response = ev->Get()->Response; ui64 partitionCookie; ui64 assignId; + if constexpr (UseMigrationProtocol) { - Y_VERIFY(event->Response.data_batch().partition_data_size() == 1); - partitionCookie = event->Response.data_batch().partition_data(0).cookie().partition_cookie(); + Y_VERIFY(response.data_batch().partition_data_size() == 1); + partitionCookie = response.data_batch().partition_data(0).cookie().partition_cookie(); Y_VERIFY(partitionCookie != 0); // cookie is assigned - assignId = event->Response.data_batch().partition_data(0).cookie().assign_id(); - + assignId = response.data_batch().partition_data(0).cookie().assign_id(); } else { - Y_VERIFY(event->Response.read_response().partition_data_size() == 1); - assignId = event->Response.read_response().partition_data(0).partition_session_id(); + Y_VERIFY(response.read_response().partition_data_size() == 1); + assignId = response.read_response().partition_data(0).partition_session_id(); } - const auto partitionIt = Partitions.find(assignId); - Y_VERIFY(partitionIt != Partitions.end()); - Y_VERIFY(partitionIt->second.Reading); - partitionIt->second.Reading = false; - - if constexpr (UseMigrationProtocol) { - partitionIt->second.ReadIdToResponse = 
partitionCookie + 1; + typename TFormedReadResponse<TServerMessage>::TPtr formedResponse; + { + auto it = PartitionToReadResponse.find(ev->Sender); + Y_VERIFY(it != PartitionToReadResponse.end()); + formedResponse = it->second; } - auto it = PartitionToReadResponse.find(sender); - Y_VERIFY(it != PartitionToReadResponse.end()); + auto it = Partitions.find(assignId); + Y_VERIFY(it != Partitions.end()); + Y_VERIFY(it->second.Reading); + it->second.Reading = false; - typename TFormedReadResponse<TServerMessage>::TPtr formedResponse = it->second; + if constexpr (UseMigrationProtocol) { + it->second.ReadIdToResponse = partitionCookie + 1; + } - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " read done guid " << formedResponse->Guid - << partitionIt->second.Partition - << " size " << event->Response.ByteSize()); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " read done" + << ": guid# " << formedResponse->Guid + << ", partition# " << it->second.Partition + << ", size# " << response.ByteSize()); - const i64 diff = formedResponse->ApplyResponse(std::move(event->Response)); - if (event->FromDisk) { + const i64 diff = formedResponse->ApplyResponse(std::move(response)); + if (ev->Get()->FromDisk) { formedResponse->FromDisk = true; } - formedResponse->WaitQuotaTime = Max(formedResponse->WaitQuotaTime, event->WaitQuotaTime); + + formedResponse->WaitQuotaTime = Max(formedResponse->WaitQuotaTime, ev->Get()->WaitQuotaTime); --formedResponse->RequestsInfly; BytesInflight_ += diff; @@ -1608,28 +1518,16 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadResponse::T WaitingQuota.push_back(formedResponse); } } else { - ProcessAnswer(ctx, formedResponse); + ProcessAnswer(formedResponse, ctx); } } } -template<bool UseMigrationProtocol> -bool TReadSessionActor<UseMigrationProtocol>::WriteResponse(TServerMessage&& response, bool finish) { - ui64 sz = response.ByteSize(); - ActiveWrites.push(sz); - BytesInflight_ += sz; - if (BytesInflight) { - (*BytesInflight) += sz; - } - - return finish ? Request->GetStreamCtx()->WriteAndFinish(std::move(response), grpc::Status::OK) : Request->GetStreamCtx()->Write(std::move(response)); -} - -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> ui64 TReadSessionActor<UseMigrationProtocol>::PrepareResponse(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse) { formedResponse->ByteSizeBeforeFiltering = formedResponse->Response.ByteSize(); - if constexpr(UseMigrationProtocol) { + if constexpr (UseMigrationProtocol) { formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_data_batch()); } else { formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_read_response()); @@ -1638,35 +1536,40 @@ ui64 TReadSessionActor<UseMigrationProtocol>::PrepareResponse(typename TFormedRe return formedResponse->HasMessages ? 
formedResponse->Response.ByteSize() : 0; } -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext& ctx, typename TFormedReadResponse<TServerMessage>::TPtr formedResponse) { +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse, const TActorContext& ctx) { ui32 readDurationMs = (ctx.Now() - formedResponse->Start - formedResponse->WaitQuotaTime).MilliSeconds(); + if (formedResponse->FromDisk) { ReadLatencyFromDisk.IncFor(readDurationMs, 1); } else { ReadLatency.IncFor(readDurationMs, 1); } - if (readDurationMs >= (formedResponse->FromDisk ? AppData(ctx)->PQConfig.GetReadLatencyFromDiskBigMs() : AppData(ctx)->PQConfig.GetReadLatencyBigMs())) { + + const auto latencyThreshold = formedResponse->FromDisk + ? AppData(ctx)->PQConfig.GetReadLatencyFromDiskBigMs() + : AppData(ctx)->PQConfig.GetReadLatencyBigMs(); + if (readDurationMs >= latencyThreshold) { SLIBigReadLatency.Inc(); } Y_VERIFY(formedResponse->RequestsInfly == 0); const ui64 diff = formedResponse->ByteSizeBeforeFiltering; - ui64 sizeEstimation = formedResponse->HasMessages ? formedResponse->Response.ByteSize() : 0; + const ui64 sizeEstimation = formedResponse->HasMessages ? formedResponse->Response.ByteSize() : 0; + if constexpr (!UseMigrationProtocol) { formedResponse->Response.mutable_read_response()->set_bytes_size(sizeEstimation); } if (formedResponse->HasMessages) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " response to read " << formedResponse->Guid); - - if (!WriteResponse(std::move(formedResponse->Response))) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed"); - Die(ctx); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " response to read" + << ": guid# " << formedResponse->Guid); + if (!WriteToStreamOrDie(ctx, std::move(formedResponse->Response))) { return; } } else { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " empty read result " << formedResponse->Guid << ", start new reading"); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " empty read result, start new reading" + << ": guid# " << formedResponse->Guid); } BytesInflight_ -= diff; @@ -1674,27 +1577,28 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext& (*BytesInflight) -= diff; } - for (auto& pp : formedResponse->PartitionsTookPartInControlMessages) { - auto it = PartitionToControlMessages.find(pp); + for (const auto& id : formedResponse->PartitionsTookPartInControlMessages) { + auto it = PartitionToControlMessages.find(id); Y_VERIFY(it != PartitionToControlMessages.end()); + if (--it->second.Infly == 0) { for (auto& r : it->second.ControlMessages) { - if (!WriteResponse(std::move(r))) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed"); - Die(ctx); + if (!WriteToStreamOrDie(ctx, std::move(r))) { return; } } + PartitionToControlMessages.erase(it); } } - for (const TActorId& p : formedResponse->PartitionsTookPartInRead) { - PartitionToReadResponse.erase(p); + for (const auto& id : formedResponse->PartitionsTookPartInRead) { + PartitionToReadResponse.erase(id); } RequestedBytes -= formedResponse->RequestedBytes; ReadsInfly--; + if constexpr (!UseMigrationProtocol) { ReadSizeBudget += formedResponse->RequestedBytes; ReadSizeBudget -= sizeEstimation; @@ -1712,35 +1616,34 @@ void 
TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext& } } - ProcessReads(ctx); // returns false if actor died -} - -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx) { - CloseSession(ev->Get()->Reason, ev->Get()->ErrorCode, ctx); + ProcessReads(ctx); } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> ui32 TReadSessionActor<UseMigrationProtocol>::NormalizeMaxReadMessagesCount(ui32 sourceValue) { ui32 count = Min<ui32>(sourceValue, Max<i32>()); + if (count == 0) { count = Max<i32>(); } + return count; } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> ui32 TReadSessionActor<UseMigrationProtocol>::NormalizeMaxReadSize(ui32 sourceValue) { ui32 size = Min<ui32>(sourceValue, MAX_READ_SIZE); + if (size == 0) { size = MAX_READ_SIZE; } + return size; } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext& ctx) { - auto ShouldContinueReads = [this]() { + auto shouldContinueReads = [this]() { if constexpr (UseMigrationProtocol) { return !Reads.empty() && ReadsInfly < MAX_INFLY_READS; } else { @@ -1748,7 +1651,7 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext& } }; - while (ShouldContinueReads() && BytesInflight_ + RequestedBytes < MAX_INFLY_BYTES) { + while (shouldContinueReads() && BytesInflight_ + RequestedBytes < MAX_INFLY_BYTES) { ui32 count = MaxReadMessagesCount; ui64 size = MaxReadSize; ui32 partitionsAsked = 0; @@ -1759,127 +1662,141 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext& } else { guid = CreateGuidAsString(); } + typename TFormedReadResponse<TServerMessage>::TPtr formedResponse = new TFormedReadResponse<TServerMessage>(guid, ctx.Now()); + while (!AvailablePartitions.empty()) { auto part = *AvailablePartitions.begin(); AvailablePartitions.erase(AvailablePartitions.begin()); auto it = Partitions.find(part.AssignId); - if (it == Partitions.end() || it->second.Releasing) { //this is already released partition + if (it == Partitions.end() || it->second.Releasing) { // this is already released partition continue; } - //add this partition to reading - ++partitionsAsked; + + ++partitionsAsked; // add this partition to reading const ui32 ccount = Min<ui32>(part.MsgLag * LAG_GROW_MULTIPLIER, count); count -= ccount; + ui64 csize = (ui64)Min<double>(part.SizeLag * LAG_GROW_MULTIPLIER, size); if constexpr (!UseMigrationProtocol) { csize = Min<i64>(csize, ReadSizeBudget); } + size -= csize; Y_VERIFY(csize < Max<i32>()); auto jt = ReadFromTimestamp.find(it->second.Topic->GetInternalName()); if (jt == ReadFromTimestamp.end()) { - LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << "Error searching for topic: " << it->second.Topic->GetInternalName() - << " (" << it->second.Topic->GetPrintableString() << ")"); - for (const auto& [k, v] : ReadFromTimestamp) { - const auto& kk = k; - LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << "Have topic: " << kk); + LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " error searching for topic" + << ": internalName# " << it->second.Topic->GetInternalName() + << ", prettyName# " << it->second.Topic->GetPrintableString()); + + for (const auto& kv : ReadFromTimestamp) { + LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " have topic" + << ": topic# " << 
kv.first); } - CloseSession(TStringBuilder() << "Internal error", PersQueue::ErrorCode::ERROR, ctx); - return; + + return CloseSession(PersQueue::ErrorCode::ERROR, "internal error", ctx); } - ui64 readTimestampMs = Max(ReadTimestampMs, jt->second); - auto lags_it = MaxLagByTopic.find(it->second.Topic->GetInternalName()); - Y_VERIFY(lags_it != MaxLagByTopic.end()); - ui32 maxLag = lags_it->second; + ui64 readTimestampMs = Max(ReadTimestampMs, jt->second); - TAutoPtr<TEvPQProxy::TEvRead> read = new TEvPQProxy::TEvRead(guid, ccount, csize, maxLag, readTimestampMs); + auto lagsIt = MaxLagByTopic.find(it->second.Topic->GetInternalName()); + Y_VERIFY(lagsIt != MaxLagByTopic.end()); + const ui32 maxLag = lagsIt->second; - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX - << " performing read request with guid " << read->Guid - << " from " << it->second.Partition << " count " << ccount << " size " << csize - << " partitionsAsked " << partitionsAsked << " maxTimeLag " << maxLag << "ms"); + auto ev = MakeHolder<TEvPQProxy::TEvRead>(guid, ccount, csize, maxLag, readTimestampMs); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " performing read request" + << ": guid# " << ev->Guid + << ", from# " << it->second.Partition + << ", count# " << ccount + << ", size# " << csize + << ", partitionsAsked# " << partitionsAsked + << ", maxTimeLag# " << maxLag << "ms"); Y_VERIFY(!it->second.Reading); it->second.Reading = true; + formedResponse->PartitionsTookPartInRead.insert(it->second.Actor); - auto pp = it->second.Partition; - pp.AssignId = 0; - PartitionToControlMessages[pp].Infly++; - bool res = formedResponse->PartitionsTookPartInControlMessages.insert(pp).second; + auto id = it->second.Partition; + id.AssignId = 0; + PartitionToControlMessages[id].Infly++; + + bool res = formedResponse->PartitionsTookPartInControlMessages.insert(id).second; Y_VERIFY(res); RequestedBytes += csize; formedResponse->RequestedBytes += csize; ReadSizeBudget -= csize; - ctx.Send(it->second.Actor, read.Release()); - const auto insertResult = PartitionToReadResponse.insert(std::make_pair(it->second.Actor, formedResponse)); - Y_VERIFY(insertResult.second); + ctx.Send(it->second.Actor, ev.Release()); + res = PartitionToReadResponse.emplace(it->second.Actor, formedResponse).second; + Y_VERIFY(res); - // TODO (ildar-khisam@): Gather data from all partitions; - // For now send messages only from single partition + // TODO (ildar-khisam@): Gather data from all partitions. + // For now send messages only from single partition. 
if constexpr (!UseMigrationProtocol) { break; } - if (count == 0 || size == 0) + if (count == 0 || size == 0) { break; + } } - if (partitionsAsked == 0) + if (partitionsAsked == 0) { break; + } + ReadsTotal.Inc(); formedResponse->RequestsInfly = partitionsAsked; - ReadsInfly++; i64 diff = formedResponse->Response.ByteSize(); BytesInflight_ += diff; formedResponse->ByteSize = diff; + if (BytesInflight) { (*BytesInflight) += diff; } + if constexpr (UseMigrationProtocol) { Reads.pop_front(); } } } - -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionReady::TPtr& ev, const TActorContext& ctx) { - - if (!ActualPartitionActor(ev->Sender)) + if (!ActualPartitionActors.contains(ev->Sender)) { return; + } - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " " << ev->Get()->Partition - << " ready for read with readOffset " - << ev->Get()->ReadOffset << " endOffset " << ev->Get()->EndOffset << " WTime " - << ev->Get()->WTime << " sizeLag " << ev->Get()->SizeLag); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " partition ready for read" + << ": partition# " << ev->Get()->Partition + << ", readOffset# " << ev->Get()->ReadOffset + << ", endOffset# " << ev->Get()->EndOffset + << ", WTime# " << ev->Get()->WTime + << ", sizeLag# " << ev->Get()->SizeLag); - const auto it = PartitionToReadResponse.find(ev->Sender); // check whether this partition is taking part in read response + auto it = PartitionToReadResponse.find(ev->Sender); // check whether this partition is taking part in read response auto& container = it != PartitionToReadResponse.end() ? it->second->PartitionsBecameAvailable : AvailablePartitions; - auto res = container.insert(TPartitionInfo{ev->Get()->Partition.AssignId, ev->Get()->WTime, ev->Get()->SizeLag, - ev->Get()->EndOffset - ev->Get()->ReadOffset}); - Y_VERIFY(res.second); - ProcessReads(ctx); -} + bool res = container.emplace( + ev->Get()->Partition.AssignId, + ev->Get()->WTime, + ev->Get()->SizeLag, + ev->Get()->EndOffset - ev->Get()->ReadOffset).second; + Y_VERIFY(res); -template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::HandlePoison(TEvPQProxy::TEvDieCommand::TPtr& ev, const TActorContext& ctx) { - CloseSession(ev->Get()->Reason, ev->Get()->ErrorCode, ctx); + ProcessReads(ctx); } - -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { const auto tag = static_cast<EWakeupTag>(ev->Get()->Tag); OnWakeup(tag); @@ -1892,7 +1809,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& e return RecheckACL(ctx); case EWakeupTag::RlAllowed: - ProcessAnswer(ctx, PendingQuota); + ProcessAnswer(PendingQuota, ctx); if (!WaitingQuota.empty()) { PendingQuota = WaitingQuota.front(); WaitingQuota.pop_front(); @@ -1908,27 +1825,34 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& e if (PendingQuota) { Y_VERIFY(MaybeRequestQuota(PendingQuota->RequiredQuota, EWakeupTag::RlAllowed, ctx)); } else { - return CloseSession("Throughput limit exceeded", PersQueue::ErrorCode::OVERLOAD, ctx); + return CloseSession(PersQueue::ErrorCode::OVERLOAD, "throughput limit exceeded", ctx); } break; } } -template<bool UseMigrationProtocol> +template <bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& ctx) { 
ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl)); - if (Token && !AuthInitActor && (ForceACLCheck || (ctx.Now() - LastACLCheckTimestamp > TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()) && RequestNotChecked))) { + + const auto timeout = TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()); + const bool authTimedOut = (ctx.Now() - LastACLCheckTimestamp) > timeout; + + if (Token && !AuthInitActor && (ForceACLCheck || (authTimedOut && RequestNotChecked))) { ForceACLCheck = false; RequestNotChecked = false; - Y_VERIFY(!AuthInitActor); - LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " checking auth because of timeout"); - AuthInitActor = ctx.Register(new TReadInitAndAuthActor( - ctx, ctx.SelfID, ClientId, Cookie, Session, SchemeCache, NewSchemeCache, Counters, Token, TopicsList, - TopicsHandler.GetLocalCluster() - )); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " checking auth because of timeout"); + RunAuthActor(ctx); } } -} // namespace NGRpcProxy::V1 -} // namespace NKikimr +template <bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::RunAuthActor(const TActorContext& ctx) { + Y_VERIFY(!AuthInitActor); + AuthInitActor = ctx.Register(new TReadInitAndAuthActor( + ctx, ctx.SelfID, ClientId, Cookie, Session, SchemeCache, NewSchemeCache, Counters, Token, TopicsList, + TopicsHandler.GetLocalCluster())); +} + +} |
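
A recurring change in these hunks is that the old pattern of checking WriteResponse(...) and then logging and calling Die(ctx) at every call site is collapsed into a single WriteToStreamOrDie(ctx, ...) call, whose false return already means the session has been torn down, so callers just return. The listing below is a minimal standalone sketch of that idea, not the YDB implementation: TFakeStream, TSession and WriteOrDie are invented names, and the real code deals with actors and gRPC streams rather than std::cout.

// Illustrative sketch only (assumed names, not the YDB classes): funnel every
// stream write through one "write or die" helper so call sites reduce to a
// single early return.
#include <iostream>
#include <queue>
#include <string>

struct TFakeStream {
    bool Broken = false;
    bool Write(const std::string& message) {
        if (Broken) {
            return false; // simulate a failed gRPC write
        }
        std::cout << "sent: " << message << std::endl;
        return true;
    }
};

class TSession {
public:
    explicit TSession(TFakeStream* stream)
        : Stream(stream)
    {}

    // The single place that handles a failed write: log, shut the session
    // down, and report failure so the caller can simply return.
    bool WriteOrDie(std::string&& message) {
        if (!Stream->Write(message)) {
            std::cerr << "write failed, closing session" << std::endl;
            Die();
            return false;
        }
        return true;
    }

    void SendPending() {
        while (Alive && !Pending.empty()) {
            if (!WriteOrDie(std::move(Pending.front()))) {
                return; // session is already dead, nothing more to do
            }
            Pending.pop();
        }
    }

    void Enqueue(std::string message) {
        Pending.push(std::move(message));
    }

    bool IsAlive() const { return Alive; }

private:
    void Die() { Alive = false; }

    TFakeStream* Stream;
    std::queue<std::string> Pending;
    bool Alive = true;
};

int main() {
    TFakeStream stream;
    TSession session(&stream);

    session.Enqueue("read response #1");
    session.Enqueue("read response #2");
    session.SendPending();            // both writes succeed

    stream.Broken = true;
    session.Enqueue("read response #3");
    session.SendPending();            // write fails once, session dies
    std::cout << "alive: " << session.IsAlive() << std::endl;
    return 0;
}

The payoff is visible in the refactored handlers above: after a failed write they no longer log and call Die(ctx) inline; a false result from WriteToStreamOrDie is enough to stop processing, which removes several duplicated error paths from ProcessAnswer and the control-message loop.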