author     ilnaz <[email protected]>  2022-09-14 13:00:41 +0300
committer  ilnaz <[email protected]>  2022-09-14 13:00:41 +0300
commit     d6e8190159c9b88668658afdd7c796133f8f69ba (patch)
tree       818f561cfa415c0938253d559e342e2fd8748d8c
parent     a1b046d8ad132a2052989ac30585ab2c5a0e7f6f (diff)
Read session refactoring
-rw-r--r--  ydb/services/lib/actors/type_definitions.h                 80
-rw-r--r--  ydb/services/persqueue_v1/actors/read_session_actor.h     313
-rw-r--r--  ydb/services/persqueue_v1/actors/read_session_actor.ipp  1580
3 files changed, 965 insertions, 1008 deletions
diff --git a/ydb/services/lib/actors/type_definitions.h b/ydb/services/lib/actors/type_definitions.h
index 89e0168a71e..fbd7622b852 100644
--- a/ydb/services/lib/actors/type_definitions.h
+++ b/ydb/services/lib/actors/type_definitions.h
@@ -1,43 +1,53 @@
#pragma once
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
+
#include <library/cpp/actors/core/actor.h>
-#include <library/cpp/actors/core/event_local.h>
+#include <util/generic/hash.h>
+#include <util/generic/map.h>
+#include <util/generic/maybe.h>
+#include <util/generic/vector.h>
namespace NKikimr::NGRpcProxy {
- struct TTopicHolder {
- ui64 TabletID;
- TActorId PipeClient;
- bool ACLRequestInfly;
- TString CloudId;
- TString DbId;
- TString FolderId;
- NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode;
- NPersQueue::TDiscoveryConverterPtr DiscoveryConverter;
- NPersQueue::TTopicConverterPtr FullConverter;
- TMaybe<TString> CdcStreamPath;
-
- TVector<ui32> Groups;
- TMap<ui64, ui64> Partitions;
-
- TTopicHolder()
- : TabletID(0)
- , PipeClient()
- , ACLRequestInfly(false)
- {}
- };
-
- struct TTopicInitInfo {
- NPersQueue::TTopicConverterPtr TopicNameConverter;
- ui64 TabletID;
- TString CloudId;
- TString DbId;
- TString FolderId;
- NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode;
- };
-
- using TTopicInitInfoMap = THashMap<TString, TTopicInitInfo>;
-
-} // namespace NKikimr::NGRpcProxy
+struct TTopicInitInfo {
+ NPersQueue::TTopicConverterPtr TopicNameConverter;
+ ui64 TabletID;
+ TString CloudId;
+ TString DbId;
+ TString FolderId;
+ NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode;
+};
+
+using TTopicInitInfoMap = THashMap<TString, TTopicInitInfo>;
+
+struct TTopicHolder {
+ ui64 TabletID = 0;
+ TActorId PipeClient;
+ bool ACLRequestInfly = false;
+ TString CloudId;
+ TString DbId;
+ TString FolderId;
+ NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode;
+ NPersQueue::TDiscoveryConverterPtr DiscoveryConverter;
+ NPersQueue::TTopicConverterPtr FullConverter;
+ TMaybe<TString> CdcStreamPath;
+
+ TVector<ui32> Groups;
+ TMap<ui64, ui64> Partitions;
+
+ inline static TTopicHolder FromTopicInfo(const TTopicInitInfo& info) {
+ return TTopicHolder{
+ .TabletID = info.TabletID,
+ .ACLRequestInfly = false,
+ .CloudId = info.CloudId,
+ .DbId = info.DbId,
+ .FolderId = info.FolderId,
+ .MeteringMode = info.MeteringMode,
+ .FullConverter = info.TopicNameConverter,
+ };
+ }
+};
+
+} // namespace NKikimr::NGRpcProxy
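
The rewritten TTopicHolder above drops the hand-written constructor in favour of in-class default member initializers plus a FromTopicInfo factory built with C++20 designated initializers, so any field not listed in the factory keeps its default. A minimal standalone sketch of that pattern, using illustrative TInfoSketch/THolderSketch types rather than the YDB ones:

    #include <cstdint>
    #include <string>

    struct TInfoSketch {
        uint64_t TabletID = 0;
        std::string CloudId;
    };

    struct THolderSketch {
        uint64_t TabletID = 0;          // in-class defaults replace the old constructor body
        bool ACLRequestInfly = false;
        std::string CloudId;

        // Designated initializers keep the field/value pairs visible at the call
        // site; members that are not listed fall back to their in-class defaults.
        static THolderSketch FromInfo(const TInfoSketch& info) {
            return THolderSketch{
                .TabletID = info.TabletID,
                .CloudId = info.CloudId,
            };
        }
    };

    int main() {
        const TInfoSketch info{.TabletID = 42, .CloudId = "cloud-1"};
        const THolderSketch holder = THolderSketch::FromInfo(info);
        return (holder.TabletID == 42 && !holder.ACLRequestInfly) ? 0 : 1;
    }

C++20 requires designated initializers to follow declaration order, which is why the FromTopicInfo factory in the diff lists TabletID, ACLRequestInfly, CloudId and the rest in the same order as the struct members.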
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.h b/ydb/services/persqueue_v1/actors/read_session_actor.h
index d9e048394de..857a83d6c7e 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.h
@@ -4,20 +4,19 @@
#include "partition_actor.h"
#include "persqueue_utils.h"
-#include <library/cpp/actors/core/actor_bootstrapped.h>
-#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h>
-
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/grpc_services/grpc_request_proxy.h>
#include <ydb/core/persqueue/events/global.h>
#include <ydb/services/lib/actors/pq_rl_helpers.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h>
+
#include <util/generic/guid.h>
#include <util/system/compiler.h>
#include <type_traits>
-
namespace NKikimr::NGRpcProxy::V1 {
inline TActorId GetPQReadServiceActorID() {
@@ -25,8 +24,9 @@ inline TActorId GetPQReadServiceActorID() {
}
struct TPartitionActorInfo {
- TActorId Actor;
+ const TActorId Actor;
const TPartitionId Partition;
+ NPersQueue::TTopicConverterPtr Topic;
std::deque<ui64> Commits;
bool Reading;
bool Releasing;
@@ -38,17 +38,18 @@ struct TPartitionActorInfo {
ui64 ReadIdCommitted;
TSet<ui64> NextCommits;
TDisjointIntervalTree<ui64> NextRanges;
-
ui64 Offset;
TInstant AssignTimestamp;
- NPersQueue::TTopicConverterPtr Topic;
-
- TPartitionActorInfo(const TActorId& actor, const TPartitionId& partition,
- const NPersQueue::TTopicConverterPtr& topic, const TActorContext& ctx)
+ explicit TPartitionActorInfo(
+ const TActorId& actor,
+ const TPartitionId& partition,
+ const NPersQueue::TTopicConverterPtr& topic,
+ const TInstant& timestamp)
: Actor(actor)
, Partition(partition)
+ , Topic(topic)
, Reading(false)
, Releasing(false)
, Released(false)
@@ -57,11 +58,9 @@ struct TPartitionActorInfo {
, ReadIdToResponse(1)
, ReadIdCommitted(0)
, Offset(0)
- , AssignTimestamp(ctx.Now())
- , Topic(topic)
- { }
-
- void MakeCommit(const TActorContext& ctx);
+ , AssignTimestamp(timestamp)
+ {
+ }
};
struct TPartitionInfo {
@@ -69,6 +68,15 @@ struct TPartitionInfo {
ui64 WTime;
ui64 SizeLag;
ui64 MsgLag;
+
+ explicit TPartitionInfo(ui64 assignId, ui64 wTime, ui64 sizeLag, ui64 msgLag)
+ : AssignId(assignId)
+ , WTime(wTime)
+ , SizeLag(sizeLag)
+ , MsgLag(msgLag)
+ {
+ }
+
bool operator < (const TPartitionInfo& rhs) const {
return std::tie(WTime, AssignId) < std::tie(rhs.WTime, rhs.AssignId);
}
@@ -94,15 +102,15 @@ struct TFormedReadResponse: public TSimpleRefCount<TFormedReadResponse<TServerMe
i64 ByteSizeBeforeFiltering = 0;
ui64 RequiredQuota = 0;
- //returns byteSize diff
+ // returns byteSize diff
i64 ApplyResponse(TServerMessage&& resp);
THashSet<TActorId> PartitionsTookPartInRead;
TSet<TPartitionId> PartitionsTookPartInControlMessages;
- TSet<TPartitionInfo> PartitionsBecameAvailable; // Partitions that became available during this read request execution.
-
- // These partitions are bringed back to AvailablePartitions after reply to this read request.
+ // Partitions that became available during this read request execution.
+ // These partitions are brought back to AvailablePartitions after reply to this read request.
+ TSet<TPartitionInfo> PartitionsBecameAvailable;
const TString Guid;
TInstant Start;
@@ -110,23 +118,37 @@ struct TFormedReadResponse: public TSimpleRefCount<TFormedReadResponse<TServerMe
TDuration WaitQuotaTime;
};
-
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol> // Migration protocol is "pqv1"
class TReadSessionActor
: public TActorBootstrapped<TReadSessionActor<UseMigrationProtocol>>
, private TRlHelpers
{
- using TClientMessage = typename std::conditional_t<UseMigrationProtocol, PersQueue::V1::MigrationStreamingReadClientMessage, Topic::StreamReadMessage::FromClient>;
- using TServerMessage = typename std::conditional_t<UseMigrationProtocol, PersQueue::V1::MigrationStreamingReadServerMessage, Topic::StreamReadMessage::FromServer>;
+ using TClientMessage = typename std::conditional_t<UseMigrationProtocol,
+ PersQueue::V1::MigrationStreamingReadClientMessage,
+ Topic::StreamReadMessage::FromClient>;
+
+ using TServerMessage = typename std::conditional_t<UseMigrationProtocol,
+ PersQueue::V1::MigrationStreamingReadServerMessage,
+ Topic::StreamReadMessage::FromServer>;
+
+ using TEvReadInit = typename std::conditional_t<UseMigrationProtocol,
+ TEvPQProxy::TEvMigrationReadInit,
+ TEvPQProxy::TEvReadInit>;
+
+ using TEvReadResponse = typename std::conditional_t<UseMigrationProtocol,
+ TEvPQProxy::TEvMigrationReadResponse,
+ TEvPQProxy::TEvReadResponse>;
+
+ using TEvStreamReadRequest = typename std::conditional_t<UseMigrationProtocol,
+ NGRpcService::TEvStreamPQMigrationReadRequest,
+ NGRpcService::TEvStreamTopicReadRequest>;
using IContext = NGRpcServer::IGRpcStreamingContext<TClientMessage, TServerMessage>;
- using TEvReadInit = typename std::conditional_t<UseMigrationProtocol, TEvPQProxy::TEvMigrationReadInit, TEvPQProxy::TEvReadInit>;
- using TEvReadResponse = typename std::conditional_t<UseMigrationProtocol, TEvPQProxy::TEvMigrationReadResponse, TEvPQProxy::TEvReadResponse>;
- using TEvStreamPQReadRequest = typename std::conditional_t<UseMigrationProtocol, NKikimr::NGRpcService::TEvStreamPQMigrationReadRequest, NKikimr::NGRpcService::TEvStreamTopicReadRequest>;
+ using TPartitionsMap = THashMap<ui64, TPartitionActorInfo>;
private:
- //11 tries = 10,23 seconds, then each try for 5 seconds , so 21 retries will take near 1 min
+ // 11 tries = 10.23 seconds, then each try takes 5 seconds, so 21 retries will take about 1 min
static constexpr NTabletPipe::TClientRetryPolicy RetryPolicyForPipes = {
.RetryLimitCount = 21,
.MinRetryTime = TDuration::MilliSeconds(10),
@@ -138,145 +160,143 @@ private:
static constexpr ui64 MAX_INFLY_BYTES = 25_MB;
static constexpr ui32 MAX_INFLY_READS = 10;
- static constexpr ui64 MAX_READ_SIZE = 100 << 20; //100mb;
+ static constexpr ui64 MAX_READ_SIZE = 100_MB;
static constexpr ui64 READ_BLOCK_SIZE = 8_KB; // metering
- static constexpr double LAG_GROW_MULTIPLIER = 1.2; //assume that 20% more data arrived to partitions
+ static constexpr double LAG_GROW_MULTIPLIER = 1.2; // assume that 20% more data arrived to partitions
public:
- TReadSessionActor(TEvStreamPQReadRequest* request, const ui64 cookie,
- const NActors::TActorId& schemeCache, const NActors::TActorId& newSchemeCache,
- TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const TMaybe<TString> clientDC,
- const NPersQueue::TTopicsListController& topicsHandler);
- ~TReadSessionActor();
-
- void Bootstrap(const NActors::TActorContext& ctx);
+ TReadSessionActor(TEvStreamReadRequest* request, const ui64 cookie,
+ const TActorId& schemeCache, const TActorId& newSchemeCache,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const TMaybe<TString> clientDC,
+ const NPersQueue::TTopicsListController& topicsHandler);
- void Die(const NActors::TActorContext& ctx) override;
+ void Bootstrap(const TActorContext& ctx);
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::FRONT_PQ_READ; }
+ void Die(const TActorContext& ctx) override;
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
+ return NKikimrServices::TActivity::FRONT_PQ_READ;
+ }
private:
STFUNC(StateFunc) {
switch (ev->GetTypeRewrite()) {
- HFunc(TEvents::TEvWakeup, Handle);
-
+ // grpc events
HFunc(IContext::TEvReadFinished, Handle);
HFunc(IContext::TEvWriteFinished, Handle);
- CFunc(IContext::TEvNotifiedWhenDone::EventType, HandleDone);
+ HFunc(IContext::TEvNotifiedWhenDone, Handle)
HFunc(NGRpcService::TGRpcRequestProxy::TEvRefreshTokenResponse, Handle);
+ // proxy events
HFunc(TEvPQProxy::TEvAuthResultOk, Handle); // from auth actor
-
- HFunc(TEvPQProxy::TEvDieCommand, HandlePoison)
-
- HFunc(/* type alias */ TEvReadInit, Handle) //from gRPC
- HFunc(TEvPQProxy::TEvReadSessionStatus, Handle) // from read sessions info builder proxy
- HFunc(TEvPQProxy::TEvRead, Handle) //from gRPC
- HFunc(TEvPQProxy::TEvDone, Handle) //from gRPC
- HFunc(TEvPQProxy::TEvCloseSession, Handle) //from partitionActor
- HFunc(TEvPQProxy::TEvPartitionReady, Handle) //from partitionActor
- HFunc(TEvPQProxy::TEvPartitionReleased, Handle) //from partitionActor
-
- HFunc(/* type alias */ TEvReadResponse, Handle) //from partitionActor
- HFunc(TEvPQProxy::TEvCommitCookie, Handle) //from gRPC
- HFunc(TEvPQProxy::TEvCommitRange, Handle) //from gRPC
- HFunc(TEvPQProxy::TEvStartRead, Handle) //from gRPC
- HFunc(TEvPQProxy::TEvReleased, Handle) //from gRPC
- HFunc(TEvPQProxy::TEvGetStatus, Handle) //from gRPC
- HFunc(TEvPQProxy::TEvAuth, Handle) //from gRPC
-
- HFunc(TEvPQProxy::TEvCommitDone, Handle) //from PartitionActor
- HFunc(TEvPQProxy::TEvPartitionStatus, Handle) //from partitionActor
-
- HFunc(TEvPersQueue::TEvLockPartition, Handle) //from Balancer
- HFunc(TEvPersQueue::TEvReleasePartition, Handle) //from Balancer
- HFunc(TEvPersQueue::TEvError, Handle) //from Balancer
-
- HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ HFunc(/* type alias */ TEvReadInit, Handle); // from gRPC
+ HFunc(TEvPQProxy::TEvReadSessionStatus, Handle); // from read sessions info builder proxy
+ HFunc(TEvPQProxy::TEvRead, Handle); // from gRPC
+ HFunc(/* type alias */ TEvReadResponse, Handle); // from partitionActor
+ HFunc(TEvPQProxy::TEvDone, Handle); // from gRPC
+ HFunc(TEvPQProxy::TEvCloseSession, Handle); // from partitionActor
+ HFunc(TEvPQProxy::TEvDieCommand, Handle);
+ HFunc(TEvPQProxy::TEvPartitionReady, Handle); // from partitionActor
+ HFunc(TEvPQProxy::TEvPartitionReleased, Handle); // from partitionActor
+ HFunc(TEvPQProxy::TEvCommitCookie, Handle); // from gRPC
+ HFunc(TEvPQProxy::TEvCommitRange, Handle); // from gRPC
+ HFunc(TEvPQProxy::TEvStartRead, Handle); // from gRPC
+ HFunc(TEvPQProxy::TEvReleased, Handle); // from gRPC
+ HFunc(TEvPQProxy::TEvGetStatus, Handle); // from gRPC
+ HFunc(TEvPQProxy::TEvAuth, Handle); // from gRPC
+ HFunc(TEvPQProxy::TEvCommitDone, Handle); // from PartitionActor
+ HFunc(TEvPQProxy::TEvPartitionStatus, Handle); // from partitionActor
+
+ // Balancer events
+ HFunc(TEvPersQueue::TEvLockPartition, Handle);
+ HFunc(TEvPersQueue::TEvReleasePartition, Handle);
+ HFunc(TEvPersQueue::TEvError, Handle);
+
+ // pipe events
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+
+ // system events
+ HFunc(TEvents::TEvWakeup, Handle);
default:
break;
- };
+ }
}
- ui64 PrepareResponse(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse); // returns estimated response's size
- bool WriteResponse(TServerMessage&& response, bool finish = false);
+ bool ReadFromStreamOrDie(const TActorContext& ctx);
+ bool WriteToStreamOrDie(const TActorContext& ctx, TServerMessage&& response, bool finish = false);
+ bool SendControlMessage(TPartitionId id, TServerMessage&& message, const TActorContext& ctx);
+ // grpc events
void Handle(typename IContext::TEvReadFinished::TPtr& ev, const TActorContext &ctx);
void Handle(typename IContext::TEvWriteFinished::TPtr& ev, const TActorContext &ctx);
- void HandleDone(const TActorContext &ctx);
-
+ void Handle(typename IContext::TEvNotifiedWhenDone::TPtr& ev, const TActorContext &ctx);
void Handle(NGRpcService::TGRpcRequestProxy::TEvRefreshTokenResponse::TPtr& ev, const TActorContext &ctx);
-
- void Handle(typename TEvReadInit::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPQProxy::TEvReadSessionStatus::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPQProxy::TEvRead::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(typename TEvReadResponse::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPQProxy::TEvDone::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPQProxy::TEvCloseSession::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPQProxy::TEvPartitionReady::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPQProxy::TEvPartitionReleased::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPQProxy::TEvCommitCookie::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPQProxy::TEvCommitRange::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPQProxy::TEvStartRead::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPQProxy::TEvReleased::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPQProxy::TEvGetStatus::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPQProxy::TEvAuth::TPtr& ev, const NActors::TActorContext& ctx);
- void ProcessAuth(const TString& auth, const TActorContext& ctx);
- void Handle(TEvPQProxy::TEvCommitDone::TPtr& ev, const NActors::TActorContext& ctx);
-
- void Handle(TEvPQProxy::TEvPartitionStatus::TPtr& ev, const NActors::TActorContext& ctx);
-
- void Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPersQueue::TEvReleasePartition::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvPersQueue::TEvError::TPtr& ev, const NActors::TActorContext& ctx);
-
- void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx);
- void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext& ctx);
- [[nodiscard]] bool ProcessBalancerDead(const ui64 tabletId, const NActors::TActorContext& ctx); // returns false if actor died
-
- void HandlePoison(TEvPQProxy::TEvDieCommand::TPtr& ev, const NActors::TActorContext& ctx);
+ // proxy events
+ void Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx);
+ void Handle(typename TEvReadInit::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvReadSessionStatus::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext& ctx);
+ void Handle(typename TEvReadResponse::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvDone::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvDieCommand::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvPartitionReady::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvPartitionReleased::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvCommitCookie::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvCommitRange::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvStartRead::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvReleased::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvGetStatus::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvAuth::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvCommitDone::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPQProxy::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx);
+
+ // Balancer events
+ void Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPersQueue::TEvReleasePartition::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPersQueue::TEvError::TPtr& ev, const TActorContext& ctx);
+
+ // pipe events
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx);
+
+ // system events
void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const NActors::TActorContext& ctx);
- void RecheckACL(const TActorContext& ctx);
+ TActorId CreatePipeClient(ui64 tabletId, const TActorContext& ctx);
+ void ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx);
+
+ void RunAuthActor(const TActorContext& ctx);
+ void RecheckACL(const TActorContext& ctx);
void InitSession(const TActorContext& ctx);
- void CloseSession(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode,
- const NActors::TActorContext& ctx);
+ void RegisterSession(const TString& topic, const TActorId& pipe, const TVector<ui32>& groups, const TActorContext& ctx);
+ void CloseSession(PersQueue::ErrorCode::ErrorCode code, const TString& reason, const TActorContext& ctx);
void SetupCounters();
void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic);
- void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic, const TString& cloudId, const TString& dbId,
- const TString& folderId);
-
- void ProcessReads(const NActors::TActorContext& ctx); // returns false if actor died
- void ProcessAnswer(const NActors::TActorContext& ctx, typename TFormedReadResponse<TServerMessage>::TPtr formedResponse); // returns false if actor died
-
- void RegisterSessions(const NActors::TActorContext& ctx);
- void RegisterSession(const TActorId& pipe, const TString& topic, const TVector<ui32>& groups, const TActorContext& ctx);
-
- void DropPartition(typename THashMap<ui64, TPartitionActorInfo>::iterator it, const TActorContext& ctx);
+ void SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic,
+ const TString& cloudId, const TString& dbId, const TString& folderId);
- bool ActualPartitionActor(const TActorId& part);
- void ReleasePartition(const typename THashMap<ui64, TPartitionActorInfo>::iterator& it,
- bool couldBeReads, const TActorContext& ctx); // returns false if actor died
+ void ProcessReads(const TActorContext& ctx);
+ ui64 PrepareResponse(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse);
+ void ProcessAnswer(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse, const TActorContext& ctx);
- void SendReleaseSignalToClient(const typename THashMap<ui64, TPartitionActorInfo>::iterator& it, bool kill, const TActorContext& ctx);
-
- void InformBalancerAboutRelease(const typename THashMap<ui64, TPartitionActorInfo>::iterator& it, const TActorContext& ctx);
+ void DropPartition(typename TPartitionsMap::iterator it, const TActorContext& ctx);
+ void ReleasePartition(typename TPartitionsMap::iterator it, bool couldBeReads, const TActorContext& ctx);
+ void SendReleaseSignal(typename TPartitionsMap::iterator it, bool kill, const TActorContext& ctx);
+ void InformBalancerAboutRelease(typename TPartitionsMap::iterator it, const TActorContext& ctx);
static ui32 NormalizeMaxReadMessagesCount(ui32 sourceValue);
static ui32 NormalizeMaxReadSize(ui32 sourceValue);
private:
- std::unique_ptr</* type alias */ TEvStreamPQReadRequest> Request;
-
+ std::unique_ptr</* type alias */ TEvStreamReadRequest> Request;
const TString ClientDC;
-
const TInstant StartTimestamp;
TActorId SchemeCache;
@@ -293,7 +313,7 @@ private:
bool CommitsDisabled;
bool InitDone;
- bool RangesMode = false;
+ bool RangesMode;
ui32 MaxReadMessagesCount;
ui32 MaxReadSize;
@@ -310,7 +330,7 @@ private:
THashSet<TActorId> ActualPartitionActors;
THashMap<ui64, std::pair<ui32, ui64>> BalancerGeneration;
ui64 NextAssignId;
- THashMap<ui64, TPartitionActorInfo> Partitions; //assignId -> info
+ TPartitionsMap Partitions; // assignId -> info
THashMap<TString, TTopicHolder> Topics; // topic -> info
THashMap<TString, NPersQueue::TTopicConverterPtr> FullPathToConverter; // PrimaryFullPath -> Converter, for balancer replies matching
@@ -324,10 +344,15 @@ private:
TSet<TPartitionInfo> AvailablePartitions;
- THashMap<TActorId, typename TFormedReadResponse<TServerMessage>::TPtr> PartitionToReadResponse; // Partition actor -> TFormedReadResponse answer that has this partition.
- // PartitionsTookPartInRead in formed read response contain this actor id.
- typename TFormedReadResponse<TServerMessage>::TPtr PendingQuota; // response that currenly pending quota
- std::deque<typename TFormedReadResponse<TServerMessage>::TPtr> WaitingQuota; // responses that will be quoted next
+ // Partition actor -> TFormedReadResponse answer that has this partition.
+ // PartitionsTookPartInRead in formed read response contains this actor id.
+ THashMap<TActorId, typename TFormedReadResponse<TServerMessage>::TPtr> PartitionToReadResponse;
+
+ // Response that is currently pending quota
+ typename TFormedReadResponse<TServerMessage>::TPtr PendingQuota;
+
+ // Responses that will be granted quota next
+ std::deque<typename TFormedReadResponse<TServerMessage>::TPtr> WaitingQuota;
struct TControlMessages {
TVector<TServerMessage> ControlMessages;
@@ -336,7 +361,6 @@ private:
TMap<TPartitionId, TControlMessages> PartitionToControlMessages;
-
std::deque<THolder<TEvPQProxy::TEvRead>> Reads;
ui64 Cookie;
@@ -346,7 +370,7 @@ private:
ui32 Partitions;
};
- TMap<ui64, TCommitInfo> Commits; //readid->TCommitInfo
+ TMap<ui64, TCommitInfo> Commits; // readid -> TCommitInfo
TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
@@ -361,22 +385,22 @@ private:
ui32 ReadsInfly;
std::queue<ui64> ActiveWrites;
- NKikimr::NPQ::TPercentileCounter PartsPerSession;
+ NPQ::TPercentileCounter PartsPerSession;
THashMap<TString, TTopicCounters> TopicCounters;
THashMap<TString, ui32> NumPartitionsFromTopic;
TVector<NPersQueue::TPQLabelsInfo> Aggr;
- NKikimr::NPQ::TMultiCounter SLITotal;
- NKikimr::NPQ::TMultiCounter SLIErrors;
+ NPQ::TMultiCounter SLITotal;
+ NPQ::TMultiCounter SLIErrors;
TInstant StartTime;
- NKikimr::NPQ::TPercentileCounter InitLatency;
- NKikimr::NPQ::TPercentileCounter ReadLatency;
- NKikimr::NPQ::TPercentileCounter ReadLatencyFromDisk;
- NKikimr::NPQ::TPercentileCounter CommitLatency;
- NKikimr::NPQ::TMultiCounter SLIBigLatency;
- NKikimr::NPQ::TMultiCounter SLIBigReadLatency;
- NKikimr::NPQ::TMultiCounter ReadsTotal;
+ NPQ::TPercentileCounter InitLatency;
+ NPQ::TPercentileCounter ReadLatency;
+ NPQ::TPercentileCounter ReadLatencyFromDisk;
+ NPQ::TPercentileCounter CommitLatency;
+ NPQ::TMultiCounter SLIBigLatency;
+ NPQ::TMultiCounter SLIBigReadLatency;
+ NPQ::TMultiCounter ReadsTotal;
NPersQueue::TTopicsListController TopicsHandler;
NPersQueue::TTopicsToConverter TopicsList;
@@ -384,8 +408,7 @@ private:
}
-/////////////////////////////////////////
// Implementation
#define READ_SESSION_ACTOR_IMPL
-#include "read_session_actor.ipp"
+ #include "read_session_actor.ipp"
#undef READ_SESSION_ACTOR_IMPL
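
read_session_actor.h now resolves every protocol-specific type (client/server messages, init/read events, the stream request event) from the single UseMigrationProtocol template parameter via std::conditional_t, and the new TPartitionsMap alias shortens the iterator-heavy signatures below. A compilable sketch of the aliasing idea, with stand-in message structs instead of the real PersQueue/Topic protobuf types and purely illustrative branch contents:

    #include <iostream>
    #include <type_traits>

    // Stand-ins for the two wire formats selected by the template parameter.
    struct TMigrationClientMessage { static constexpr const char* Protocol = "migration (pqv1)"; };
    struct TTopicClientMessage     { static constexpr const char* Protocol = "topic"; };

    template <bool UseMigrationProtocol>
    class TReadSessionSketch {
        // Resolved once; the rest of the class is written against the alias and
        // never names the concrete message type again.
        using TClientMessage = std::conditional_t<UseMigrationProtocol,
            TMigrationClientMessage,
            TTopicClientMessage>;

    public:
        static void Describe() {
            // if constexpr keeps protocol-specific branches next to shared code
            // without a runtime check; only the matching branch is compiled.
            if constexpr (UseMigrationProtocol) {
                std::cout << "cookie commits via " << TClientMessage::Protocol << std::endl;
            } else {
                std::cout << "offset-range commits via " << TClientMessage::Protocol << std::endl;
            }
        }
    };

    int main() {
        TReadSessionSketch<true>::Describe();
        TReadSessionSketch<false>::Describe();
        return 0;
    }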
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index 4a5da33c3a3..5b114cf8829 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -1,9 +1,9 @@
#ifndef READ_SESSION_ACTOR_IMPL
-#error "Do not include this file directly"
+ #error "Do not include this file directly"
#endif
-#include "read_init_auth_actor.h"
#include "helpers.h"
+#include "read_init_auth_actor.h"
#include <ydb/library/persqueue/topic_parser/counters.h>
@@ -13,41 +13,33 @@
#include <util/string/join.h>
#include <util/string/strip.h>
-#include <util/charset/utf8.h>
#include <utility>
-using namespace NActors;
-using namespace NKikimrClient;
-
-namespace NKikimr {
+namespace NKikimr::NGRpcProxy::V1 {
+using namespace NKikimrClient;
using namespace NMsgBusProxy;
-
-namespace NGRpcProxy::V1 {
-
using namespace PersQueue::V1;
-//TODO: add here tracking of bytes in/out
+// TODO: add here tracking of bytes in/out
template <bool UseMigrationProtocol>
-TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(TEvStreamPQReadRequest* request, const ui64 cookie,
- const TActorId& schemeCache, const TActorId& newSchemeCache,
- TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
- const TMaybe<TString> clientDC,
- const NPersQueue::TTopicsListController& topicsHandler)
+TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(
+ TEvStreamReadRequest* request, const ui64 cookie,
+ const TActorId& schemeCache, const TActorId& newSchemeCache,
+ TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
+ const TMaybe<TString> clientDC,
+ const NPersQueue::TTopicsListController& topicsHandler)
: TRlHelpers(request, READ_BLOCK_SIZE, TDuration::Minutes(1))
, Request(request)
- , ClientDC(clientDC ? *clientDC : "other")
+ , ClientDC(clientDC.GetOrElse("other"))
, StartTimestamp(TInstant::Now())
, SchemeCache(schemeCache)
, NewSchemeCache(newSchemeCache)
- , AuthInitActor()
- , ClientId()
- , ClientPath()
- , Session()
, CommitsDisabled(false)
, InitDone(false)
+ , RangesMode(false)
, MaxReadMessagesCount(0)
, MaxReadSize(0)
, MaxTimeLagMs(0)
@@ -63,57 +55,60 @@ TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(TEvStreamPQReadReques
, BytesInflight_(0)
, RequestedBytes(0)
, ReadsInfly(0)
- , TopicsHandler(topicsHandler) {
+ , TopicsHandler(topicsHandler)
+{
Y_ASSERT(Request);
}
-template<bool UseMigrationProtocol>
-TReadSessionActor<UseMigrationProtocol>::~TReadSessionActor() = default;
-
-
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Bootstrap(const TActorContext& ctx) {
- Y_VERIFY(Request);
if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
++(*GetServiceCounters(Counters, "pqproxy|readSession")
->GetNamedCounter("sensor", "SessionsCreatedTotal", true));
}
Request->GetStreamCtx()->Attach(ctx.SelfID);
- if (!Request->GetStreamCtx()->Read()) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
- Die(ctx);
+ if (!ReadFromStreamOrDie(ctx)) {
return;
}
- StartTime = ctx.Now();
- TReadSessionActor<UseMigrationProtocol>::Become(&TReadSessionActor<UseMigrationProtocol>::TThis::StateFunc);
+ StartTime = ctx.Now();
+ this->Become(&TReadSessionActor<UseMigrationProtocol>::TThis::StateFunc);
}
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::HandleDone(const TActorContext& ctx) {
-
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvNotifiedWhenDone::TPtr&, const TActorContext& ctx) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc closed");
Die(ctx);
}
+template <bool UseMigrationProtocol>
+bool TReadSessionActor<UseMigrationProtocol>::ReadFromStreamOrDie(const TActorContext& ctx) {
+ if (!Request->GetStreamCtx()->Read()) {
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
+ Die(ctx);
+ return false;
+ }
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadFinished::TPtr& ev, const TActorContext& ctx) {
+ return true;
+}
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadFinished::TPtr& ev, const TActorContext& ctx) {
auto& request = ev->Get()->Record;
if constexpr (UseMigrationProtocol) {
- auto token = request.token();
+ const auto token = request.token();
request.set_token("");
- if (!token.empty()) { //TODO refreshtoken here
+ if (!token.empty()) { // TODO: refresh token here
ctx.Send(ctx.SelfID, new TEvPQProxy::TEvAuth(token));
}
}
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY,
- PQ_LOG_PREFIX << " grpc read done: success: " << ev->Get()->Success << " data: " << request);
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read done"
+ << ": success# " << ev->Get()->Success
+ << ", data# " << request);
if (!ev->Get()->Success) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed");
@@ -121,7 +116,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF
return;
}
- auto GetAssignId = [](auto& request) {
+ auto getAssignId = [](auto& request) {
if constexpr (UseMigrationProtocol) {
return request.assign_id();
} else {
@@ -132,33 +127,23 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF
if constexpr (UseMigrationProtocol) {
switch (request.request_case()) {
case TClientMessage::kInitRequest: {
- ctx.Send(ctx.SelfID, new TEvReadInit(request, Request->GetStreamCtx()->GetPeerName()));
- break;
+ return (void)ctx.Send(ctx.SelfID, new TEvReadInit(request, Request->GetStreamCtx()->GetPeerName()));
}
- case TClientMessage::kStatus: {
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(GetAssignId(request.status())));
- if (!Request->GetStreamCtx()->Read()) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
- Die(ctx);
- return;
- }
- break;
+ case TClientMessage::kStatus: {
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(getAssignId(request.status())));
+ return (void)ReadFromStreamOrDie(ctx);
}
+
case TClientMessage::kRead: {
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvRead()); // Proto read message have no parameters
- break;
+ return (void)ctx.Send(ctx.SelfID, new TEvPQProxy::TEvRead());
}
- case TClientMessage::kReleased: {
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(GetAssignId(request.released())));
- if (!Request->GetStreamCtx()->Read()) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
- Die(ctx);
- return;
- }
- break;
+ case TClientMessage::kReleased: {
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(getAssignId(request.released())));
+ return (void)ReadFromStreamOrDie(ctx);
}
+
case TClientMessage::kStartRead: {
const auto& req = request.start_read();
@@ -166,188 +151,186 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvReadF
const ui64 commitOffset = req.commit_offset();
const bool verifyReadOffset = req.verify_read_offset();
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(GetAssignId(request.start_read()), readOffset, commitOffset, verifyReadOffset));
- if (!Request->GetStreamCtx()->Read()) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
- Die(ctx);
- return;
- }
- break;
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(getAssignId(request.start_read()), readOffset, commitOffset, verifyReadOffset));
+ return (void)ReadFromStreamOrDie(ctx);
}
+
case TClientMessage::kCommit: {
const auto& req = request.commit();
if (!req.cookies_size() && !RangesMode) {
- CloseSession(TStringBuilder() << "can't commit without cookies", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "can't commit without cookies", ctx);
}
- if (RangesMode && !req.offset_ranges_size()) {
- CloseSession(TStringBuilder() << "can't commit without offsets", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ if (RangesMode && !req.offset_ranges_size()) {
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "can't commit without offsets", ctx);
}
THashMap<ui64, TEvPQProxy::TCommitCookie> commitCookie;
THashMap<ui64, TEvPQProxy::TCommitRange> commitRange;
- for (auto& c: req.cookies()) {
+ for (const auto& c : req.cookies()) {
commitCookie[c.assign_id()].Cookies.push_back(c.partition_cookie());
}
- for (auto& c: req.offset_ranges()) {
- commitRange[c.assign_id()].Ranges.push_back(std::make_pair(c.start_offset(), c.end_offset()));
- }
- for (auto& c : commitCookie) {
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvCommitCookie(c.first, std::move(c.second)));
+ for (const auto& c : req.offset_ranges()) {
+ commitRange[c.assign_id()].Ranges.emplace_back(c.start_offset(), c.end_offset());
}
- for (auto& c : commitRange) {
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvCommitRange(c.first, std::move(c.second)));
+ for (auto& [id, cookies] : commitCookie) {
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvCommitCookie(id, std::move(cookies)));
}
- if (!Request->GetStreamCtx()->Read()) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
- Die(ctx);
- return;
+ for (auto& [id, range] : commitRange) {
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvCommitRange(id, std::move(range)));
}
- break;
+
+ return (void)ReadFromStreamOrDie(ctx);
}
default: {
- CloseSession(TStringBuilder() << "unsupported request", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- break;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "unsupported request", ctx);
}
}
} else {
- switch(request.client_message_case()) {
+ switch (request.client_message_case()) {
case TClientMessage::kInitRequest: {
- ctx.Send(ctx.SelfID, new TEvReadInit(request, Request->GetStreamCtx()->GetPeerName()));
- break;
+ return (void)ctx.Send(ctx.SelfID, new TEvReadInit(request, Request->GetStreamCtx()->GetPeerName()));
}
+
case TClientMessage::kReadRequest: {
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvRead(request.read_request().bytes_size()));
- break;
+ return (void)ctx.Send(ctx.SelfID, new TEvPQProxy::TEvRead(request.read_request().bytes_size()));
}
- case TClientMessage::kPartitionSessionStatusRequest: {
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(GetAssignId(request.partition_session_status_request())));
- if (!Request->GetStreamCtx()->Read()) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
- Die(ctx);
- return;
- }
- break;
+ case TClientMessage::kPartitionSessionStatusRequest: {
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvGetStatus(getAssignId(request.partition_session_status_request())));
+ return (void)ReadFromStreamOrDie(ctx);
}
- case TClientMessage::kStopPartitionSessionResponse: {
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(GetAssignId(request.stop_partition_session_response())));
- if (!Request->GetStreamCtx()->Read()) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
- Die(ctx);
- return;
- }
- break;
+ case TClientMessage::kStopPartitionSessionResponse: {
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvReleased(getAssignId(request.stop_partition_session_response())));
+ return (void)ReadFromStreamOrDie(ctx);
}
+
case TClientMessage::kStartPartitionSessionResponse: {
const auto& req = request.start_partition_session_response();
const ui64 readOffset = req.read_offset();
const ui64 commitOffset = req.commit_offset();
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(GetAssignId(req), readOffset, commitOffset, req.has_read_offset()));
- if (!Request->GetStreamCtx()->Read()) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
- Die(ctx);
- return;
- }
- break;
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvStartRead(getAssignId(req), readOffset, commitOffset, req.has_read_offset()));
+ return (void)ReadFromStreamOrDie(ctx);
}
+
case TClientMessage::kCommitOffsetRequest: {
const auto& req = request.commit_offset_request();
if (!RangesMode || !req.commit_offsets_size()) {
- CloseSession(TStringBuilder() << "can't commit without offsets", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "can't commit without offsets", ctx);
}
THashMap<ui64, TEvPQProxy::TCommitRange> commitRange;
- for (auto& pc: req.commit_offsets()) {
- auto id = pc.partition_session_id();
- for (auto& c: pc.offsets()) {
- commitRange[id].Ranges.push_back(std::make_pair(c.start(), c.end()));
+ for (const auto& pc : req.commit_offsets()) {
+ for (const auto& c : pc.offsets()) {
+ commitRange[pc.partition_session_id()].Ranges.emplace_back(c.start(), c.end());
}
}
- for (auto& c : commitRange) {
- ctx.Send(ctx.SelfID, new TEvPQProxy::TEvCommitRange(c.first, std::move(c.second)));
+ for (auto& [id, range] : commitRange) {
+ ctx.Send(ctx.SelfID, new TEvPQProxy::TEvCommitRange(id, std::move(range)));
}
- if (!Request->GetStreamCtx()->Read()) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
- Die(ctx);
- return;
- }
- break;
+ return (void)ReadFromStreamOrDie(ctx);
}
+
case TClientMessage::kUpdateTokenRequest: {
- auto token = request.update_token_request().token();
- if (!token.empty()) { //TODO refreshtoken here
+ if (const auto token = request.update_token_request().token()) { // TODO: refresh token here
ctx.Send(ctx.SelfID, new TEvPQProxy::TEvAuth(token));
}
break;
}
default: {
- CloseSession(TStringBuilder() << "unsupported request", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- break;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "unsupported request", ctx);
}
}
}
}
+template <bool UseMigrationProtocol>
+bool TReadSessionActor<UseMigrationProtocol>::WriteToStreamOrDie(const TActorContext& ctx, TServerMessage&& response, bool finish) {
+ const ui64 sz = response.ByteSize();
+ ActiveWrites.push(sz);
+
+ BytesInflight_ += sz;
+ if (BytesInflight) {
+ (*BytesInflight) += sz;
+ }
+
+ bool res = false;
+ if (!finish) {
+ res = Request->GetStreamCtx()->Write(std::move(response));
+ } else {
+ res = Request->GetStreamCtx()->WriteAndFinish(std::move(response), grpc::Status::OK);
+ }
+
+ if (!res) {
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed at start");
+ Die(ctx);
+ }
+
+ return res;
+}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(typename IContext::TEvWriteFinished::TPtr& ev, const TActorContext& ctx) {
if (!ev->Get()->Success) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed");
- Die(ctx);
+ return Die(ctx);
}
+
Y_VERIFY(!ActiveWrites.empty());
- ui64 sz = ActiveWrites.front();
+ const auto sz = ActiveWrites.front();
ActiveWrites.pop();
+
Y_VERIFY(BytesInflight_ >= sz);
BytesInflight_ -= sz;
- if (BytesInflight) (*BytesInflight) -= sz;
+ if (BytesInflight) {
+ (*BytesInflight) -= sz;
+ }
ProcessReads(ctx);
}
-
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) {
+ if (AuthInitActor) {
+ ctx.Send(AuthInitActor, new TEvents::TEvPoisonPill());
+ }
- ctx.Send(AuthInitActor, new TEvents::TEvPoisonPill());
-
- for (auto& p : Partitions) {
- ctx.Send(p.second.Actor, new TEvents::TEvPoisonPill());
+ for (const auto& [_, info] : Partitions) {
+ if (info.Actor) {
+ ctx.Send(info.Actor, new TEvents::TEvPoisonPill());
+ }
- if (!p.second.Released) {
- // ToDo[counters]
- auto it = TopicCounters.find(p.second.Topic->GetInternalName());
+ if (!info.Released) {
+ // TODO: counters
+ auto it = TopicCounters.find(info.Topic->GetInternalName());
Y_VERIFY(it != TopicCounters.end());
it->second.PartitionsInfly.Dec();
it->second.PartitionsReleased.Inc();
- if (p.second.Releasing)
+ if (info.Releasing) {
it->second.PartitionsToBeReleased.Dec();
+ }
}
}
- for (auto& t : Topics) {
- if (t.second.PipeClient)
- NTabletPipe::CloseClient(ctx, t.second.PipeClient);
+ for (const auto& [_, holder] : Topics) {
+ if (holder.PipeClient) {
+ NTabletPipe::CloseClient(ctx, holder.PipeClient);
+ }
}
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " is DEAD");
if (BytesInflight) {
(*BytesInflight) -= BytesInflight_;
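
The new ReadFromStreamOrDie and WriteToStreamOrDie helpers introduced above replace the repeated "if (!Request->GetStreamCtx()->Read()) { log; Die(ctx); return; }" blocks: the helper logs, kills the actor and returns false, so a caller only has to bail out. A self-contained sketch of that do-or-die helper shape, with a fake stream standing in for the gRPC streaming context (TFakeStream and TSessionSketch are invented for illustration; the real write helper also takes a finish flag for WriteAndFinish):

    #include <cstdint>
    #include <iostream>
    #include <queue>

    // Fake stand-in for the gRPC streaming context used by the actor.
    struct TFakeStream {
        bool Read() { return ReadsLeft-- > 0; }
        bool Write(uint64_t size) { Written += size; return true; }
        int ReadsLeft = 1;
        uint64_t Written = 0;
    };

    class TSessionSketch {
    public:
        bool ReadFromStreamOrDie() {
            if (!Stream.Read()) {
                std::cout << "grpc read failed" << std::endl;
                Die();
                return false;   // caller simply returns; the "actor" is already dead
            }
            return true;
        }

        bool WriteToStreamOrDie(uint64_t responseSize) {
            ActiveWrites.push(responseSize);   // byte accounting before the write
            BytesInflight += responseSize;
            if (!Stream.Write(responseSize)) {
                Die();
                return false;
            }
            return true;
        }

        bool Dead = false;

    private:
        void Die() { Dead = true; }

        TFakeStream Stream;
        std::queue<uint64_t> ActiveWrites;
        uint64_t BytesInflight = 0;
    };

    int main() {
        TSessionSketch session;
        if (!session.ReadFromStreamOrDie()) return 1;   // first read succeeds
        session.WriteToStreamOrDie(100);
        if (!session.ReadFromStreamOrDie()) {           // second read fails and "kills" the session
            return session.Dead ? 0 : 1;
        }
        return 1;
    }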
@@ -359,96 +342,114 @@ void TReadSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) {
PartsPerSession.DecFor(Partitions.size(), 1);
}
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " is DEAD");
ctx.Send(GetPQReadServiceActorID(), new TEvPQProxy::TEvSessionDead(Cookie));
TActorBootstrapped<TReadSessionActor>::Die(ctx);
}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvDone::TPtr&, const TActorContext& ctx) {
- CloseSession(TStringBuilder() << "Reads done signal - closing everything", PersQueue::ErrorCode::OK, ctx);
+ CloseSession(PersQueue::ErrorCode::OK, "reads done signal, closing everything", ctx);
+}
+
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx) {
+ CloseSession(ev->Get()->ErrorCode, ev->Get()->Reason, ctx);
+}
+
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvDieCommand::TPtr& ev, const TActorContext& ctx) {
+ CloseSession(ev->Get()->ErrorCode, ev->Get()->Reason, ctx);
}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvCommitCookie::TPtr& ev, const TActorContext& ctx) {
RequestNotChecked = true;
if (CommitsDisabled) {
- CloseSession("commits in session are disabled by client option", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "commits in session are disabled by client option", ctx);
}
- const ui64& assignId = ev->Get()->AssignId;
- auto it = Partitions.find(assignId);
- if (it == Partitions.end()) //stale commit - ignore it
+
+ auto it = Partitions.find(ev->Get()->AssignId);
+ if (it == Partitions.end()) { // stale commit - ignore it
return;
+ }
- for (auto& c : ev->Get()->CommitInfo.Cookies) {
- if(RangesMode) {
- CloseSession("Commits cookies in ranges commit mode is illegal", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ for (const auto c : ev->Get()->CommitInfo.Cookies) {
+ if (RangesMode) {
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "commits cookies in ranges commit mode is prohibited", ctx);
}
+
it->second.NextCommits.insert(c);
}
ctx.Send(it->second.Actor, new TEvPQProxy::TEvCommitCookie(ev->Get()->AssignId, std::move(ev->Get()->CommitInfo)));
}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvCommitRange::TPtr& ev, const TActorContext& ctx) {
RequestNotChecked = true;
if (CommitsDisabled) {
- CloseSession("commits in session are disabled by client option", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "commits in session are disabled by client option", ctx);
}
- const ui64& assignId = ev->Get()->AssignId;
- auto it = Partitions.find(assignId);
- if (it == Partitions.end()) //stale commit - ignore it
+
+ auto it = Partitions.find(ev->Get()->AssignId);
+ if (it == Partitions.end()) { // stale commit - ignore it
return;
+ }
- for (auto& c : ev->Get()->CommitInfo.Ranges) {
- if(!RangesMode) {
- CloseSession("Commits ranges in cookies commit mode is illegal", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ for (const auto& [b, e] : ev->Get()->CommitInfo.Ranges) {
+ if (!RangesMode) {
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "commits ranges in cookies commit mode is prohibited", ctx);
}
- if (c.first >= c.second || it->second.NextRanges.Intersects(c.first, c.second) || c.first < it->second.Offset) {
- CloseSession(TStringBuilder() << "Offsets range [" << c.first << ", " << c.second << ") has already committed offsets, double committing is forbiden; or incorrect", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ if (b >= e || it->second.NextRanges.Intersects(b, e) || b < it->second.Offset) {
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder()
+ << "offsets range [" << b << ", " << e << ")"
+ << " has already committed offsets, double committing is forbiden or incorrect", ctx);
}
- it->second.NextRanges.InsertInterval(c.first, c.second);
+
+ it->second.NextRanges.InsertInterval(b, e);
}
ctx.Send(it->second.Actor, new TEvPQProxy::TEvCommitRange(ev->Get()->AssignId, std::move(ev->Get()->CommitInfo)));
}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuth::TPtr& ev, const TActorContext& ctx) {
- ProcessAuth(ev->Get()->Auth, ctx);
+ const auto& auth = ev->Get()->Auth;
+ if (!auth.empty() && auth != Auth) {
+ Auth = auth;
+ Request->RefreshToken(auth, ctx, ctx.SelfID);
+ }
}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvStartRead::TPtr& ev, const TActorContext& ctx) {
RequestNotChecked = true;
auto it = Partitions.find(ev->Get()->AssignId);
if (it == Partitions.end() || it->second.Releasing) {
- //do nothing - already released partition
- LOG_WARN_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got NOTACTUAL StartRead from client for partition with assign id " << ev->Get()->AssignId
- << " at offset " << ev->Get()->ReadOffset);
+ // do nothing - already released partition
+ LOG_WARN_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got irrelevant StartRead from client"
+ << ": partition# " << ev->Get()->AssignId
+ << ", offset# " << ev->Get()->ReadOffset);
return;
}
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got StartRead from client for "
- << it->second.Partition <<
- " at readOffset " << ev->Get()->ReadOffset <<
- " commitOffset " << ev->Get()->CommitOffset);
- //proxy request to partition - allow initing
- //TODO: add here VerifyReadOffset too and check it againts Committed position
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got StartRead from client"
+ << ": partition# " << it->second.Partition
+ << ", readOffset# " << ev->Get()->ReadOffset
+ << ", commitOffset# " << ev->Get()->CommitOffset);
+
+ // proxy request to partition - allow initing
+ // TODO: add VerifyReadOffset here too and check it against the Committed position
ctx.Send(it->second.Actor, new TEvPQProxy::TEvLockPartition(ev->Get()->ReadOffset, ev->Get()->CommitOffset, ev->Get()->VerifyReadOffset, true));
}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvReleased::TPtr& ev, const TActorContext& ctx) {
RequestNotChecked = true;
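
The TEvCommitRange handler above rejects an offset range [b, e) that is empty, overlaps a range already queued in NextRanges, or starts below the partition's committed offset, and only then inserts it into the TDisjointIntervalTree. A small sketch of that validation; the std::map-based TIntervalSetSketch below is only a stand-in for illustration, not the TDisjointIntervalTree API:

    #include <cstdint>
    #include <iostream>
    #include <map>

    // Half-open, non-overlapping intervals keyed by start offset.
    class TIntervalSetSketch {
    public:
        bool Intersects(uint64_t b, uint64_t e) const {
            auto it = Ranges.upper_bound(b);            // first range starting after b
            if (it != Ranges.end() && it->first < e) {
                return true;                            // next range starts inside [b, e)
            }
            if (it != Ranges.begin() && std::prev(it)->second > b) {
                return true;                            // previous range ends inside [b, e)
            }
            return false;
        }

        void Insert(uint64_t b, uint64_t e) { Ranges[b] = e; }

    private:
        std::map<uint64_t, uint64_t> Ranges;
    };

    // Mirrors the handler's checks: reject empty, overlapping, or stale ranges.
    bool TryQueueCommit(TIntervalSetSketch& next, uint64_t committedOffset, uint64_t b, uint64_t e) {
        if (b >= e || next.Intersects(b, e) || b < committedOffset) {
            return false;   // the actor would CloseSession(BAD_REQUEST, ...) here
        }
        next.Insert(b, e);
        return true;
    }

    int main() {
        TIntervalSetSketch next;
        const uint64_t committed = 10;
        std::cout << TryQueueCommit(next, committed, 10, 20) << std::endl;  // 1: accepted
        std::cout << TryQueueCommit(next, committed, 15, 25) << std::endl;  // 0: overlaps queued range
        std::cout << TryQueueCommit(next, committed, 5, 8)   << std::endl;  // 0: below committed offset
        std::cout << TryQueueCommit(next, committed, 20, 30) << std::endl;  // 1: accepted
        return 0;
    }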
@@ -456,167 +457,150 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvReleased::TP
if (it == Partitions.end()) {
return;
}
- if (!it->second.Releasing) {
- CloseSession(TStringBuilder() << "Release of partition that is not requested for release is forbiden for " << it->second.Partition, PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ if (!it->second.Releasing) {
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder()
+ << "release of partition that is not requested for release is forbiden for " << it->second.Partition, ctx);
}
- Y_VERIFY(it->second.LockSent);
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got Released from client for partition " << it->second.Partition);
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got Released from client"
+ << ": partition# " << it->second.Partition);
+
+ Y_VERIFY(it->second.LockSent);
ReleasePartition(it, true, ctx);
}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvGetStatus::TPtr& ev, const TActorContext& ctx) {
auto it = Partitions.find(ev->Get()->AssignId);
if (it == Partitions.end() || it->second.Releasing) {
// Ignore request - client asking status after releasing of partition.
return;
}
+
ctx.Send(it->second.Actor, new TEvPQProxy::TEvGetStatus(ev->Get()->AssignId));
}
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::DropPartition(typename THashMap<ui64, TPartitionActorInfo>::iterator it, const TActorContext& ctx) {
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::DropPartition(typename TPartitionsMap::iterator it, const TActorContext& ctx) {
ctx.Send(it->second.Actor, new TEvents::TEvPoisonPill());
+
bool res = ActualPartitionActors.erase(it->second.Actor);
Y_VERIFY(res);
if (--NumPartitionsFromTopic[it->second.Topic->GetInternalName()] == 0) {
- //ToDo[counters]
- bool res_ = TopicCounters.erase(it->second.Topic->GetInternalName());
- Y_VERIFY(res_);
+ // TODO: counters
+ res = TopicCounters.erase(it->second.Topic->GetInternalName());
+ Y_VERIFY(res);
}
if (SessionsActive) {
PartsPerSession.DecFor(Partitions.size(), 1);
}
+
BalancerGeneration.erase(it->first);
Partitions.erase(it);
+
if (SessionsActive) {
PartsPerSession.IncFor(Partitions.size(), 1);
}
}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvCommitDone::TPtr& ev, const TActorContext& ctx) {
-
Y_VERIFY(!CommitsDisabled);
- if (!ActualPartitionActor(ev->Sender))
+ if (!ActualPartitionActors.contains(ev->Sender)) {
return;
+ }
- ui64 assignId = ev->Get()->AssignId;
-
- auto it = Partitions.find(assignId);
+ auto it = Partitions.find(ev->Get()->AssignId);
Y_VERIFY(it != Partitions.end());
Y_VERIFY(it->second.Offset < ev->Get()->Offset);
it->second.NextRanges.EraseInterval(it->second.Offset, ev->Get()->Offset);
-
- if (ev->Get()->StartCookie == Max<ui64>()) //means commit at start
+ if (ev->Get()->StartCookie == Max<ui64>()) { // means commit at start
return;
+ }
TServerMessage result;
result.set_status(Ydb::StatusIds::SUCCESS);
+
if (!RangesMode) {
if constexpr (UseMigrationProtocol) {
for (ui64 i = ev->Get()->StartCookie; i <= ev->Get()->LastCookie; ++i) {
auto c = result.mutable_committed()->add_cookies();
c->set_partition_cookie(i);
- c->set_assign_id(assignId);
+ c->set_assign_id(ev->Get()->AssignId);
it->second.NextCommits.erase(i);
it->second.ReadIdCommitted = i;
}
} else { // commit on cookies not supported in this case
Y_VERIFY(false);
}
-
} else {
if constexpr (UseMigrationProtocol) {
auto c = result.mutable_committed()->add_offset_ranges();
- c->set_assign_id(assignId);
+ c->set_assign_id(ev->Get()->AssignId);
c->set_start_offset(it->second.Offset);
c->set_end_offset(ev->Get()->Offset);
-
} else {
auto c = result.mutable_commit_offset_response()->add_partitions_committed_offsets();
- c->set_partition_session_id(assignId);
+ c->set_partition_session_id(ev->Get()->AssignId);
c->set_committed_offset(ev->Get()->Offset);
}
}
it->second.Offset = ev->Get()->Offset;
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " replying for commits from assignId " << assignId << " from " << ev->Get()->StartCookie << " to " << ev->Get()->LastCookie << " to offset " << it->second.Offset);
- if (!WriteResponse(std::move(result))) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed");
- Die(ctx);
- return;
- }
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " replying for commits"
+ << ": assignId# " << ev->Get()->AssignId
+ << ", from# " << ev->Get()->StartCookie
+ << ", to# " << ev->Get()->LastCookie
+ << ", offset# " << it->second.Offset);
+ WriteToStreamOrDie(ctx, std::move(result));
}
-
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvReadSessionStatus::TPtr& ev, const TActorContext& ctx) {
- THolder<TEvPQProxy::TEvReadSessionStatusResponse> result(new TEvPQProxy::TEvReadSessionStatusResponse());
- for (auto& p : Partitions) {
+ auto result = MakeHolder<TEvPQProxy::TEvReadSessionStatusResponse>();
+
+ for (const auto& [_, info] : Partitions) {
auto part = result->Record.AddPartition();
- part->SetTopic(p.second.Partition.DiscoveryConverter->GetPrimaryPath());
- part->SetPartition(p.second.Partition.Partition);
- part->SetAssignId(p.second.Partition.AssignId);
- for (auto& c : p.second.NextCommits) {
+ part->SetTopic(info.Partition.DiscoveryConverter->GetPrimaryPath());
+ part->SetPartition(info.Partition.Partition);
+ part->SetAssignId(info.Partition.AssignId);
+ part->SetReadIdCommitted(info.ReadIdCommitted);
+ part->SetLastReadId(info.ReadIdToResponse - 1);
+ part->SetTimestampMs(info.AssignTimestamp.MilliSeconds());
+
+ for (const auto c : info.NextCommits) {
part->AddNextCommits(c);
}
- part->SetReadIdCommitted(p.second.ReadIdCommitted);
- part->SetLastReadId(p.second.ReadIdToResponse - 1);
- part->SetTimestampMs(p.second.AssignTimestamp.MilliSeconds());
}
+
result->Record.SetSession(Session);
result->Record.SetTimestamp(StartTimestamp.MilliSeconds());
-
result->Record.SetClientNode(PeerName);
result->Record.SetProxyNodeId(ctx.SelfID.NodeId());
ctx.Send(ev->Sender, result.Release());
}
-inline TString GetTopicSettingsPath(const PersQueue::V1::MigrationStreamingReadClientMessage::TopicReadSettings& settings) {
- return settings.topic();
-}
-inline TString GetTopicSettingsPath(const Topic::StreamReadMessage::InitRequest::TopicReadSettings& settings) {
- return settings.path();
-}
-inline i64 GetTopicSettingsReadFrom(const PersQueue::V1::MigrationStreamingReadClientMessage::TopicReadSettings& settings) {
- return settings.start_from_written_at_ms();
-}
-inline i64 GetTopicSettingsReadFrom(const Topic::StreamReadMessage::InitRequest::TopicReadSettings& settings) {
- return ::google::protobuf::util::TimeUtil::TimestampToMilliseconds(settings.read_from());
-}
-
-
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr& ev, const TActorContext& ctx) {
-
- THolder<TEvReadInit> event(ev->Release());
-
if (!Topics.empty()) {
- //answer error
- CloseSession("got second init request", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "got second init request", ctx);
}
- const auto& init = event->Request.init_request();
+ const auto& init = ev->Get()->Request.init_request();
if (!init.topics_read_settings_size()) {
- CloseSession("no topics in init request", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "no topics in init request", ctx);
}
if (init.consumer().empty()) {
- CloseSession("no consumer in init request", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "no consumer in init request", ctx);
}
ClientId = NPersQueue::ConvertNewConsumerName(init.consumer(), ctx);
@@ -626,9 +610,11 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr&
ClientPath = NPersQueue::StripLeadSlash(NPersQueue::MakeConsumerPath(init.consumer()));
}
- TStringBuilder session;
- session << ClientPath << "_" << ctx.SelfID.NodeId() << "_" << Cookie << "_" << TAppData::RandomProvider->GenRand64() << "_v1";
- Session = session;
+ Session = TStringBuilder() << ClientPath
+ << "_" << ctx.SelfID.NodeId()
+ << "_" << Cookie
+ << "_" << TAppData::RandomProvider->GenRand64()
+ << "_" << "v1";
CommitsDisabled = false;
if constexpr (UseMigrationProtocol) {
@@ -646,107 +632,118 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr&
ReadTimestampMs = 0; // read_from per topic only
ReadOnlyLocal = true;
}
+
if (MaxTimeLagMs < 0) {
- CloseSession("max_lag_duration_ms must be nonnegative number", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "max_lag_duration_ms must be nonnegative number", ctx);
}
+
if (ReadTimestampMs < 0) {
- CloseSession("start_from_written_at_ms must be nonnegative number", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "start_from_written_at_ms must be nonnegative number", ctx);
}
- PeerName = event->PeerName;
+ PeerName = ev->Get()->PeerName;
+
+ auto getTopicPath = [](const auto& settings) {
+ if constexpr (UseMigrationProtocol) {
+ return settings.topic();
+ } else {
+ return settings.path();
+ }
+ };
+
+ auto getReadFrom = [](const auto& settings) {
+ if constexpr (UseMigrationProtocol) {
+ return settings.start_from_written_at_ms();
+ } else {
+ return ::google::protobuf::util::TimeUtil::TimestampToMilliseconds(settings.read_from());
+ }
+ };
for (const auto& topic : init.topics_read_settings()) {
- TString topic_path = GetTopicSettingsPath(topic);
- if (topic_path.empty()) {
- CloseSession("empty topic in init request", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ const TString path = getTopicPath(topic);
+ if (path.empty()) {
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "empty topic in init request", ctx);
}
- i64 read_from = GetTopicSettingsReadFrom(topic);
+
+ const i64 read_from = getReadFrom(topic);
if (read_from < 0) {
- CloseSession("start_from_written_at_ms must be nonnegative number", PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, "start_from_written_at_ms must be nonnegative number", ctx);
}
- TopicsToResolve.insert(topic_path);
+
+ TopicsToResolve.insert(path);
}
if (Request->GetInternalToken().empty()) {
if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
- CloseSession("Unauthenticated access is forbidden, please provide credentials", PersQueue::ErrorCode::ACCESS_DENIED, ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::ACCESS_DENIED,
+ "unauthenticated access is forbidden, please provide credentials", ctx);
}
} else {
Y_VERIFY(Request->GetYdbToken());
Auth = *(Request->GetYdbToken());
Token = new NACLib::TUserToken(Request->GetInternalToken());
}
- TopicsList = TopicsHandler.GetReadTopicsList(
- TopicsToResolve, ReadOnlyLocal, Request->GetDatabaseName().GetOrElse(TString())
- );
+
+ TopicsList = TopicsHandler.GetReadTopicsList(TopicsToResolve, ReadOnlyLocal,
+ Request->GetDatabaseName().GetOrElse(TString()));
if (!TopicsList.IsValid) {
- return CloseSession(
- TopicsList.Reason,
- PersQueue::ErrorCode::BAD_REQUEST, ctx
- );
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TopicsList.Reason, ctx);
}
for (const auto& topic : init.topics_read_settings()) {
- auto topicIter = TopicsList.ClientTopics.find(GetTopicSettingsPath(topic));
- Y_VERIFY(!topicIter.IsEnd());
- for (const auto& converter: topicIter->second) {
+ auto it = TopicsList.ClientTopics.find(getTopicPath(topic));
+ Y_VERIFY(it != TopicsList.ClientTopics.end());
+
+ for (const auto& converter : it->second) {
const auto internalName = converter->GetOriginalPath();
if constexpr (UseMigrationProtocol) {
- for (i64 pg: topic.partition_group_ids()) {
+ for (const i64 pg : topic.partition_group_ids()) {
if (pg <= 0) {
- CloseSession("partition group id must be positive number", PersQueue::ErrorCode::BAD_REQUEST,
- ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST,
+ "partition group id must be positive number", ctx);
}
+
if (pg > Max<ui32>()) {
- CloseSession(
- TStringBuilder() << "partition group id is too big: " << pg << " > " << Max<ui32>(),
- PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder()
+ << "partition group id is too big: " << pg << " > " << Max<ui32>(), ctx);
}
+
TopicGroups[internalName].push_back(static_cast<ui32>(pg));
}
+
MaxLagByTopic[internalName] = MaxTimeLagMs;
- ReadFromTimestamp[internalName] = GetTopicSettingsReadFrom(topic);
+ ReadFromTimestamp[internalName] = getReadFrom(topic);
} else {
- for (i64 p: topic.partition_ids()) {
+ for (const i64 p : topic.partition_ids()) {
if (p < 0) {
- CloseSession("partition id must be nonnegative number", PersQueue::ErrorCode::BAD_REQUEST,
- ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST,
+ "partition id must be nonnegative number", ctx);
}
+
if (p + 1 > Max<ui32>()) {
- CloseSession(
- TStringBuilder() << "partition id is too big: " << p << " > " << Max<ui32>() - 1,
- PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder()
+ << "partition id is too big: " << p << " > " << Max<ui32>() - 1, ctx);
}
+
TopicGroups[internalName].push_back(static_cast<ui32>(p + 1));
}
- MaxLagByTopic[internalName] =
- ::google::protobuf::util::TimeUtil::DurationToMilliseconds(topic.max_lag());;
- ReadFromTimestamp[internalName] = GetTopicSettingsReadFrom(topic);
+
+            MaxLagByTopic[internalName] = ::google::protobuf::util::TimeUtil::DurationToMilliseconds(topic.max_lag());
+ ReadFromTimestamp[internalName] = getReadFrom(topic);
}
}
}
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " init: " << event->Request << " from " << PeerName);
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " read init"
+ << ": from# " << PeerName
+ << ", request# " << ev->Get()->Request);
if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
SetupCounters();
}
- AuthInitActor = ctx.Register(new TReadInitAndAuthActor(
- ctx, ctx.SelfID, ClientId, Cookie, Session, SchemeCache, NewSchemeCache, Counters, Token, TopicsList,
- TopicsHandler.GetLocalCluster()
- ));
-
+ RunAuthActor(ctx);
auto subGroup = GetServiceCounters(Counters, "pqproxy|SLI");
Aggr = {{{{"Account", ClientPath.substr(0, ClientPath.find("/"))}}, {"total"}}};
@@ -756,41 +753,8 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadInit::TPtr&
SLITotal.Inc();
}
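
The per-protocol helper overloads (GetTopicSettingsPath / GetTopicSettingsReadFrom) are folded above into generic lambdas that branch with if constexpr on UseMigrationProtocol, so the accessor belonging to the other protocol is never instantiated. A minimal standalone sketch of that dispatch style, with hypothetical settings types standing in for the protobuf messages:

#include <cstdint>
#include <iostream>
#include <string>
#include <type_traits>

// Hypothetical stand-ins for the two per-protocol settings messages.
struct TMigrationSettings {
    std::string topic() const { return "legacy-topic"; }
    int64_t start_from_written_at_ms() const { return 1000; }
};

struct TTopicSettings {
    std::string path() const { return "new-topic"; }
    int64_t read_from_ms() const { return 2000; }
};

template <bool UseMigrationProtocol>
void PrintSettings() {
    using TSettings = std::conditional_t<UseMigrationProtocol, TMigrationSettings, TTopicSettings>;
    TSettings settings;

    // Generic lambdas replace per-protocol overload pairs; if constexpr
    // discards the branch for the other protocol, so only the accessors
    // that actually exist on TSettings are instantiated.
    auto getTopicPath = [](const auto& s) {
        if constexpr (UseMigrationProtocol) {
            return s.topic();
        } else {
            return s.path();
        }
    };

    auto getReadFrom = [](const auto& s) {
        if constexpr (UseMigrationProtocol) {
            return s.start_from_written_at_ms();
        } else {
            return s.read_from_ms();
        }
    };

    std::cout << getTopicPath(settings) << " from " << getReadFrom(settings) << " ms\n";
}

int main() {
    PrintSettings<true>();
    PrintSettings<false>();
}

The same idea keeps both read paths in one function body without a runtime dispatch on the message type.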
-
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::RegisterSession(const TActorId& pipe, const TString& topic, const TVector<ui32>& groups, const TActorContext& ctx)
-{
-
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " register session to " << topic);
- THolder<TEvPersQueue::TEvRegisterReadSession> request;
- request.Reset(new TEvPersQueue::TEvRegisterReadSession);
- auto& req = request->Record;
- req.SetSession(Session);
- req.SetClientNode(PeerName);
- ActorIdToProto(pipe, req.MutablePipeClient());
- req.SetClientId(ClientId);
-
- for (ui32 i = 0; i < groups.size(); ++i) {
- req.AddGroups(groups[i]);
- }
-
- NTabletPipe::SendData(ctx, pipe, request.Release());
-}
-
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::RegisterSessions(const TActorContext& ctx) {
- InitDone = true;
-
- for (auto& t : Topics) {
- RegisterSession(t.second.PipeClient, t.second.FullConverter->GetInternalName(), t.second.Groups, ctx);
- NumPartitionsFromTopic[t.second.FullConverter->GetInternalName()] = 0;
- }
-}
-
-
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::SetupCounters()
-{
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::SetupCounters() {
if (SessionsCreated) {
return;
}
@@ -815,74 +779,66 @@ void TReadSessionActor<UseMigrationProtocol>::SetupCounters()
++(*SessionsCreated);
++(*SessionsActive);
- PartsPerSession.IncFor(Partitions.size(), 1); //for 0
+ PartsPerSession.IncFor(Partitions.size(), 1); // for 0
}
-
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic)
-{
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic) {
auto& topicCounters = TopicCounters[topic->GetInternalName()];
auto subGroup = GetServiceCounters(Counters, "pqproxy|readSession");
-//client/consumerPath Account/Producer OriginDC Topic/TopicPath
auto aggr = NPersQueue::GetLabels(topic);
- TVector<std::pair<TString, TString>> cons = {{"Client", ClientId}, {"ConsumerPath", ClientPath}};
+ const TVector<std::pair<TString, TString>> cons = {{"Client", ClientId}, {"ConsumerPath", ClientPath}};
- topicCounters.PartitionsLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsLocked"}, true);
- topicCounters.PartitionsReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsReleased"}, true);
- topicCounters.PartitionsToBeReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsToBeReleased"}, false);
- topicCounters.PartitionsToBeLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsToBeLocked"}, false);
- topicCounters.PartitionsInfly = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsInfly"}, false);
- topicCounters.Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsErrors"}, true);
- topicCounters.Commits = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"Commits"}, true);
- topicCounters.WaitsForData = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"WaitsForData"}, true);
+ topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsLocked"}, true);
+ topicCounters.PartitionsReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsReleased"}, true);
+ topicCounters.PartitionsToBeReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsToBeReleased"}, false);
+ topicCounters.PartitionsToBeLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsToBeLocked"}, false);
+ topicCounters.PartitionsInfly = NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsInfly"}, false);
+ topicCounters.Errors = NPQ::TMultiCounter(subGroup, aggr, cons, {"PartitionsErrors"}, true);
+ topicCounters.Commits = NPQ::TMultiCounter(subGroup, aggr, cons, {"Commits"}, true);
+ topicCounters.WaitsForData = NPQ::TMultiCounter(subGroup, aggr, cons, {"WaitsForData"}, true);
topicCounters.CommitLatency = CommitLatency;
topicCounters.SLIBigLatency = SLIBigLatency;
topicCounters.SLITotal = SLITotal;
}
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic, const TString& cloudId,
- const TString& dbId, const TString& folderId)
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::SetupTopicCounters(const NPersQueue::TTopicConverterPtr& topic,
+ const TString& cloudId, const TString& dbId, const TString& folderId)
{
auto& topicCounters = TopicCounters[topic->GetInternalName()];
auto subGroup = NPersQueue::GetCountersForStream(Counters);
-//client/consumerPath Account/Producer OriginDC Topic/TopicPath
auto aggr = NPersQueue::GetLabelsForStream(topic, cloudId, dbId, folderId);
- TVector<std::pair<TString, TString>> cons{{"consumer", ClientPath}};
+ const TVector<std::pair<TString, TString>> cons{{"consumer", ClientPath}};
- topicCounters.PartitionsLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked_per_second"}, true, "name");
- topicCounters.PartitionsReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_released_per_second"}, true, "name");
- topicCounters.PartitionsToBeReleased = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_released"}, false, "name");
- topicCounters.PartitionsToBeLocked = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_locked"}, false, "name");
- topicCounters.PartitionsInfly = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked"}, false, "name");
- topicCounters.Errors = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_errors_per_second"}, true, "name");
- topicCounters.Commits = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.commits_per_second"}, true, "name");
- topicCounters.WaitsForData = NKikimr::NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.waits_for_data"}, true, "name");
+ topicCounters.PartitionsLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked_per_second"}, true, "name");
+ topicCounters.PartitionsReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_released_per_second"}, true, "name");
+ topicCounters.PartitionsToBeReleased = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_released"}, false, "name");
+ topicCounters.PartitionsToBeLocked = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_to_be_locked"}, false, "name");
+ topicCounters.PartitionsInfly = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_locked"}, false, "name");
+ topicCounters.Errors = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.partitions_errors_per_second"}, true, "name");
+ topicCounters.Commits = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.commits_per_second"}, true, "name");
+ topicCounters.WaitsForData = NPQ::TMultiCounter(subGroup, aggr, cons, {"stream.internal_read.waits_for_data"}, true, "name");
topicCounters.CommitLatency = CommitLatency;
topicCounters.SLIBigLatency = SLIBigLatency;
topicCounters.SLITotal = SLITotal;
}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const TActorContext& ctx) {
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " auth ok"
+ << ": topics# " << ev->Get()->TopicAndTablets.size()
+ << ", initDone# " << InitDone);
LastACLCheckTimestamp = ctx.Now();
-
- LOG_INFO_S(
- ctx,
- NKikimrServices::PQ_READ_PROXY,
- PQ_LOG_PREFIX << " auth ok, got " << ev->Get()->TopicAndTablets.size() << " topics, init done " << InitDone
- );
-
AuthInitActor = TActorId();
if (!InitDone) {
- ui32 initBorder = AppData(ctx)->PQConfig.GetReadInitLatencyBigMs();
- ui32 readBorder = AppData(ctx)->PQConfig.GetReadLatencyBigMs();
- ui32 readBorderFromDisk = AppData(ctx)->PQConfig.GetReadLatencyFromDiskBigMs();
+ const ui32 initBorder = AppData(ctx)->PQConfig.GetReadInitLatencyBigMs();
+ const ui32 readBorder = AppData(ctx)->PQConfig.GetReadLatencyBigMs();
+ const ui32 readBorderFromDisk = AppData(ctx)->PQConfig.GetReadLatencyFromDiskBigMs();
auto subGroup = GetServiceCounters(Counters, "pqproxy|SLI");
InitLatency = NKikimr::NPQ::CreateSLIDurationCounter(subGroup, Aggr, "ReadInit", initBorder, {100, 200, 500, 1000, 1500, 2000, 5000, 10000, 30000, 99999999});
@@ -893,47 +849,48 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk
SLIBigReadLatency = NKikimr::NPQ::TMultiCounter(subGroup, Aggr, {}, {"ReadBigLatency"}, true, "sensor", false);
ReadsTotal = NKikimr::NPQ::TMultiCounter(subGroup, Aggr, {}, {"ReadsTotal"}, true, "sensor", false);
- ui32 initDurationMs = (ctx.Now() - StartTime).MilliSeconds();
+ const ui32 initDurationMs = (ctx.Now() - StartTime).MilliSeconds();
InitLatency.IncFor(initDurationMs, 1);
if (initDurationMs >= initBorder) {
SLIBigLatency.Inc();
}
- for (auto& [name, t] : ev->Get()->TopicAndTablets) { // ToDo - return something from Init and Auth Actor (Full Path - ?)
+ for (const auto& [name, t] : ev->Get()->TopicAndTablets) { // TODO: return something from Init and Auth Actor (Full Path - ?)
auto internalName = t.TopicNameConverter->GetInternalName();
- auto topicGrIter = TopicGroups.find(name);
- if (!topicGrIter.IsEnd()) {
- auto value = std::move(topicGrIter->second);
- TopicGroups.erase(topicGrIter);
- TopicGroups.insert(std::make_pair(internalName, std::move(value)));
+ {
+ auto it = TopicGroups.find(name);
+ if (it != TopicGroups.end()) {
+ auto value = std::move(it->second);
+ TopicGroups.erase(it);
+ TopicGroups[internalName] = std::move(value);
+ }
}
- auto rtfsIter = ReadFromTimestamp.find(name);
- if (!rtfsIter.IsEnd()) {
- auto value = std::move(rtfsIter->second);
- ReadFromTimestamp.erase(rtfsIter);
- ReadFromTimestamp[internalName] = value;
+ {
+ auto it = ReadFromTimestamp.find(name);
+ if (it != ReadFromTimestamp.end()) {
+ auto value = std::move(it->second);
+ ReadFromTimestamp.erase(it);
+ ReadFromTimestamp[internalName] = std::move(value);
+ }
}
- auto lagIter = MaxLagByTopic.find(name);
- if (!lagIter.IsEnd()) {
- auto value = std::move(lagIter->second);
- MaxLagByTopic.erase(lagIter);
- MaxLagByTopic[internalName] = value;
+ {
+ auto it = MaxLagByTopic.find(name);
+ if (it != MaxLagByTopic.end()) {
+ auto value = std::move(it->second);
+ MaxLagByTopic.erase(it);
+ MaxLagByTopic[internalName] = std::move(value);
+ }
}
- auto& topicHolder = Topics[internalName];
- topicHolder.TabletID = t.TabletID;
- topicHolder.FullConverter = t.TopicNameConverter;
- topicHolder.CloudId = t.CloudId;
- topicHolder.DbId = t.DbId;
- topicHolder.FolderId = t.FolderId;
- topicHolder.MeteringMode = t.MeteringMode;
+
+ Topics[internalName] = TTopicHolder::FromTopicInfo(t);
FullPathToConverter[t.TopicNameConverter->GetPrimaryPath()] = t.TopicNameConverter;
FullPathToConverter[t.TopicNameConverter->GetSecondaryPath()] = t.TopicNameConverter;
if (!GetMeteringMode()) {
SetMeteringMode(t.MeteringMode);
} else if (*GetMeteringMode() != t.MeteringMode) {
- return CloseSession("Cannot read from topics with different metering modes",
- PersQueue::ErrorCode::BAD_REQUEST, ctx);
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST,
+ "cannot read from topics with different metering modes", ctx);
}
}
@@ -943,158 +900,168 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk
InitSession(ctx);
}
} else {
- for (auto& [name, t] : ev->Get()->TopicAndTablets) {
+ for (const auto& [name, t] : ev->Get()->TopicAndTablets) {
auto it = Topics.find(t.TopicNameConverter->GetInternalName());
if (it == Topics.end()) {
- return CloseSession(
- TStringBuilder() << "list of topics changed - new topic '"
- << t.TopicNameConverter->GetPrintableString() << "' found",
- PersQueue::ErrorCode::BAD_REQUEST, ctx
- );
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder()
+ << "list of topics changed, new topic found: " << t.TopicNameConverter->GetPrintableString(), ctx);
}
if (t.MeteringMode != *GetMeteringMode()) {
- return CloseSession(
- TStringBuilder() << "Metering mode of topic: " << name << " has been changed",
- PersQueue::ErrorCode::OVERLOAD, ctx
- );
+ return CloseSession(PersQueue::ErrorCode::OVERLOAD, TStringBuilder()
+ << "metering mode of topic: " << name << " has been changed", ctx);
}
}
}
}
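
The auth-ok handler above re-keys TopicGroups, ReadFromTimestamp and MaxLagByTopic from the client-supplied name to the resolved internal name by moving the value out, erasing the old entry and inserting it under the new key. A small standalone sketch of that re-keying step, using plain standard containers and illustrative names:

#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Move a map entry from key `from` to key `to`, keeping the value intact.
// Mirrors the find / move / erase / insert sequence used above.
template <typename TMap>
void RekeyEntry(TMap& map, const typename TMap::key_type& from, const typename TMap::key_type& to) {
    auto it = map.find(from);
    if (it == map.end()) {
        return;
    }

    auto value = std::move(it->second);
    map.erase(it);
    map[to] = std::move(value);
}

int main() {
    std::unordered_map<std::string, std::vector<unsigned>> topicGroups{
        {"client/topic", {1, 2, 3}},
    };

    RekeyEntry(topicGroups, std::string("client/topic"), std::string("/Root/PQ/client/topic"));

    for (const auto& [name, groups] : topicGroups) {
        std::cout << name << ": " << groups.size() << " groups\n";
    }
}

For std::map and std::unordered_map, extract() plus insert() of the node handle re-keys without moving the value at all; whether the util THashMap used in this code exposes the same node API is not shown by the diff.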
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& ctx) {
TServerMessage result;
result.set_status(Ydb::StatusIds::SUCCESS);
result.mutable_init_response()->set_session_id(Session);
- if (!WriteResponse(std::move(result))) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed");
- Die(ctx);
+ if (!WriteToStreamOrDie(ctx, std::move(result))) {
return;
}
- if (!Request->GetStreamCtx()->Read()) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
- Die(ctx);
+ if (!ReadFromStreamOrDie(ctx)) {
return;
}
- for (auto& t : Topics) {
- NTabletPipe::TClientConfig clientConfig;
-
- clientConfig.CheckAliveness = false;
-
- clientConfig.RetryPolicy = RetryPolicyForPipes;
- t.second.PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second.TabletID, clientConfig));
-
- Y_VERIFY(t.second.FullConverter);
- auto it = TopicGroups.find(t.second.FullConverter->GetInternalName());
+ for (auto& [_, holder] : Topics) {
+ holder.PipeClient = CreatePipeClient(holder.TabletID, ctx);
+ Y_VERIFY(holder.FullConverter);
+ auto it = TopicGroups.find(holder.FullConverter->GetInternalName());
if (it != TopicGroups.end()) {
- t.second.Groups = it->second;
+ holder.Groups = it->second;
}
}
- RegisterSessions(ctx);
+ InitDone = true;
+
+ for (const auto& [_, holder] : Topics) {
+ RegisterSession(holder.FullConverter->GetInternalName(), holder.PipeClient, holder.Groups, ctx);
+ NumPartitionsFromTopic[holder.FullConverter->GetInternalName()] = 0;
+ }
ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl));
}
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext& ctx) {
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::RegisterSession(const TString& topic, const TActorId& pipe, const TVector<ui32>& groups, const TActorContext& ctx) {
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " register session"
+ << ": topic# " << topic);
+
+ auto request = MakeHolder<TEvPersQueue::TEvRegisterReadSession>();
+
+ auto& req = request->Record;
+ req.SetSession(Session);
+ req.SetClientNode(PeerName);
+ ActorIdToProto(pipe, req.MutablePipeClient());
+ req.SetClientId(ClientId);
- auto& record = ev->Get()->Record;
+ for (ui32 i = 0; i < groups.size(); ++i) {
+ req.AddGroups(groups[i]);
+ }
+
+ NTabletPipe::SendData(ctx, pipe, request.Release());
+}
+
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext& ctx) {
+ const auto& record = ev->Get()->Record;
Y_VERIFY(record.GetSession() == Session);
Y_VERIFY(record.GetClientId() == ClientId);
- TActorId pipe = ActorIdFromProto(record.GetPipeClient());
auto path = record.GetPath();
if (path.empty()) {
path = record.GetTopic();
}
- auto converterIter = FullPathToConverter.find(NPersQueue::NormalizeFullPath(path));
- if (converterIter.IsEnd()) {
- LOG_DEBUG_S(
- ctx, NKikimrServices::PQ_READ_PROXY,
- PQ_LOG_PREFIX << " ignored ev lock for path = " << record.GetPath() << ", path not recognized"
- );
- return;
- }
- //const auto& topic = converterIter->second->GetPrimaryPath();
- const auto& intName = converterIter->second->GetInternalName();
- auto jt = Topics.find(intName);
- if (jt == Topics.end() || pipe != jt->second.PipeClient) { //this is message from old version of pipe
- LOG_ALERT_S(
- ctx, NKikimrServices::PQ_READ_PROXY,
- PQ_LOG_PREFIX << " ignored ev lock for topic = " << intName
- << " path recognized, but topic is unknown, this is unexpected"
- );
+ auto converterIter = FullPathToConverter.find(NPersQueue::NormalizeFullPath(path));
+ if (converterIter == FullPathToConverter.end()) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " ignored ev lock"
+ << ": path# " << path
+ << ", reason# " << "path not recognized");
return;
}
- //ToDo[counters]
- if (NumPartitionsFromTopic[converterIter->second->GetInternalName()]++ == 0) {
- if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
- SetupTopicCounters(converterIter->second, jt->second.CloudId, jt->second.DbId, jt->second.FolderId);
- } else {
- SetupTopicCounters(converterIter->second);
+ const auto name = converterIter->second->GetInternalName();
+
+ {
+ auto it = Topics.find(name);
+ if (it == Topics.end() || it->second.PipeClient != ActorIdFromProto(record.GetPipeClient())) {
+ LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " ignored ev lock"
+ << ": path# " << name
+ << ", reason# " << "topic is unknown");
+ return;
+ }
+
+ // TODO: counters
+ if (NumPartitionsFromTopic[name]++ == 0) {
+ if (AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
+ SetupTopicCounters(converterIter->second, it->second.CloudId, it->second.DbId, it->second.FolderId);
+ } else {
+ SetupTopicCounters(converterIter->second);
+ }
}
}
- //ToDo[counters]
- auto it = TopicCounters.find(converterIter->second->GetInternalName());
+ // TODO: counters
+ auto it = TopicCounters.find(name);
Y_VERIFY(it != TopicCounters.end());
- ui64 assignId = NextAssignId++;
+ Y_VERIFY(record.GetGeneration() > 0);
+ const ui64 assignId = NextAssignId++;
BalancerGeneration[assignId] = {record.GetGeneration(), record.GetStep()};
- TPartitionId partitionId{converterIter->second, record.GetPartition(), assignId};
+ const TPartitionId partitionId{converterIter->second, record.GetPartition(), assignId};
- IActor* partitionActor = new TPartitionActor(
- ctx.SelfID, ClientId, ClientPath, Cookie, Session, partitionId, record.GetGeneration(),
- record.GetStep(), record.GetTabletId(), it->second, CommitsDisabled, ClientDC, RangesMode,
- converterIter->second, UseMigrationProtocol);
+ const TActorId actorId = ctx.Register(new TPartitionActor(
+ ctx.SelfID, ClientId, ClientPath, Cookie, Session, partitionId, record.GetGeneration(),
+ record.GetStep(), record.GetTabletId(), it->second, CommitsDisabled, ClientDC, RangesMode,
+ converterIter->second, UseMigrationProtocol));
- TActorId actorId = ctx.Register(partitionActor);
if (SessionsActive) {
PartsPerSession.DecFor(Partitions.size(), 1);
}
- Y_VERIFY(record.GetGeneration() > 0);
- auto pp = Partitions.insert(std::make_pair(assignId, TPartitionActorInfo{actorId, partitionId, converterIter->second, ctx}));
- Y_VERIFY(pp.second);
+
+ bool res = Partitions.emplace(assignId, TPartitionActorInfo(actorId, partitionId, converterIter->second, ctx.Now())).second;
+ Y_VERIFY(res);
+
if (SessionsActive) {
PartsPerSession.IncFor(Partitions.size(), 1);
}
- bool res = ActualPartitionActors.insert(actorId).second;
+ res = ActualPartitionActors.insert(actorId).second;
Y_VERIFY(res);
it->second.PartitionsLocked.Inc();
it->second.PartitionsInfly.Inc();
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Assign: " << record);
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " assign"
+ << ": record# " << record);
ctx.Send(actorId, new TEvPQProxy::TEvLockPartition(0, 0, false, false));
}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx) {
- if (!ActualPartitionActor(ev->Sender))
+ if (!ActualPartitionActors.contains(ev->Sender)) {
return;
+ }
auto it = Partitions.find(ev->Get()->Partition.AssignId);
Y_VERIFY(it != Partitions.end());
Y_VERIFY(!it->second.Releasing); // if releasing and no lock sent yet - then server must already release partition
+ TServerMessage result;
+ result.set_status(Ydb::StatusIds::SUCCESS);
+
if (ev->Get()->Init) {
Y_VERIFY(!it->second.LockSent);
-
it->second.LockSent = true;
it->second.Offset = ev->Get()->Offset;
- TServerMessage result;
- result.set_status(Ydb::StatusIds::SUCCESS);
-
if constexpr (UseMigrationProtocol) {
result.mutable_assigned()->mutable_topic()->set_path(it->second.Topic->GetFederationPath());
result.mutable_assigned()->set_cluster(it->second.Topic->GetCluster());
@@ -1103,9 +1070,8 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionSta
result.mutable_assigned()->set_read_offset(ev->Get()->Offset);
result.mutable_assigned()->set_end_offset(ev->Get()->EndOffset);
-
} else {
- // TODO GetFederationPath() -> GetFederationPathWithDC()
+ // TODO: GetFederationPath() -> GetFederationPathWithDC()
result.mutable_start_partition_session_request()->mutable_partition_session()->set_path(it->second.Topic->GetFederationPath());
result.mutable_start_partition_session_request()->mutable_partition_session()->set_partition_id(ev->Get()->Partition.Partition);
result.mutable_start_partition_session_request()->mutable_partition_session()->set_partition_session_id(it->first);
@@ -1114,29 +1080,10 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionSta
result.mutable_start_partition_session_request()->mutable_partition_offsets()->set_start(ev->Get()->Offset);
result.mutable_start_partition_session_request()->mutable_partition_offsets()->set_end(ev->Get()->EndOffset);
}
-
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " sending to client create partition stream event");
-
- auto pp = it->second.Partition;
- pp.AssignId = 0;
- auto jt = PartitionToControlMessages.find(pp);
- if (jt == PartitionToControlMessages.end()) {
- if (!WriteResponse(std::move(result))) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed");
- Die(ctx);
- return;
- }
- } else {
- Y_VERIFY(jt->second.Infly);
- jt->second.ControlMessages.push_back(result);
- }
} else {
Y_VERIFY(it->second.LockSent);
- TServerMessage result;
- result.set_status(Ydb::StatusIds::SUCCESS);
-
- if constexpr(UseMigrationProtocol) {
+ if constexpr (UseMigrationProtocol) {
result.mutable_partition_status()->mutable_topic()->set_path(it->second.Topic->GetFederationPath());
result.mutable_partition_status()->set_cluster(it->second.Topic->GetCluster());
result.mutable_partition_status()->set_partition(ev->Get()->Partition.Partition);
@@ -1145,7 +1092,6 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionSta
result.mutable_partition_status()->set_committed_offset(ev->Get()->Offset);
result.mutable_partition_status()->set_end_offset(ev->Get()->EndOffset);
result.mutable_partition_status()->set_write_watermark_ms(ev->Get()->WriteTimestampEstimateMs);
-
} else {
result.mutable_partition_session_status_response()->set_partition_session_id(it->first);
@@ -1155,169 +1101,157 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionSta
*result.mutable_partition_session_status_response()->mutable_write_time_high_watermark() =
::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(ev->Get()->WriteTimestampEstimateMs);
}
+ }
- auto pp = it->second.Partition;
- pp.AssignId = 0;
- auto jt = PartitionToControlMessages.find(pp);
- if (jt == PartitionToControlMessages.end()) {
- if (!WriteResponse(std::move(result))) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed");
- Die(ctx);
- return;
- }
- } else {
- Y_VERIFY(jt->second.Infly);
- jt->second.ControlMessages.push_back(result);
- }
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " sending to client create partition stream event");
+ SendControlMessage(it->second.Partition, std::move(result), ctx);
+}
+
+template <bool UseMigrationProtocol>
+bool TReadSessionActor<UseMigrationProtocol>::SendControlMessage(TPartitionId id, TServerMessage&& message, const TActorContext& ctx) {
+ id.AssignId = 0;
+
+ auto it = PartitionToControlMessages.find(id);
+ if (it == PartitionToControlMessages.end()) {
+ return WriteToStreamOrDie(ctx, std::move(message));
+ } else {
+ Y_VERIFY(it->second.Infly);
+ it->second.ControlMessages.push_back(std::move(message));
}
+
+ return true;
}
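
SendControlMessage above centralizes a pattern that was previously repeated in several handlers: a control message for a partition is written to the stream immediately unless a read touching that partition is still in flight, in which case it is queued and flushed when the read completes (see ProcessAnswer further down). A minimal standalone sketch of the defer-or-send idea, keyed by a plain partition id rather than the zeroed TPartitionId used above, with strings standing in for the server messages:

#include <cstdint>
#include <deque>
#include <iostream>
#include <map>
#include <string>
#include <utility>

struct TPending {
    uint32_t Infly = 0;                       // reads in flight that touch this partition
    std::deque<std::string> ControlMessages;  // control messages deferred behind them
};

std::map<uint64_t, TPending> PartitionToControlMessages;

// Stand-in for the grpc stream write.
bool WriteToStream(const std::string& message) {
    std::cout << "write: " << message << "\n";
    return true;
}

// Send now if nothing is in flight for the partition, otherwise defer.
bool SendControlMessage(uint64_t partition, std::string message) {
    auto it = PartitionToControlMessages.find(partition);
    if (it == PartitionToControlMessages.end() || it->second.Infly == 0) {
        return WriteToStream(message);
    }
    it->second.ControlMessages.push_back(std::move(message));
    return true;
}

// Called when an in-flight read for the partition completes; once the last
// one finishes, everything deferred behind it is flushed in order.
void OnReadFinished(uint64_t partition) {
    auto it = PartitionToControlMessages.find(partition);
    if (it == PartitionToControlMessages.end() || --it->second.Infly != 0) {
        return;
    }
    for (auto& m : it->second.ControlMessages) {
        WriteToStream(m);
    }
    PartitionToControlMessages.erase(it);
}

int main() {
    PartitionToControlMessages[7].Infly = 1;           // a read referencing partition 7 is in flight
    SendControlMessage(7, "stop_partition_session");   // deferred
    SendControlMessage(8, "start_partition_session");  // written immediately
    OnReadFinished(7);                                  // flushes the deferred message
}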
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvError::TPtr& ev, const TActorContext& ctx) {
- CloseSession(ev->Get()->Record.GetDescription(), ConvertOldCode(ev->Get()->Record.GetCode()), ctx);
+ CloseSession(ConvertOldCode(ev->Get()->Record.GetCode()), ev->Get()->Record.GetDescription(), ctx);
}
-
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::SendReleaseSignalToClient(const typename THashMap<ui64, TPartitionActorInfo>::iterator& it, bool kill, const TActorContext& ctx)
-{
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::SendReleaseSignal(typename TPartitionsMap::iterator it, bool kill, const TActorContext& ctx) {
TServerMessage result;
result.set_status(Ydb::StatusIds::SUCCESS);
- if constexpr(UseMigrationProtocol) {
+ if constexpr (UseMigrationProtocol) {
result.mutable_release()->mutable_topic()->set_path(it->second.Topic->GetFederationPath());
result.mutable_release()->set_cluster(it->second.Topic->GetCluster());
result.mutable_release()->set_partition(it->second.Partition.Partition);
result.mutable_release()->set_assign_id(it->second.Partition.AssignId);
result.mutable_release()->set_forceful_release(kill);
result.mutable_release()->set_commit_offset(it->second.Offset);
-
} else {
result.mutable_stop_partition_session_request()->set_partition_session_id(it->second.Partition.AssignId);
result.mutable_stop_partition_session_request()->set_graceful(!kill);
result.mutable_stop_partition_session_request()->set_committed_offset(it->second.Offset);
}
- auto pp = it->second.Partition;
- pp.AssignId = 0;
- auto jt = PartitionToControlMessages.find(pp);
- if (jt == PartitionToControlMessages.end()) {
- if (!WriteResponse(std::move(result))) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed");
- Die(ctx);
- return;
- }
- } else {
- Y_VERIFY(jt->second.Infly);
- jt->second.ControlMessages.push_back(result);
+ if (!SendControlMessage(it->second.Partition, std::move(result), ctx)) {
+ return;
}
+
Y_VERIFY(it->second.LockSent);
it->second.ReleaseSent = true;
}
-
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvReleasePartition::TPtr& ev, const TActorContext& ctx) {
- auto& record = ev->Get()->Record;
+ const auto& record = ev->Get()->Record;
Y_VERIFY(record.GetSession() == Session);
Y_VERIFY(record.GetClientId() == ClientId);
- TString topicPath = NPersQueue::NormalizeFullPath(record.GetPath());
- ui32 group = record.HasGroup() ? record.GetGroup() : 0;
- auto pathIter = FullPathToConverter.find(topicPath);
- Y_VERIFY(!pathIter.IsEnd());
- auto it = Topics.find(pathIter->second->GetInternalName());
- Y_VERIFY(!it.IsEnd());
- auto& converter = it->second.FullConverter;
+ const ui32 group = record.HasGroup() ? record.GetGroup() : 0;
- TActorId pipe = ActorIdFromProto(record.GetPipeClient());
+ auto pathIter = FullPathToConverter.find(NPersQueue::NormalizeFullPath(record.GetPath()));
+ Y_VERIFY(pathIter != FullPathToConverter.end());
- if (pipe != it->second.PipeClient) { //this is message from old version of pipe
+ auto it = Topics.find(pathIter->second->GetInternalName());
+ Y_VERIFY(it != Topics.end());
+
+ if (it->second.PipeClient != ActorIdFromProto(record.GetPipeClient())) {
return;
}
+ auto& converter = it->second.FullConverter;
+
for (ui32 c = 0; c < record.GetCount(); ++c) {
Y_VERIFY(!Partitions.empty());
- TActorId actorId = TActorId{};
+ TActorId actorId;
auto jt = Partitions.begin();
ui32 i = 0;
+
for (auto it = Partitions.begin(); it != Partitions.end(); ++it) {
if (it->second.Topic->GetInternalName() == converter->GetInternalName()
&& !it->second.Releasing
&& (group == 0 || it->second.Partition.Partition + 1 == group)
) {
++i;
- if (rand() % i == 0) { //will lead to 1/n probability for each of n partitions
+ if (rand() % i == 0) { // will lead to 1/n probability for each of n partitions
actorId = it->second.Actor;
jt = it;
}
}
}
+
Y_VERIFY(actorId);
- {
- //ToDo[counters]
- auto it = TopicCounters.find(converter->GetInternalName());
- Y_VERIFY(it != TopicCounters.end());
- it->second.PartitionsToBeReleased.Inc();
- }
+ // TODO: counters
+ auto it = TopicCounters.find(converter->GetInternalName());
+ Y_VERIFY(it != TopicCounters.end());
+ it->second.PartitionsToBeReleased.Inc();
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " releasing " << jt->second.Partition);
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " releasing"
+ << ": partition# " << jt->second.Partition);
jt->second.Releasing = true;
- if (!jt->second.LockSent) { //no lock yet - can release silently
+
+ if (!jt->second.LockSent) { // no lock yet - can release silently
ReleasePartition(jt, true, ctx);
} else {
- SendReleaseSignalToClient(jt, false, ctx);
+ SendReleaseSignal(jt, false, ctx);
}
}
}
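
The selection loop above is reservoir sampling with a reservoir of one: after the i-th matching partition is seen, rand() % i == 0 replaces the current pick with probability 1/i, and by induction each of the n matching partitions ends up chosen with probability 1/n, without counting them first. A standalone sketch of the same selection with an empirical check of the uniformity:

#include <cstddef>
#include <cstdlib>
#include <iostream>
#include <vector>

// Pick one element uniformly at random from `items` in a single pass,
// without knowing the count in advance (reservoir sampling, k = 1).
int PickUniform(const std::vector<int>& items) {
    int picked = -1;
    int seen = 0;
    for (int x : items) {
        ++seen;
        if (std::rand() % seen == 0) {  // replace the pick with probability 1/seen
            picked = x;
        }
    }
    return picked;
}

int main() {
    std::srand(42);
    const std::vector<int> items{10, 20, 30, 40};

    std::vector<int> hits(items.size(), 0);
    for (int i = 0; i < 100000; ++i) {
        const int p = PickUniform(items);
        for (std::size_t j = 0; j < items.size(); ++j) {
            if (items[j] == p) {
                ++hits[j];
            }
        }
    }

    for (std::size_t j = 0; j < items.size(); ++j) {
        std::cout << items[j] << ": " << hits[j] << "\n";  // each count lands near 25000
    }
}

The modulo bias of rand() is negligible for the small partition counts involved here; the production code keeps the original rand()-based draw unchanged.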
-
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionReleased::TPtr& ev, const TActorContext& ctx) {
- if (!ActualPartitionActor(ev->Sender))
+ if (!ActualPartitionActors.contains(ev->Sender)) {
return;
+ }
- const auto assignId = ev->Get()->Partition.AssignId;
-
- auto it = Partitions.find(assignId);
+ auto it = Partitions.find(ev->Get()->Partition.AssignId);
Y_VERIFY(it != Partitions.end());
Y_VERIFY(it->second.Releasing);
- ReleasePartition(it, false, ctx); //no reads could be here - this is release from partition
+ ReleasePartition(it, false, ctx); // no reads could be here - this is release from partition
}
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::InformBalancerAboutRelease(const typename THashMap<ui64, TPartitionActorInfo>::iterator& it, const TActorContext& ctx) {
-
- THolder<TEvPersQueue::TEvPartitionReleased> request;
- request.Reset(new TEvPersQueue::TEvPartitionReleased);
- auto& req = request->Record;
-
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::InformBalancerAboutRelease(typename TPartitionsMap::iterator it, const TActorContext& ctx) {
const auto& converter = it->second.Topic;
auto jt = Topics.find(converter->GetInternalName());
Y_VERIFY(jt != Topics.end());
+ auto request = MakeHolder<TEvPersQueue::TEvPartitionReleased>();
+
+ auto& req = request->Record;
req.SetSession(Session);
ActorIdToProto(jt->second.PipeClient, req.MutablePipeClient());
req.SetClientId(ClientId);
req.SetTopic(converter->GetPrimaryPath());
req.SetPartition(it->second.Partition.Partition);
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " released: " << it->second.Partition);
-
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " released"
+ << ": partition# " << it->second.Partition);
NTabletPipe::SendData(ctx, jt->second.PipeClient, request.Release());
}
-
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::CloseSession(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode, const NActors::TActorContext& ctx) {
-
- if (errorCode != PersQueue::ErrorCode::OK) {
- if (InternalErrorCode(errorCode)) {
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::CloseSession(PersQueue::ErrorCode::ErrorCode code, const TString& reason, const TActorContext& ctx) {
+ if (code != PersQueue::ErrorCode::OK) {
+ if (InternalErrorCode(code)) {
SLIErrors.Inc();
}
+
if (Errors) {
++(*Errors);
} else if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
@@ -1325,169 +1259,155 @@ void TReadSessionActor<UseMigrationProtocol>::CloseSession(const TString& errorR
}
TServerMessage result;
- result.set_status(ConvertPersQueueInternalCodeToStatus(errorCode));
-
- FillIssue(result.add_issues(), errorCode, errorReason);
-
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " closed with error reason: " << errorReason);
+ result.set_status(ConvertPersQueueInternalCodeToStatus(code));
+ FillIssue(result.add_issues(), code, reason);
- if (!WriteResponse(std::move(result), true)) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed");
- Die(ctx);
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " closed with error"
+ << ": reason# " << reason);
+ if (!WriteToStreamOrDie(ctx, std::move(result), true)) {
return;
}
} else {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " closed");
- if (!Request->GetStreamCtx()->Finish(std::move(grpc::Status::OK))) {
+ if (!Request->GetStreamCtx()->Finish(grpc::Status::OK)) {
LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc double finish failed");
- Die(ctx);
- return;
}
-
}
Die(ctx);
}
-
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
- TEvTabletPipe::TEvClientConnected *msg = ev->Get();
+ const auto* msg = ev->Get();
+
if (msg->Status != NKikimrProto::OK) {
if (msg->Dead) {
- CloseSession(TStringBuilder() << "one of topics is deleted, tablet " << msg->TabletId, PersQueue::ErrorCode::BAD_REQUEST, ctx);
- return;
+ return CloseSession(PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder()
+ << "one of topics is deleted, tablet " << msg->TabletId, ctx);
}
- //TODO: remove it
- CloseSession(TStringBuilder() << "unable to connect to one of topics, tablet " << msg->TabletId, PersQueue::ErrorCode::ERROR, ctx);
- return;
+
+ // TODO: remove it
+ return CloseSession(PersQueue::ErrorCode::ERROR, TStringBuilder()
+ << "unable to connect to one of topics, tablet " << msg->TabletId, ctx);
#if 0
- const bool isAlive = ProcessBalancerDead(msg->TabletId, ctx); // returns false if actor died
- Y_UNUSED(isAlive);
+ ProcessBalancerDead(msg->TabletId, ctx); // returns false if actor died
return;
#endif
}
}
-template<bool UseMigrationProtocol>
-bool TReadSessionActor<UseMigrationProtocol>::ActualPartitionActor(const TActorId& part) {
- return ActualPartitionActors.contains(part);
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
+ ProcessBalancerDead(ev->Get()->TabletId, ctx);
}
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::ReleasePartition(typename TPartitionsMap::iterator it, bool couldBeReads, const TActorContext& ctx) {
+ // TODO: counters
+ auto jt = TopicCounters.find(it->second.Topic->GetInternalName());
+ Y_VERIFY(jt != TopicCounters.end());
+
+ jt->second.PartitionsReleased.Inc();
+ jt->second.PartitionsInfly.Dec();
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::ReleasePartition(const typename THashMap<ui64, TPartitionActorInfo>::iterator& it,
- bool couldBeReads, const TActorContext& ctx)
-{
- {
- //ToDo[counters]
- auto jt = TopicCounters.find(it->second.Topic->GetInternalName());
- Y_VERIFY(jt != TopicCounters.end());
- jt->second.PartitionsReleased.Inc();
- jt->second.PartitionsInfly.Dec();
- if (!it->second.Released && it->second.Releasing) {
- jt->second.PartitionsToBeReleased.Dec();
- }
+ if (!it->second.Released && it->second.Releasing) {
+ jt->second.PartitionsToBeReleased.Dec();
}
Y_VERIFY(couldBeReads || !it->second.Reading);
- //process reads
- typename TFormedReadResponse<TServerMessage>::TPtr formedResponseToAnswer;
+ typename TFormedReadResponse<TServerMessage>::TPtr response;
+
+ // process reads
if (it->second.Reading) {
- const auto readIt = PartitionToReadResponse.find(it->second.Actor);
+ auto readIt = PartitionToReadResponse.find(it->second.Actor);
Y_VERIFY(readIt != PartitionToReadResponse.end());
if (--readIt->second->RequestsInfly == 0) {
- formedResponseToAnswer = readIt->second;
+ response = readIt->second;
}
}
InformBalancerAboutRelease(it, ctx);
- it->second.Released = true; //to force drop
- DropPartition(it, ctx); //partition will be dropped
+ it->second.Released = true; // to force drop
+ DropPartition(it, ctx); // partition will be dropped
- if (formedResponseToAnswer) {
- if (const auto ru = CalcRuConsumption(PrepareResponse(formedResponseToAnswer))) {
- formedResponseToAnswer->RequiredQuota = ru;
+ if (response) {
+ if (const auto ru = CalcRuConsumption(PrepareResponse(response))) {
+ response->RequiredQuota = ru;
if (MaybeRequestQuota(ru, EWakeupTag::RlAllowed, ctx)) {
Y_VERIFY(!PendingQuota);
- PendingQuota = formedResponseToAnswer;
+ PendingQuota = response;
} else {
- WaitingQuota.push_back(formedResponseToAnswer);
+ WaitingQuota.push_back(response);
}
} else {
- ProcessAnswer(ctx, formedResponseToAnswer); // returns false if actor died
+ ProcessAnswer(response, ctx);
}
}
}
-template<bool UseMigrationProtocol>
-bool TReadSessionActor<UseMigrationProtocol>::ProcessBalancerDead(const ui64 tablet, const TActorContext& ctx) {
+template <bool UseMigrationProtocol>
+TActorId TReadSessionActor<UseMigrationProtocol>::CreatePipeClient(ui64 tabletId, const TActorContext& ctx) {
+ NTabletPipe::TClientConfig clientConfig;
+ clientConfig.CheckAliveness = false;
+ clientConfig.RetryPolicy = RetryPolicyForPipes;
+ return ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig));
+}
+
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::ProcessBalancerDead(ui64 tabletId, const TActorContext& ctx) {
for (auto& t : Topics) {
- if (t.second.TabletID == tablet) {
- LOG_INFO_S(
- ctx, NKikimrServices::PQ_READ_PROXY,
- PQ_LOG_PREFIX << " balancer for topic " << t.second.FullConverter->GetPrintableString()
- << " is dead, restarting all from this topic"
- );
-
- //Drop all partitions from this topic
+ if (t.second.TabletID == tabletId) {
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " balancer dead, restarting all from topic"
+ << ": topic# " << t.second.FullConverter->GetPrintableString());
+
+ // Drop all partitions from this topic
for (auto it = Partitions.begin(); it != Partitions.end();) {
- if (it->second.Topic->GetInternalName() == t.first) { //partition from this topic
+ if (it->second.Topic->GetInternalName() == t.first) { // partition from this topic
// kill actor
auto jt = it;
++it;
+
if (jt->second.LockSent) {
- SendReleaseSignalToClient(jt, true, ctx);
+ SendReleaseSignal(jt, true, ctx);
}
+
ReleasePartition(jt, true, ctx);
} else {
++it;
}
}
- //reconnect pipe
- NTabletPipe::TClientConfig clientConfig;
- clientConfig.CheckAliveness = false;
- clientConfig.RetryPolicy = RetryPolicyForPipes;
- t.second.PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second.TabletID, clientConfig));
+ t.second.PipeClient = CreatePipeClient(t.second.TabletID, ctx);
+
if (InitDone) {
if (PipeReconnects) {
++(*PipeReconnects);
}
+
if (Errors) {
++(*Errors);
}
- RegisterSession(t.second.PipeClient, t.first, t.second.Groups, ctx);
+ RegisterSession(t.first, t.second.PipeClient, t.second.Groups, ctx);
}
}
}
- return true;
-}
-
-
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
- const bool isAlive = ProcessBalancerDead(ev->Get()->TabletId, ctx); // returns false if actor died
- Y_UNUSED(isAlive);
}
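
When a balancer pipe dies, ProcessBalancerDead above walks Partitions and releases every partition of the affected topic; since ReleasePartition ends up erasing the entry, the iterator is advanced before the call. A tiny standalone sketch of that erase-while-iterating shape, with a map of illustrative partitions:

#include <iostream>
#include <map>
#include <string>

std::map<int, std::string> Partitions{
    {1, "topic-a"}, {2, "topic-b"}, {3, "topic-a"}, {4, "topic-c"},
};

// Stand-in for ReleasePartition(): erases the entry it is given.
void Release(std::map<int, std::string>::iterator it) {
    Partitions.erase(it);
}

int main() {
    // Drop every partition that belongs to "topic-a". The loop iterator is
    // advanced before Release() invalidates the current element.
    for (auto it = Partitions.begin(); it != Partitions.end();) {
        if (it->second == "topic-a") {
            auto doomed = it;
            ++it;
            Release(doomed);
        } else {
            ++it;
        }
    }

    for (const auto& [id, topic] : Partitions) {
        std::cout << id << " " << topic << "\n";
    }
}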
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::Handle(NGRpcService::TGRpcRequestProxy::TEvRefreshTokenResponse::TPtr &ev , const TActorContext& ctx) {
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::Handle(NGRpcService::TGRpcRequestProxy::TEvRefreshTokenResponse::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Authenticated && !ev->Get()->InternalToken.empty()) {
Token = new NACLib::TUserToken(ev->Get()->InternalToken);
ForceACLCheck = true;
+
if constexpr (!UseMigrationProtocol) {
TServerMessage result;
result.set_status(Ydb::StatusIds::SUCCESS);
result.mutable_update_token_response();
- if (!WriteResponse(std::move(result))) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed");
- Die(ctx);
- return;
- }
+ WriteToStreamOrDie(ctx, std::move(result));
}
} else {
Request->ReplyUnauthenticated("refreshed token is invalid");
@@ -1495,46 +1415,33 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(NGRpcService::TGRpcRequestP
}
}
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::ProcessAuth(const TString& auth, const TActorContext& ctx) {
- if (!auth.empty() && auth != Auth) {
- Auth = auth;
- Request->RefreshToken(auth, ctx, ctx.SelfID);
- }
-}
-
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext& ctx) {
RequestNotChecked = true;
- THolder<TEvPQProxy::TEvRead> event(ev->Release());
-
- if (!Request->GetStreamCtx()->Read()) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
- Die(ctx);
+ if (!ReadFromStreamOrDie(ctx)) {
return;
}
-
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got read request with guid: " << event->Guid);
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " got read request"
+ << ": guid# " << ev->Get()->Guid);
if constexpr (UseMigrationProtocol) {
- Reads.emplace_back(event.Release());
+ Reads.emplace_back(ev->Release());
} else {
- ReadSizeBudget += event->MaxSize;
+ ReadSizeBudget += ev->Get()->MaxSize;
}
ProcessReads(ctx);
}
-
-template<typename TServerMessage>
+template <typename TServerMessage>
i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) {
constexpr bool UseMigrationProtocol = std::is_same_v<TServerMessage, PersQueue::V1::MigrationStreamingReadServerMessage>;
+
if constexpr (UseMigrationProtocol) {
Y_VERIFY(resp.data_batch().partition_data_size() == 1);
Response.mutable_data_batch()->add_partition_data()->Swap(resp.mutable_data_batch()->mutable_partition_data(0));
-
} else {
Y_VERIFY(resp.read_response().partition_data_size() == 1);
Response.mutable_read_response()->add_partition_data()->Swap(resp.mutable_read_response()->mutable_partition_data(0));
@@ -1547,50 +1454,53 @@ i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) {
return ByteSize - prev;
}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadResponse::TPtr& ev, const TActorContext& ctx) {
- TActorId sender = ev->Sender;
- if (!ActualPartitionActor(sender))
+ if (!ActualPartitionActors.contains(ev->Sender)) {
return;
+ }
- THolder<TEvReadResponse> event(ev->Release());
-
+ auto& response = ev->Get()->Response;
ui64 partitionCookie;
ui64 assignId;
+
if constexpr (UseMigrationProtocol) {
- Y_VERIFY(event->Response.data_batch().partition_data_size() == 1);
- partitionCookie = event->Response.data_batch().partition_data(0).cookie().partition_cookie();
+ Y_VERIFY(response.data_batch().partition_data_size() == 1);
+ partitionCookie = response.data_batch().partition_data(0).cookie().partition_cookie();
Y_VERIFY(partitionCookie != 0); // cookie is assigned
- assignId = event->Response.data_batch().partition_data(0).cookie().assign_id();
-
+ assignId = response.data_batch().partition_data(0).cookie().assign_id();
} else {
- Y_VERIFY(event->Response.read_response().partition_data_size() == 1);
- assignId = event->Response.read_response().partition_data(0).partition_session_id();
+ Y_VERIFY(response.read_response().partition_data_size() == 1);
+ assignId = response.read_response().partition_data(0).partition_session_id();
}
- const auto partitionIt = Partitions.find(assignId);
- Y_VERIFY(partitionIt != Partitions.end());
- Y_VERIFY(partitionIt->second.Reading);
- partitionIt->second.Reading = false;
-
- if constexpr (UseMigrationProtocol) {
- partitionIt->second.ReadIdToResponse = partitionCookie + 1;
+ typename TFormedReadResponse<TServerMessage>::TPtr formedResponse;
+ {
+ auto it = PartitionToReadResponse.find(ev->Sender);
+ Y_VERIFY(it != PartitionToReadResponse.end());
+ formedResponse = it->second;
}
- auto it = PartitionToReadResponse.find(sender);
- Y_VERIFY(it != PartitionToReadResponse.end());
+ auto it = Partitions.find(assignId);
+ Y_VERIFY(it != Partitions.end());
+ Y_VERIFY(it->second.Reading);
+ it->second.Reading = false;
- typename TFormedReadResponse<TServerMessage>::TPtr formedResponse = it->second;
+ if constexpr (UseMigrationProtocol) {
+ it->second.ReadIdToResponse = partitionCookie + 1;
+ }
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " read done guid " << formedResponse->Guid
- << partitionIt->second.Partition
- << " size " << event->Response.ByteSize());
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " read done"
+ << ": guid# " << formedResponse->Guid
+ << ", partition# " << it->second.Partition
+ << ", size# " << response.ByteSize());
- const i64 diff = formedResponse->ApplyResponse(std::move(event->Response));
- if (event->FromDisk) {
+ const i64 diff = formedResponse->ApplyResponse(std::move(response));
+ if (ev->Get()->FromDisk) {
formedResponse->FromDisk = true;
}
- formedResponse->WaitQuotaTime = Max(formedResponse->WaitQuotaTime, event->WaitQuotaTime);
+
+ formedResponse->WaitQuotaTime = Max(formedResponse->WaitQuotaTime, ev->Get()->WaitQuotaTime);
--formedResponse->RequestsInfly;
BytesInflight_ += diff;
@@ -1608,28 +1518,16 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadResponse::T
WaitingQuota.push_back(formedResponse);
}
} else {
- ProcessAnswer(ctx, formedResponse);
+ ProcessAnswer(formedResponse, ctx);
}
}
}
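
A single client read fans out to several partition actors, and each reply lands in the shared TFormedReadResponse; the handler above decrements RequestsInfly and only hands the merged response on, directly or via the quota queue, once the last reply has arrived. A minimal standalone sketch of that counter-based fan-in, ignoring quota handling and using illustrative types:

#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Aggregates replies from several partitions into one client response.
struct TFormedResponse {
    std::string Guid;
    int RequestsInfly = 0;            // partition replies still outstanding
    std::vector<std::string> Parts;   // merged partition data
};

void ProcessAnswer(const std::shared_ptr<TFormedResponse>& r) {
    std::cout << r->Guid << ": sending " << r->Parts.size() << " partition blocks\n";
}

// Called for every partition reply; the merged answer is processed only
// once the last outstanding reply has arrived.
void OnPartitionReply(const std::shared_ptr<TFormedResponse>& r, std::string data) {
    r->Parts.push_back(std::move(data));
    if (--r->RequestsInfly == 0) {
        ProcessAnswer(r);
    }
}

int main() {
    auto response = std::make_shared<TFormedResponse>();
    response->Guid = "read-1";
    response->RequestsInfly = 3;       // the read was fanned out to 3 partitions

    OnPartitionReply(response, "p0");
    OnPartitionReply(response, "p1");
    OnPartitionReply(response, "p2");  // last reply triggers ProcessAnswer
}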
-template<bool UseMigrationProtocol>
-bool TReadSessionActor<UseMigrationProtocol>::WriteResponse(TServerMessage&& response, bool finish) {
- ui64 sz = response.ByteSize();
- ActiveWrites.push(sz);
- BytesInflight_ += sz;
- if (BytesInflight) {
- (*BytesInflight) += sz;
- }
-
- return finish ? Request->GetStreamCtx()->WriteAndFinish(std::move(response), grpc::Status::OK) : Request->GetStreamCtx()->Write(std::move(response));
-}
-
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
ui64 TReadSessionActor<UseMigrationProtocol>::PrepareResponse(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse) {
formedResponse->ByteSizeBeforeFiltering = formedResponse->Response.ByteSize();
- if constexpr(UseMigrationProtocol) {
+ if constexpr (UseMigrationProtocol) {
formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_data_batch());
} else {
formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_read_response());
@@ -1638,35 +1536,40 @@ ui64 TReadSessionActor<UseMigrationProtocol>::PrepareResponse(typename TFormedRe
return formedResponse->HasMessages ? formedResponse->Response.ByteSize() : 0;
}
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext& ctx, typename TFormedReadResponse<TServerMessage>::TPtr formedResponse) {
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse, const TActorContext& ctx) {
ui32 readDurationMs = (ctx.Now() - formedResponse->Start - formedResponse->WaitQuotaTime).MilliSeconds();
+
if (formedResponse->FromDisk) {
ReadLatencyFromDisk.IncFor(readDurationMs, 1);
} else {
ReadLatency.IncFor(readDurationMs, 1);
}
- if (readDurationMs >= (formedResponse->FromDisk ? AppData(ctx)->PQConfig.GetReadLatencyFromDiskBigMs() : AppData(ctx)->PQConfig.GetReadLatencyBigMs())) {
+
+ const auto latencyThreshold = formedResponse->FromDisk
+ ? AppData(ctx)->PQConfig.GetReadLatencyFromDiskBigMs()
+ : AppData(ctx)->PQConfig.GetReadLatencyBigMs();
+ if (readDurationMs >= latencyThreshold) {
SLIBigReadLatency.Inc();
}
Y_VERIFY(formedResponse->RequestsInfly == 0);
const ui64 diff = formedResponse->ByteSizeBeforeFiltering;
- ui64 sizeEstimation = formedResponse->HasMessages ? formedResponse->Response.ByteSize() : 0;
+ const ui64 sizeEstimation = formedResponse->HasMessages ? formedResponse->Response.ByteSize() : 0;
+
if constexpr (!UseMigrationProtocol) {
formedResponse->Response.mutable_read_response()->set_bytes_size(sizeEstimation);
}
if (formedResponse->HasMessages) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " response to read " << formedResponse->Guid);
-
- if (!WriteResponse(std::move(formedResponse->Response))) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed");
- Die(ctx);
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " response to read"
+ << ": guid# " << formedResponse->Guid);
+ if (!WriteToStreamOrDie(ctx, std::move(formedResponse->Response))) {
return;
}
} else {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " empty read result " << formedResponse->Guid << ", start new reading");
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " empty read result, start new reading"
+ << ": guid# " << formedResponse->Guid);
}
BytesInflight_ -= diff;
@@ -1674,27 +1577,28 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext&
(*BytesInflight) -= diff;
}
- for (auto& pp : formedResponse->PartitionsTookPartInControlMessages) {
- auto it = PartitionToControlMessages.find(pp);
+ for (const auto& id : formedResponse->PartitionsTookPartInControlMessages) {
+ auto it = PartitionToControlMessages.find(id);
Y_VERIFY(it != PartitionToControlMessages.end());
+
if (--it->second.Infly == 0) {
for (auto& r : it->second.ControlMessages) {
- if (!WriteResponse(std::move(r))) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed");
- Die(ctx);
+ if (!WriteToStreamOrDie(ctx, std::move(r))) {
return;
}
}
+
PartitionToControlMessages.erase(it);
}
}
- for (const TActorId& p : formedResponse->PartitionsTookPartInRead) {
- PartitionToReadResponse.erase(p);
+ for (const auto& id : formedResponse->PartitionsTookPartInRead) {
+ PartitionToReadResponse.erase(id);
}
RequestedBytes -= formedResponse->RequestedBytes;
ReadsInfly--;
+
if constexpr (!UseMigrationProtocol) {
ReadSizeBudget += formedResponse->RequestedBytes;
ReadSizeBudget -= sizeEstimation;
@@ -1712,35 +1616,34 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext&
}
}
- ProcessReads(ctx); // returns false if actor died
-}
-
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvCloseSession::TPtr& ev, const TActorContext& ctx) {
- CloseSession(ev->Get()->Reason, ev->Get()->ErrorCode, ctx);
+ ProcessReads(ctx);
}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
ui32 TReadSessionActor<UseMigrationProtocol>::NormalizeMaxReadMessagesCount(ui32 sourceValue) {
ui32 count = Min<ui32>(sourceValue, Max<i32>());
+
if (count == 0) {
count = Max<i32>();
}
+
return count;
}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
ui32 TReadSessionActor<UseMigrationProtocol>::NormalizeMaxReadSize(ui32 sourceValue) {
ui32 size = Min<ui32>(sourceValue, MAX_READ_SIZE);
+
if (size == 0) {
size = MAX_READ_SIZE;
}
+
return size;
}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext& ctx) {
- auto ShouldContinueReads = [this]() {
+ auto shouldContinueReads = [this]() {
if constexpr (UseMigrationProtocol) {
return !Reads.empty() && ReadsInfly < MAX_INFLY_READS;
} else {
@@ -1748,7 +1651,7 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext&
}
};
- while (ShouldContinueReads() && BytesInflight_ + RequestedBytes < MAX_INFLY_BYTES) {
+ while (shouldContinueReads() && BytesInflight_ + RequestedBytes < MAX_INFLY_BYTES) {
ui32 count = MaxReadMessagesCount;
ui64 size = MaxReadSize;
ui32 partitionsAsked = 0;
@@ -1759,127 +1662,141 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessReads(const TActorContext&
} else {
guid = CreateGuidAsString();
}
+
typename TFormedReadResponse<TServerMessage>::TPtr formedResponse =
new TFormedReadResponse<TServerMessage>(guid, ctx.Now());
+
while (!AvailablePartitions.empty()) {
auto part = *AvailablePartitions.begin();
AvailablePartitions.erase(AvailablePartitions.begin());
auto it = Partitions.find(part.AssignId);
- if (it == Partitions.end() || it->second.Releasing) { //this is already released partition
+            if (it == Partitions.end() || it->second.Releasing) { // this partition has already been released
continue;
}
- //add this partition to reading
- ++partitionsAsked;
+
+ ++partitionsAsked; // add this partition to reading
const ui32 ccount = Min<ui32>(part.MsgLag * LAG_GROW_MULTIPLIER, count);
count -= ccount;
+
ui64 csize = (ui64)Min<double>(part.SizeLag * LAG_GROW_MULTIPLIER, size);
if constexpr (!UseMigrationProtocol) {
csize = Min<i64>(csize, ReadSizeBudget);
}
+
size -= csize;
Y_VERIFY(csize < Max<i32>());
auto jt = ReadFromTimestamp.find(it->second.Topic->GetInternalName());
if (jt == ReadFromTimestamp.end()) {
- LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << "Error searching for topic: " << it->second.Topic->GetInternalName()
- << " (" << it->second.Topic->GetPrintableString() << ")");
- for (const auto& [k, v] : ReadFromTimestamp) {
- const auto& kk = k;
- LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << "Have topic: " << kk);
+ LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " error searching for topic"
+ << ": internalName# " << it->second.Topic->GetInternalName()
+ << ", prettyName# " << it->second.Topic->GetPrintableString());
+
+ for (const auto& kv : ReadFromTimestamp) {
+ LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " have topic"
+ << ": topic# " << kv.first);
}
- CloseSession(TStringBuilder() << "Internal error", PersQueue::ErrorCode::ERROR, ctx);
- return;
+
+ return CloseSession(PersQueue::ErrorCode::ERROR, "internal error", ctx);
}
- ui64 readTimestampMs = Max(ReadTimestampMs, jt->second);
- auto lags_it = MaxLagByTopic.find(it->second.Topic->GetInternalName());
- Y_VERIFY(lags_it != MaxLagByTopic.end());
- ui32 maxLag = lags_it->second;
+ ui64 readTimestampMs = Max(ReadTimestampMs, jt->second);
- TAutoPtr<TEvPQProxy::TEvRead> read = new TEvPQProxy::TEvRead(guid, ccount, csize, maxLag, readTimestampMs);
+ auto lagsIt = MaxLagByTopic.find(it->second.Topic->GetInternalName());
+ Y_VERIFY(lagsIt != MaxLagByTopic.end());
+ const ui32 maxLag = lagsIt->second;
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX
- << " performing read request with guid " << read->Guid
- << " from " << it->second.Partition << " count " << ccount << " size " << csize
- << " partitionsAsked " << partitionsAsked << " maxTimeLag " << maxLag << "ms");
+ auto ev = MakeHolder<TEvPQProxy::TEvRead>(guid, ccount, csize, maxLag, readTimestampMs);
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " performing read request"
+ << ": guid# " << ev->Guid
+ << ", from# " << it->second.Partition
+ << ", count# " << ccount
+ << ", size# " << csize
+ << ", partitionsAsked# " << partitionsAsked
+ << ", maxTimeLag# " << maxLag << "ms");
Y_VERIFY(!it->second.Reading);
it->second.Reading = true;
+
formedResponse->PartitionsTookPartInRead.insert(it->second.Actor);
- auto pp = it->second.Partition;
- pp.AssignId = 0;
- PartitionToControlMessages[pp].Infly++;
- bool res = formedResponse->PartitionsTookPartInControlMessages.insert(pp).second;
+ auto id = it->second.Partition;
+ id.AssignId = 0;
+ PartitionToControlMessages[id].Infly++;
+
+ bool res = formedResponse->PartitionsTookPartInControlMessages.insert(id).second;
Y_VERIFY(res);
RequestedBytes += csize;
formedResponse->RequestedBytes += csize;
ReadSizeBudget -= csize;
- ctx.Send(it->second.Actor, read.Release());
- const auto insertResult = PartitionToReadResponse.insert(std::make_pair(it->second.Actor, formedResponse));
- Y_VERIFY(insertResult.second);
+ ctx.Send(it->second.Actor, ev.Release());
+ res = PartitionToReadResponse.emplace(it->second.Actor, formedResponse).second;
+ Y_VERIFY(res);
- // TODO (ildar-khisam@): Gather data from all partitions;
- // For now send messages only from single partition
+ // TODO (ildar-khisam@): Gather data from all partitions.
+        // For now, send messages only from a single partition.
if constexpr (!UseMigrationProtocol) {
break;
}
- if (count == 0 || size == 0)
+ if (count == 0 || size == 0) {
break;
+ }
}
- if (partitionsAsked == 0)
+ if (partitionsAsked == 0) {
break;
+ }
+
ReadsTotal.Inc();
formedResponse->RequestsInfly = partitionsAsked;
-
ReadsInfly++;
i64 diff = formedResponse->Response.ByteSize();
BytesInflight_ += diff;
formedResponse->ByteSize = diff;
+
if (BytesInflight) {
(*BytesInflight) += diff;
}
+
if constexpr (UseMigrationProtocol) {
Reads.pop_front();
}
}
}
-
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvPartitionReady::TPtr& ev, const TActorContext& ctx) {
-
- if (!ActualPartitionActor(ev->Sender))
+ if (!ActualPartitionActors.contains(ev->Sender)) {
return;
+ }
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " " << ev->Get()->Partition
- << " ready for read with readOffset "
- << ev->Get()->ReadOffset << " endOffset " << ev->Get()->EndOffset << " WTime "
- << ev->Get()->WTime << " sizeLag " << ev->Get()->SizeLag);
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " partition ready for read"
+ << ": partition# " << ev->Get()->Partition
+ << ", readOffset# " << ev->Get()->ReadOffset
+ << ", endOffset# " << ev->Get()->EndOffset
+ << ", WTime# " << ev->Get()->WTime
+ << ", sizeLag# " << ev->Get()->SizeLag);
- const auto it = PartitionToReadResponse.find(ev->Sender); // check whether this partition is taking part in read response
+ auto it = PartitionToReadResponse.find(ev->Sender); // check whether this partition is taking part in read response
auto& container = it != PartitionToReadResponse.end() ? it->second->PartitionsBecameAvailable : AvailablePartitions;
- auto res = container.insert(TPartitionInfo{ev->Get()->Partition.AssignId, ev->Get()->WTime, ev->Get()->SizeLag,
- ev->Get()->EndOffset - ev->Get()->ReadOffset});
- Y_VERIFY(res.second);
- ProcessReads(ctx);
-}
+ bool res = container.emplace(
+ ev->Get()->Partition.AssignId,
+ ev->Get()->WTime,
+ ev->Get()->SizeLag,
+ ev->Get()->EndOffset - ev->Get()->ReadOffset).second;
+ Y_VERIFY(res);
-template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::HandlePoison(TEvPQProxy::TEvDieCommand::TPtr& ev, const TActorContext& ctx) {
- CloseSession(ev->Get()->Reason, ev->Get()->ErrorCode, ctx);
+ ProcessReads(ctx);
}
-
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) {
const auto tag = static_cast<EWakeupTag>(ev->Get()->Tag);
OnWakeup(tag);
@@ -1892,7 +1809,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& e
return RecheckACL(ctx);
case EWakeupTag::RlAllowed:
- ProcessAnswer(ctx, PendingQuota);
+ ProcessAnswer(PendingQuota, ctx);
if (!WaitingQuota.empty()) {
PendingQuota = WaitingQuota.front();
WaitingQuota.pop_front();
@@ -1908,27 +1825,34 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& e
if (PendingQuota) {
Y_VERIFY(MaybeRequestQuota(PendingQuota->RequiredQuota, EWakeupTag::RlAllowed, ctx));
} else {
- return CloseSession("Throughput limit exceeded", PersQueue::ErrorCode::OVERLOAD, ctx);
+ return CloseSession(PersQueue::ErrorCode::OVERLOAD, "throughput limit exceeded", ctx);
}
break;
}
}
-template<bool UseMigrationProtocol>
+template <bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& ctx) {
ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl));
- if (Token && !AuthInitActor && (ForceACLCheck || (ctx.Now() - LastACLCheckTimestamp > TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()) && RequestNotChecked))) {
+
+ const auto timeout = TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec());
+ const bool authTimedOut = (ctx.Now() - LastACLCheckTimestamp) > timeout;
+
+ if (Token && !AuthInitActor && (ForceACLCheck || (authTimedOut && RequestNotChecked))) {
ForceACLCheck = false;
RequestNotChecked = false;
- Y_VERIFY(!AuthInitActor);
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " checking auth because of timeout");
- AuthInitActor = ctx.Register(new TReadInitAndAuthActor(
- ctx, ctx.SelfID, ClientId, Cookie, Session, SchemeCache, NewSchemeCache, Counters, Token, TopicsList,
- TopicsHandler.GetLocalCluster()
- ));
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " checking auth because of timeout");
+ RunAuthActor(ctx);
}
}
-} // namespace NGRpcProxy::V1
-} // namespace NKikimr
+template <bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::RunAuthActor(const TActorContext& ctx) {
+ Y_VERIFY(!AuthInitActor);
+ AuthInitActor = ctx.Register(new TReadInitAndAuthActor(
+ ctx, ctx.SelfID, ClientId, Cookie, Session, SchemeCache, NewSchemeCache, Counters, Token, TopicsList,
+ TopicsHandler.GetLocalCluster()));
+}
+
+} // namespace NKikimr::NGRpcProxy::V1