diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-09-07 20:05:51 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-09-07 20:05:51 +0300 |
commit | 16b02fce99d375f4464e3f4fdcca7991e1f67cf2 (patch) | |
tree | 52d14e0b1ed8f127ac17e83f0620259f1b376287 | |
parent | 0c635f47721cc2d5ce2e3b9e748684d4209cefc3 (diff) | |
download | ydb-16b02fce99d375f4464e3f4fdcca7991e1f67cf2.tar.gz |
Quoting & metering using RUs
-rw-r--r-- | ydb/core/grpc_services/base/base.h | 1 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_request_check_actor.h | 11 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_calls.h | 4 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 7 | ||||
-rw-r--r-- | ydb/core/persqueue/user_info.h | 14 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_proxy.cpp | 68 | ||||
-rw-r--r-- | ydb/services/datastreams/grpc_service.cpp | 78 | ||||
-rw-r--r-- | ydb/services/datastreams/put_records_actor.h | 59 | ||||
-rw-r--r-- | ydb/services/lib/actors/CMakeLists.txt | 3 | ||||
-rw-r--r-- | ydb/services/lib/actors/pq_rl_helpers.cpp | 78 | ||||
-rw-r--r-- | ydb/services/lib/actors/pq_rl_helpers.h | 41 | ||||
-rw-r--r-- | ydb/services/lib/actors/type_definitions.h | 2 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp | 3 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.h | 24 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/read_session_actor.ipp | 190 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/write_session_actor.h | 81 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/actors/write_session_actor.ipp | 167 |
17 files changed, 622 insertions, 209 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index bf144b5099..3908453123 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -313,6 +313,7 @@ enum class TRateLimiterMode : ui8 { Rps = 1, Ru = 2, RuOnProgress = 3, + RuManual = 4, }; #define RLSWITCH(mode) \ diff --git a/ydb/core/grpc_services/grpc_request_check_actor.h b/ydb/core/grpc_services/grpc_request_check_actor.h index 869e3ea23e..e073ea2677 100644 --- a/ydb/core/grpc_services/grpc_request_check_actor.h +++ b/ydb/core/grpc_services/grpc_request_check_actor.h @@ -145,6 +145,14 @@ public: } ); + // Just set RlPath + static NRpcService::TRlConfig ruRlManualConfig( + "serverless_rt_coordination_node_path", + "serverless_rt_base_resource_ru", + { + // no actions + } + ); auto rlMode = Request_->Get()->GetRlMode(); switch (rlMode) { @@ -157,6 +165,9 @@ public: case TRateLimiterMode::RuOnProgress: RlConfig = &ruRlProgressConfig; break; + case TRateLimiterMode::RuManual: + RlConfig = &ruRlManualConfig; + break; case TRateLimiterMode::Off: break; } diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h index 1112e86d23..5216aacb43 100644 --- a/ydb/core/grpc_services/rpc_calls.h +++ b/ydb/core/grpc_services/rpc_calls.h @@ -55,8 +55,8 @@ using TEvBiStreamPingRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvBiStr using TEvExperimentalStreamQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExperimentalStreamQuery, Ydb::Experimental::ExecuteStreamQueryRequest, Ydb::Experimental::ExecuteStreamQueryResponse, false>; using TEvStreamPQWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQWrite, Ydb::PersQueue::V1::StreamingWriteClientMessage, Ydb::PersQueue::V1::StreamingWriteServerMessage>; using TEvStreamPQMigrationReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQMigrationRead, Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>; -using TEvStreamTopicWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicWrite, Ydb::Topic::StreamWriteMessage::FromClient, Ydb::Topic::StreamWriteMessage::FromServer, TRateLimiterMode::Ru>; -using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicRead, Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer, TRateLimiterMode::Ru>; +using TEvStreamTopicWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicWrite, Ydb::Topic::StreamWriteMessage::FromClient, Ydb::Topic::StreamWriteMessage::FromServer, TRateLimiterMode::RuManual>; +using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicRead, Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer, TRateLimiterMode::RuManual>; using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>; using TEvPQDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDropTopic, Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse, true>; using TEvPQCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQCreateTopic, Ydb::PersQueue::V1::CreateTopicRequest, Ydb::PersQueue::V1::CreateTopicResponse, true>; diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index e4f1e6447a..f715e73c9e 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -2757,6 +2757,10 @@ void TPartition::OnReadRequestFinished(TReadInfo&& info, ui64 answerSize) { auto userInfo = UsersInfoStorage.GetIfExists(info.User); Y_VERIFY(userInfo); + if (Config.GetMeteringMode() == NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS) { + return; + } + if (userInfo->ReadSpeedLimiter) { Send( userInfo->ReadSpeedLimiter->Actor, @@ -4831,6 +4835,9 @@ void TPartition::Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& c } size_t TPartition::GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request) { + if (Config.GetMeteringMode() == NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS) { + return 0; + } if (AppData()->PQConfig.GetQuotingConfig().GetTopicWriteQuotaEntityToLimit() == NKikimrPQ::TPQConfig::TQuotingConfig::USER_PAYLOAD_SIZE) { return WriteNewSize; diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h index d0e4253f59..0a2957ab9c 100644 --- a/ydb/core/persqueue/user_info.h +++ b/ydb/core/persqueue/user_info.h @@ -93,8 +93,7 @@ public: return QuotedTime; } - void UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond) - { + void UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond) { SpeedPerSecond = speedPerSecond; MaxBurst = maxBurst; AvailableSize = maxBurst; @@ -103,19 +102,24 @@ public: void Update(const TInstant& timestamp); bool CanExaust() const { - return AvailableSize > 0; + if (SpeedPerSecond) { + return AvailableSize > 0; + } else { + return true; + } } void Exaust(const ui64 size, const TInstant& timestamp) { Update(timestamp); - AvailableSize -= (i64)size; + if (SpeedPerSecond) { + AvailableSize -= (i64)size; + } Update(timestamp); } ui64 GetAvailableAvgSec(const TInstant& timestamp) { Update(timestamp); return AvgSec.GetAvg(); - } ui64 GetAvailableAvgMin(const TInstant& timestamp) { diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index dab04f1d85..1a6fa1b645 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -11,6 +11,7 @@ #include <ydb/core/persqueue/write_meta.h> #include <ydb/public/api/protos/ydb_topic.pb.h> +#include <ydb/services/lib/actors/pq_rl_helpers.h> #include <ydb/services/lib/actors/pq_schema_actor.h> #include <ydb/services/lib/sharding/sharding.h> #include <ydb/services/persqueue_v1/actors/persqueue_utils.h> @@ -1275,10 +1276,12 @@ namespace NKikimr::NDataStreams::V1 { //----------------------------------------------------------------------------------- class TGetRecordsActor : public TPQGrpcSchemaBase<TGetRecordsActor, TEvDataStreamsGetRecordsRequest> + , private TRlHelpers , public TCdcStreamCompatible { using TBase = TPQGrpcSchemaBase<TGetRecordsActor, TEvDataStreamsGetRecordsRequest>; using TProtoRequest = typename TBase::TProtoRequest; + using EWakeupTag = TRlHelpers::EWakeupTag; static constexpr ui32 READ_TIMEOUT_MS = 150; static constexpr i32 MAX_LIMIT = 10000; @@ -1293,25 +1296,28 @@ namespace NKikimr::NDataStreams::V1 { void Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx); void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx); + void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx); void Die(const TActorContext& ctx) override; private: void SendReadRequest(const TActorContext& ctx); - void SendResponse(const TActorContext& ctx, - const std::vector<Ydb::DataStreams::V1::Record>& records, - ui64 millisBehindLatestMs); + void PrepareResponse(const std::vector<Ydb::DataStreams::V1::Record>& records, ui64 millisBehindLatestMs); + void SendResponse(const TActorContext& ctx); + ui64 GetPayloadSize() const; TShardIterator ShardIterator; TString StreamName; ui64 TabletId; i32 Limit; TActorId PipeClient; + Ydb::DataStreams::V1::GetRecordsResult Result; }; TGetRecordsActor::TGetRecordsActor(NKikimr::NGRpcService::IRequestOpCtx* request) : TBase(request, TShardIterator(GetRequest<TProtoRequest>(request)->shard_iterator()).IsValid() ? TShardIterator(GetRequest<TProtoRequest>(request)->shard_iterator()).GetStreamName() : "undefined") + , TRlHelpers(request, 8_KB, TDuration::Seconds(1)) , ShardIterator{GetRequest<TProtoRequest>(request)->shard_iterator()} , StreamName{ShardIterator.IsValid() ? ShardIterator.GetStreamName() : "undefined"} , TabletId{0} @@ -1373,6 +1379,7 @@ namespace NKikimr::NDataStreams::V1 { HFunc(TEvPersQueue::TEvResponse, Handle); HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); HFunc(TEvTabletPipe::TEvClientConnected, Handle); + HFunc(TEvents::TEvWakeup, Handle); default: TBase::StateWork(ev, ctx); } } @@ -1397,6 +1404,8 @@ namespace NKikimr::NDataStreams::V1 { } if (response.Self->Info.GetPathType() == NKikimrSchemeOp::EPathTypePersQueueGroup) { + SetMeteringMode(response.PQGroupInfo->Description.GetPQTabletConfig().GetMeteringMode()); + const auto& partitions = response.PQGroupInfo->Description.GetPartitions(); for (auto& partition : partitions) { auto partitionId = partition.GetPartitionId(); @@ -1418,7 +1427,13 @@ namespace NKikimr::NDataStreams::V1 { switch (record.GetErrorCode()) { case NPersQueue::NErrorCode::READ_ERROR_TOO_SMALL_OFFSET: case NPersQueue::NErrorCode::READ_ERROR_TOO_BIG_OFFSET: - return SendResponse(ctx, {}, 0); + PrepareResponse({}, 0); + if (IsQuotaRequired()) { + Y_VERIFY(MaybeRequestQuota(1, EWakeupTag::RlAllowed, ctx)); + } else { + SendResponse(ctx); + } + return; default: return ReplyWithError(ConvertPersQueueInternalCodeToStatus(record.GetErrorCode()), Ydb::PersQueue::ErrorCode::ERROR, @@ -1449,7 +1464,13 @@ namespace NKikimr::NDataStreams::V1 { : 0; } - SendResponse(ctx, records, millisBehindLatestMs); + PrepareResponse(records, millisBehindLatestMs); + if (IsQuotaRequired()) { + const auto ru = 1 + CalcRuConsumption(GetPayloadSize()); + Y_VERIFY(MaybeRequestQuota(ru, EWakeupTag::RlAllowed, ctx)); + } else { + SendResponse(ctx); + } } void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { @@ -1464,12 +1485,20 @@ namespace NKikimr::NDataStreams::V1 { TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx); } - void TGetRecordsActor::SendResponse(const TActorContext& ctx, - const std::vector<Ydb::DataStreams::V1::Record>& records, - ui64 millisBehindLatestMs) { - Ydb::DataStreams::V1::GetRecordsResult result; + void TGetRecordsActor::Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { + switch (static_cast<EWakeupTag>(ev->Get()->Tag)) { + case EWakeupTag::RlAllowed: + return SendResponse(ctx); + case EWakeupTag::RlNoResource: + return ReplyWithResult(Ydb::StatusIds::OVERLOADED, ctx); + default: + return HandleWakeup(ev, ctx); + } + } + + void TGetRecordsActor::PrepareResponse(const std::vector<Ydb::DataStreams::V1::Record>& records, ui64 millisBehindLatestMs) { for (auto& r : records) { - auto record = result.add_records(); + auto record = Result.add_records(); *record = r; } @@ -1481,13 +1510,26 @@ namespace NKikimr::NDataStreams::V1 { ShardIterator.GetStreamArn(), ShardIterator.GetShardId(), timestamp, seqNo, ShardIterator.GetKind()); - result.set_next_shard_iterator(shardIterator.Serialize()); - result.set_millis_behind_latest(millisBehindLatestMs); + Result.set_next_shard_iterator(shardIterator.Serialize()); + Result.set_millis_behind_latest(millisBehindLatestMs); + } - Request_->SendResult(result, Ydb::StatusIds::SUCCESS); + void TGetRecordsActor::SendResponse(const TActorContext& ctx) { + Request_->SendResult(Result, Ydb::StatusIds::SUCCESS); Die(ctx); } + ui64 TGetRecordsActor::GetPayloadSize() const { + ui64 result = 0; + + for (const auto& record : Result.records()) { + result += record.data().size() + + record.partition_key().size(); + } + + return result; + } + void TGetRecordsActor::Die(const TActorContext& ctx) { NTabletPipe::CloseClient(ctx, PipeClient); TBase::Die(ctx); diff --git a/ydb/services/datastreams/grpc_service.cpp b/ydb/services/datastreams/grpc_service.cpp index 73d350834f..d16040ef59 100644 --- a/ydb/services/datastreams/grpc_service.cpp +++ b/ydb/services/datastreams/grpc_service.cpp @@ -46,48 +46,48 @@ void TGRpcDataStreamsService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) #ifdef ADD_REQUEST #error ADD_REQUEST macro already defined #endif -#define ADD_REQUEST(NAME, CB, ATTR) \ +#define ADD_REQUEST(NAME, CB, ATTR, LIMIT_TYPE) \ MakeIntrusive<TGRpcRequest<Ydb::DataStreams::V1::NAME##Request, Ydb::DataStreams::V1::NAME##Response, TGRpcDataStreamsService>> \ - (this, &Service_, CQ_, \ - [this](NGrpc::IRequestContextBase *ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ActorSystem_->Send(GRpcRequestProxyId_, \ - new TGrpcRequestOperationCall<Ydb::DataStreams::V1::NAME##Request, Ydb::DataStreams::V1::NAME##Response> \ - (ctx, CB, TRequestAuxSettings{TRateLimiterMode::Off, ATTR})); \ - }, &Ydb::DataStreams::V1::DataStreamsService::AsyncService::Request ## NAME, \ + (this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcRequestOperationCall<Ydb::DataStreams::V1::NAME##Request, Ydb::DataStreams::V1::NAME##Response> \ + (ctx, CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), ATTR})); \ + }, &Ydb::DataStreams::V1::DataStreamsService::AsyncService::Request ## NAME, \ #NAME, logger, getCounterBlock("data_streams", #NAME))->Run(); - ADD_REQUEST(DescribeStream, DoDataStreamsDescribeStreamRequest, nullptr) - ADD_REQUEST(CreateStream, DoDataStreamsCreateStreamRequest, nullptr) - ADD_REQUEST(ListStreams, DoDataStreamsListStreamsRequest, nullptr) - ADD_REQUEST(DeleteStream, DoDataStreamsDeleteStreamRequest, nullptr) - ADD_REQUEST(ListShards, DoDataStreamsListShardsRequest, nullptr) - ADD_REQUEST(PutRecord, DoDataStreamsPutRecordRequest, YdsProcessAttr) - ADD_REQUEST(PutRecords, DoDataStreamsPutRecordsRequest, YdsProcessAttr) - ADD_REQUEST(GetRecords, DoDataStreamsGetRecordsRequest, nullptr) - ADD_REQUEST(GetShardIterator, DoDataStreamsGetShardIteratorRequest, nullptr) - ADD_REQUEST(SubscribeToShard, DoDataStreamsSubscribeToShardRequest, nullptr) - ADD_REQUEST(DescribeLimits, DoDataStreamsDescribeLimitsRequest, nullptr) - ADD_REQUEST(DescribeStreamSummary, DoDataStreamsDescribeStreamSummaryRequest, nullptr) - ADD_REQUEST(DecreaseStreamRetentionPeriod, DoDataStreamsDecreaseStreamRetentionPeriodRequest, nullptr) - ADD_REQUEST(IncreaseStreamRetentionPeriod, DoDataStreamsIncreaseStreamRetentionPeriodRequest, nullptr) - ADD_REQUEST(UpdateShardCount, DoDataStreamsUpdateShardCountRequest, nullptr) - ADD_REQUEST(UpdateStreamMode, DoDataStreamsUpdateStreamModeRequest, nullptr) - ADD_REQUEST(RegisterStreamConsumer, DoDataStreamsRegisterStreamConsumerRequest, nullptr) - ADD_REQUEST(DeregisterStreamConsumer, DoDataStreamsDeregisterStreamConsumerRequest, nullptr) - ADD_REQUEST(DescribeStreamConsumer, DoDataStreamsDescribeStreamConsumerRequest, nullptr) - ADD_REQUEST(ListStreamConsumers, DoDataStreamsListStreamConsumersRequest, nullptr) - ADD_REQUEST(AddTagsToStream, DoDataStreamsAddTagsToStreamRequest, nullptr) - ADD_REQUEST(DisableEnhancedMonitoring, DoDataStreamsDisableEnhancedMonitoringRequest, nullptr) - ADD_REQUEST(EnableEnhancedMonitoring, DoDataStreamsEnableEnhancedMonitoringRequest, nullptr) - ADD_REQUEST(ListTagsForStream, DoDataStreamsListTagsForStreamRequest, nullptr) - ADD_REQUEST(MergeShards, DoDataStreamsMergeShardsRequest, nullptr) - ADD_REQUEST(RemoveTagsFromStream, DoDataStreamsRemoveTagsFromStreamRequest, nullptr) - ADD_REQUEST(SplitShard, DoDataStreamsSplitShardRequest, nullptr) - ADD_REQUEST(StartStreamEncryption, DoDataStreamsStartStreamEncryptionRequest, nullptr) - ADD_REQUEST(StopStreamEncryption, DoDataStreamsStopStreamEncryptionRequest, nullptr) - ADD_REQUEST(UpdateStream, DoDataStreamsUpdateStreamRequest, nullptr) - ADD_REQUEST(SetWriteQuota, DoDataStreamsSetWriteQuotaRequest, nullptr) + ADD_REQUEST(DescribeStream, DoDataStreamsDescribeStreamRequest, nullptr, Off) + ADD_REQUEST(CreateStream, DoDataStreamsCreateStreamRequest, nullptr, Off) + ADD_REQUEST(ListStreams, DoDataStreamsListStreamsRequest, nullptr, Off) + ADD_REQUEST(DeleteStream, DoDataStreamsDeleteStreamRequest, nullptr, Off) + ADD_REQUEST(ListShards, DoDataStreamsListShardsRequest, nullptr, Off) + ADD_REQUEST(PutRecord, DoDataStreamsPutRecordRequest, YdsProcessAttr, RuManual) + ADD_REQUEST(PutRecords, DoDataStreamsPutRecordsRequest, YdsProcessAttr, RuManual) + ADD_REQUEST(GetRecords, DoDataStreamsGetRecordsRequest, nullptr, RuManual) + ADD_REQUEST(GetShardIterator, DoDataStreamsGetShardIteratorRequest, nullptr, Off) + ADD_REQUEST(SubscribeToShard, DoDataStreamsSubscribeToShardRequest, nullptr, Off) + ADD_REQUEST(DescribeLimits, DoDataStreamsDescribeLimitsRequest, nullptr, Off) + ADD_REQUEST(DescribeStreamSummary, DoDataStreamsDescribeStreamSummaryRequest, nullptr, Off) + ADD_REQUEST(DecreaseStreamRetentionPeriod, DoDataStreamsDecreaseStreamRetentionPeriodRequest, nullptr, Off) + ADD_REQUEST(IncreaseStreamRetentionPeriod, DoDataStreamsIncreaseStreamRetentionPeriodRequest, nullptr, Off) + ADD_REQUEST(UpdateShardCount, DoDataStreamsUpdateShardCountRequest, nullptr, Off) + ADD_REQUEST(UpdateStreamMode, DoDataStreamsUpdateStreamModeRequest, nullptr, Off) + ADD_REQUEST(RegisterStreamConsumer, DoDataStreamsRegisterStreamConsumerRequest, nullptr, Off) + ADD_REQUEST(DeregisterStreamConsumer, DoDataStreamsDeregisterStreamConsumerRequest, nullptr, Off) + ADD_REQUEST(DescribeStreamConsumer, DoDataStreamsDescribeStreamConsumerRequest, nullptr, Off) + ADD_REQUEST(ListStreamConsumers, DoDataStreamsListStreamConsumersRequest, nullptr, Off) + ADD_REQUEST(AddTagsToStream, DoDataStreamsAddTagsToStreamRequest, nullptr, Off) + ADD_REQUEST(DisableEnhancedMonitoring, DoDataStreamsDisableEnhancedMonitoringRequest, nullptr, Off) + ADD_REQUEST(EnableEnhancedMonitoring, DoDataStreamsEnableEnhancedMonitoringRequest, nullptr, Off) + ADD_REQUEST(ListTagsForStream, DoDataStreamsListTagsForStreamRequest, nullptr, Off) + ADD_REQUEST(MergeShards, DoDataStreamsMergeShardsRequest, nullptr, Off) + ADD_REQUEST(RemoveTagsFromStream, DoDataStreamsRemoveTagsFromStreamRequest, nullptr, Off) + ADD_REQUEST(SplitShard, DoDataStreamsSplitShardRequest, nullptr, Off) + ADD_REQUEST(StartStreamEncryption, DoDataStreamsStartStreamEncryptionRequest, nullptr, Off) + ADD_REQUEST(StopStreamEncryption, DoDataStreamsStopStreamEncryptionRequest, nullptr, Off) + ADD_REQUEST(UpdateStream, DoDataStreamsUpdateStreamRequest, nullptr, Off) + ADD_REQUEST(SetWriteQuota, DoDataStreamsSetWriteQuotaRequest, nullptr, Off) #undef ADD_REQUEST } diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h index cc8e201b95..c8bb8ca970 100644 --- a/ydb/services/datastreams/put_records_actor.h +++ b/ydb/services/datastreams/put_records_actor.h @@ -8,6 +8,7 @@ #include <ydb/core/protos/msgbus_pq.pb.h> #include <ydb/core/protos/grpc_pq_old.pb.h> +#include <ydb/services/lib/actors/pq_rl_helpers.h> #include <ydb/services/lib/actors/pq_schema_actor.h> #include <ydb/services/lib/sharding/sharding.h> @@ -212,7 +213,10 @@ namespace NKikimr::NDataStreams::V1 { } template<class TDerived, class TProto> - class TPutRecordsActorBase : public NGRpcProxy::V1::TPQGrpcSchemaBase<TPutRecordsActorBase<TDerived, TProto>, TProto> { + class TPutRecordsActorBase + : public NGRpcProxy::V1::TPQGrpcSchemaBase<TPutRecordsActorBase<TDerived, TProto>, TProto> + , private NGRpcProxy::V1::TRlHelpers + { using TBase = NGRpcProxy::V1::TPQGrpcSchemaBase<TPutRecordsActorBase<TDerived, TProto>, TProto>; public: @@ -224,7 +228,9 @@ namespace NKikimr::NDataStreams::V1 { void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); protected: + void Write(const TActorContext& ctx); void AddRecord(THashMap<ui32, TVector<TPutRecordsItem>>& items, ui32 totalShardsCount, int index); + ui64 GetPayloadSize() const; private: struct TPartitionTask { @@ -232,6 +238,7 @@ namespace NKikimr::NDataStreams::V1 { std::vector<ui32> RecordIndexes; }; + TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo; THashMap<ui32, TPartitionTask> PartitionToActor; Ydb::DataStreams::V1::PutRecordsResult PutRecordsResult; @@ -247,6 +254,7 @@ namespace NKikimr::NDataStreams::V1 { STFUNC(StateFunc) { switch (ev->GetTypeRewrite()) { HFunc(NDataStreams::V1::TEvDataStreams::TEvPartitionActorResult, Handle); + HFunc(TEvents::TEvWakeup, Handle); default: TBase::StateWork(ev, ctx); }; } @@ -255,6 +263,7 @@ namespace NKikimr::NDataStreams::V1 { template<class TDerived, class TProto> TPutRecordsActorBase<TDerived, TProto>::TPutRecordsActorBase(NGRpcService::IRequestOpCtx* request) : TBase(request, dynamic_cast<const typename TProto::TRequest*>(request->GetRequest())->stream_name()) + , TRlHelpers(request, 4_KB, TDuration::Seconds(1)) , Ip(request->GetPeerName()) { Y_ENSURE(request); @@ -321,8 +330,21 @@ namespace NKikimr::NDataStreams::V1 { } } + PQGroupInfo = topicInfo->PQGroupInfo; + SetMeteringMode(PQGroupInfo->Description.GetPQTabletConfig().GetMeteringMode()); + + if (IsQuotaRequired()) { + const auto ru = 1 + CalcRuConsumption(GetPayloadSize()); + Y_VERIFY(MaybeRequestQuota(ru, EWakeupTag::RlAllowed, ctx)); + } else { + Write(ctx); + } + } + + template<class TDerived, class TProto> + void TPutRecordsActorBase<TDerived, TProto>::Write(const TActorContext& ctx) { THashMap<ui32, TVector<TPutRecordsItem>> items; - const auto& pqDescription = topicInfo->PQGroupInfo->Description; + const auto& pqDescription = PQGroupInfo->Description; ui32 totalShardsCount = pqDescription.GetPartitions().size(); for (int i = 0; i < static_cast<TDerived*>(this)->GetPutRecordsRequest().records_size(); ++i) { PutRecordsResult.add_records(); @@ -380,6 +402,22 @@ namespace NKikimr::NDataStreams::V1 { } template<class TDerived, class TProto> + void TPutRecordsActorBase<TDerived, TProto>::Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { + switch (static_cast<EWakeupTag>(ev->Get()->Tag)) { + case EWakeupTag::RlAllowed: + return Write(ctx); + case EWakeupTag::RlNoResource: + PutRecordsResult.set_failed_record_count(static_cast<TDerived*>(this)->GetPutRecordsRequest().records_size()); + for (int i = 0; i < PutRecordsResult.failed_record_count(); ++i) { + PutRecordsResult.add_records()->set_error_code("ThrottlingException"); + } + this->CheckFinish(ctx); + default: + return this->HandleWakeup(ev, ctx); + } + } + + template<class TDerived, class TProto> void TPutRecordsActorBase<TDerived, TProto>::AddRecord(THashMap<ui32, TVector<TPutRecordsItem>>& items, ui32 totalShardsCount, int index) { const auto& record = static_cast<TDerived*>(this)->GetPutRecordsRequest().records(index); ui32 shard = 0; @@ -394,6 +432,19 @@ namespace NKikimr::NDataStreams::V1 { PartitionToActor[shard].RecordIndexes.push_back(index); } + template<class TDerived, class TProto> + ui64 TPutRecordsActorBase<TDerived, TProto>::GetPayloadSize() const { + ui64 result = 0; + + for (const auto& record : static_cast<const TDerived*>(this)->GetPutRecordsRequest().records()) { + result += record.data().size() + + record.explicit_hash_key().size() + + record.partition_key().size(); + } + + return result; + } + class TPutRecordsActor : public TPutRecordsActorBase<TPutRecordsActor, NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest> { public: using TBase = TPutRecordsActorBase<TPutRecordsActor, NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest>; @@ -451,7 +502,9 @@ namespace NKikimr::NDataStreams::V1 { result.set_encryption_type(Ydb::DataStreams::V1::EncryptionType::NONE); return ReplyWithResult(Ydb::StatusIds::SUCCESS, result, ctx); } else { - if (putRecordsResult.records(0).error_code() == "ProvisionedThroughputExceededException") { + if (putRecordsResult.records(0).error_code() == "ProvisionedThroughputExceededException" + || putRecordsResult.records(0).error_code() == "ThrottlingException") + { return ReplyWithResult(Ydb::StatusIds::OVERLOADED, ctx); } //TODO: other codes - access denied and so on diff --git a/ydb/services/lib/actors/CMakeLists.txt b/ydb/services/lib/actors/CMakeLists.txt index 0682dcbf46..ff1baa06b0 100644 --- a/ydb/services/lib/actors/CMakeLists.txt +++ b/ydb/services/lib/actors/CMakeLists.txt @@ -14,7 +14,9 @@ target_link_libraries(services-lib-actors PUBLIC cpp-grpc-server cpp-digest-md5 ydb-core-grpc_services + core-grpc_services-base ydb-core-mind + ydb-core-protos library-persqueue-obfuscate library-persqueue-topic_parser api-grpc @@ -24,4 +26,5 @@ target_link_libraries(services-lib-actors PUBLIC ) target_sources(services-lib-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_schema_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_rl_helpers.cpp ) diff --git a/ydb/services/lib/actors/pq_rl_helpers.cpp b/ydb/services/lib/actors/pq_rl_helpers.cpp new file mode 100644 index 0000000000..2ee5df7238 --- /dev/null +++ b/ydb/services/lib/actors/pq_rl_helpers.cpp @@ -0,0 +1,78 @@ +#include "pq_rl_helpers.h" + +#include <ydb/core/grpc_services/base/base.h> + +namespace NKikimr::NGRpcProxy::V1 { + +TRlHelpers::TRlHelpers(NGRpcService::IRequestCtxBase* reqCtx, ui64 blockSize, const TDuration& waitDuration) + : Request(reqCtx) + , BlockSize(blockSize) + , WaitDuration(waitDuration) + , PayloadBytes(0) +{ + Y_VERIFY(Request); +} + +bool TRlHelpers::IsQuotaRequired() const { + Y_VERIFY(MeteringMode.Defined()); + return MeteringMode == NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS && Request->GetRlPath(); +} + +bool TRlHelpers::MaybeRequestQuota(ui64 amount, EWakeupTag tag, const TActorContext& ctx) { + if (RlActor) { + return false; + } + + const auto selfId = ctx.SelfID; + const auto as = ctx.ActorSystem(); + + auto onSendAllowed = [selfId, as, tag]() { + as->Send(selfId, new TEvents::TEvWakeup(tag)); + }; + + auto onSendTimeout = [selfId, as]() { + as->Send(selfId, new TEvents::TEvWakeup(EWakeupTag::RlNoResource)); + }; + + RlActor = NRpcService::RateLimiterAcquireUseSameMailbox( + *Request, amount, WaitDuration, + std::move(onSendAllowed), std::move(onSendTimeout), ctx); + return true; +} + +void TRlHelpers::OnWakeup(EWakeupTag tag) { + switch (tag) { + case EWakeupTag::RlInit: + case EWakeupTag::RlAllowed: + case EWakeupTag::RlNoResource: + RlActor = {}; + break; + default: + break; + } +} + +const TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode>& TRlHelpers::GetMeteringMode() const { + return MeteringMode; +} + +void TRlHelpers::SetMeteringMode(NKikimrPQ::TPQTabletConfig::EMeteringMode mode) { + MeteringMode = mode; +} + +ui64 TRlHelpers::CalcRuConsumption(ui64 payloadSize) { + if (!IsQuotaRequired()) { + return 0; + } + + const ui64 remainder = BlockSize - (PayloadBytes % BlockSize); + PayloadBytes += payloadSize; + + if (payloadSize > remainder) { + return Max<ui64>(1, (payloadSize - remainder) / BlockSize); + } + + return 0; +} + +} diff --git a/ydb/services/lib/actors/pq_rl_helpers.h b/ydb/services/lib/actors/pq_rl_helpers.h new file mode 100644 index 0000000000..02b1f43698 --- /dev/null +++ b/ydb/services/lib/actors/pq_rl_helpers.h @@ -0,0 +1,41 @@ +#pragma once + +#include <ydb/core/grpc_services/local_rate_limiter.h> +#include <ydb/core/protos/pqconfig.pb.h> + +#include <util/datetime/base.h> + +namespace NKikimr::NGRpcProxy::V1 { + +class TRlHelpers { +public: + explicit TRlHelpers(NGRpcService::IRequestCtxBase* reqCtx, ui64 blockSize, const TDuration& waitDuration); + +protected: + enum EWakeupTag: ui64 { + RlInit = 0, + RlAllowed = 1, + RlNoResource = 2, + RecheckAcl = 3, + }; + + bool IsQuotaRequired() const; + bool MaybeRequestQuota(ui64 amount, EWakeupTag tag, const TActorContext& ctx); + void OnWakeup(EWakeupTag tag); + + const TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode>& GetMeteringMode() const; + void SetMeteringMode(NKikimrPQ::TPQTabletConfig::EMeteringMode mode); + + ui64 CalcRuConsumption(ui64 payloadSize); + +private: + NGRpcService::IRequestCtxBase* const Request; + const ui64 BlockSize; + const TDuration WaitDuration; + + ui64 PayloadBytes; + TActorId RlActor; + TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode> MeteringMode; +}; + +} diff --git a/ydb/services/lib/actors/type_definitions.h b/ydb/services/lib/actors/type_definitions.h index a942b32f8d..89e0168a71 100644 --- a/ydb/services/lib/actors/type_definitions.h +++ b/ydb/services/lib/actors/type_definitions.h @@ -14,6 +14,7 @@ namespace NKikimr::NGRpcProxy { TString CloudId; TString DbId; TString FolderId; + NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode; NPersQueue::TDiscoveryConverterPtr DiscoveryConverter; NPersQueue::TTopicConverterPtr FullConverter; TMaybe<TString> CdcStreamPath; @@ -34,6 +35,7 @@ namespace NKikimr::NGRpcProxy { TString CloudId; TString DbId; TString FolderId; + NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode; }; using TTopicInitInfoMap = THashMap<TString, TTopicInitInfo>; diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp index 99bb8f1751..34e6522250 100644 --- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp +++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp @@ -97,6 +97,7 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse( topicsIter->second.CloudId = pqDescr.GetPQTabletConfig().GetYcCloudId(); topicsIter->second.DbId = pqDescr.GetPQTabletConfig().GetYdbDatabaseId(); topicsIter->second.FolderId = pqDescr.GetPQTabletConfig().GetYcFolderId(); + topicsIter->second.MeteringMode = pqDescr.GetPQTabletConfig().GetMeteringMode(); if (!topicsIter->second.DiscoveryConverter->IsValid()) { TString errorReason = Sprintf("Internal server error with topic '%s', Marker# PQ503", topicsIter->second.DiscoveryConverter->GetPrintableString().c_str()); @@ -261,7 +262,7 @@ void TReadInitAndAuthActor::FinishInitialization(const TActorContext& ctx) { TTopicInitInfoMap res; for (auto& [name, holder] : Topics) { res.insert(std::make_pair(name, TTopicInitInfo{ - holder.FullConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.FolderId + holder.FullConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.FolderId, holder.MeteringMode })); } ctx.Send(ParentId, new TEvPQProxy::TEvAuthResultOk(std::move(res))); diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.h b/ydb/services/persqueue_v1/actors/read_session_actor.h index de20aee7bd..d9e048394d 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.h +++ b/ydb/services/persqueue_v1/actors/read_session_actor.h @@ -1,9 +1,8 @@ #pragma once #include "events.h" -#include "persqueue_utils.h" - #include "partition_actor.h" +#include "persqueue_utils.h" #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h> @@ -11,6 +10,7 @@ #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/grpc_services/grpc_request_proxy.h> #include <ydb/core/persqueue/events/global.h> +#include <ydb/services/lib/actors/pq_rl_helpers.h> #include <util/generic/guid.h> #include <util/system/compiler.h> @@ -90,6 +90,10 @@ struct TFormedReadResponse: public TSimpleRefCount<TFormedReadResponse<TServerMe i64 ByteSize = 0; ui64 RequestedBytes = 0; + bool HasMessages = false; + i64 ByteSizeBeforeFiltering = 0; + ui64 RequiredQuota = 0; + //returns byteSize diff i64 ApplyResponse(TServerMessage&& resp); @@ -108,7 +112,10 @@ struct TFormedReadResponse: public TSimpleRefCount<TFormedReadResponse<TServerMe template<bool UseMigrationProtocol> -class TReadSessionActor : public TActorBootstrapped<TReadSessionActor<UseMigrationProtocol>> { +class TReadSessionActor + : public TActorBootstrapped<TReadSessionActor<UseMigrationProtocol>> + , private TRlHelpers +{ using TClientMessage = typename std::conditional_t<UseMigrationProtocol, PersQueue::V1::MigrationStreamingReadClientMessage, Topic::StreamReadMessage::FromClient>; using TServerMessage = typename std::conditional_t<UseMigrationProtocol, PersQueue::V1::MigrationStreamingReadServerMessage, Topic::StreamReadMessage::FromServer>; @@ -132,6 +139,7 @@ private: static constexpr ui32 MAX_INFLY_READS = 10; static constexpr ui64 MAX_READ_SIZE = 100 << 20; //100mb; + static constexpr ui64 READ_BLOCK_SIZE = 8_KB; // metering static constexpr double LAG_GROW_MULTIPLIER = 1.2; //assume that 20% more data arrived to partitions @@ -152,7 +160,7 @@ public: private: STFUNC(StateFunc) { switch (ev->GetTypeRewrite()) { - CFunc(NActors::TEvents::TSystem::Wakeup, HandleWakeup) + HFunc(TEvents::TEvWakeup, Handle); HFunc(IContext::TEvReadFinished, Handle); HFunc(IContext::TEvWriteFinished, Handle); @@ -194,6 +202,7 @@ private: }; } + ui64 PrepareResponse(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse); // returns estimated response's size bool WriteResponse(TServerMessage&& response, bool finish = false); void Handle(typename IContext::TEvReadFinished::TPtr& ev, const TActorContext &ctx); @@ -231,9 +240,11 @@ private: [[nodiscard]] bool ProcessBalancerDead(const ui64 tabletId, const NActors::TActorContext& ctx); // returns false if actor died void HandlePoison(TEvPQProxy::TEvDieCommand::TPtr& ev, const NActors::TActorContext& ctx); - void HandleWakeup(const NActors::TActorContext& ctx); + void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const NActors::TActorContext& ctx); + void RecheckACL(const TActorContext& ctx); + void InitSession(const TActorContext& ctx); void CloseSession(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode, const NActors::TActorContext& ctx); @@ -280,7 +291,6 @@ private: TString PeerName; bool CommitsDisabled; - bool BalancersInitStarted; bool InitDone; bool RangesMode = false; @@ -316,6 +326,8 @@ private: THashMap<TActorId, typename TFormedReadResponse<TServerMessage>::TPtr> PartitionToReadResponse; // Partition actor -> TFormedReadResponse answer that has this partition. // PartitionsTookPartInRead in formed read response contain this actor id. + typename TFormedReadResponse<TServerMessage>::TPtr PendingQuota; // response that currenly pending quota + std::deque<typename TFormedReadResponse<TServerMessage>::TPtr> WaitingQuota; // responses that will be quoted next struct TControlMessages { TVector<TServerMessage> ControlMessages; diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index 0e175427c2..a71056406a 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -36,7 +36,8 @@ TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(TEvStreamPQReadReques TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const TMaybe<TString> clientDC, const NPersQueue::TTopicsListController& topicsHandler) - : Request(request) + : TRlHelpers(request, READ_BLOCK_SIZE, TDuration::Minutes(1)) + , Request(request) , ClientDC(clientDC ? *clientDC : "other") , StartTimestamp(TInstant::Now()) , SchemeCache(schemeCache) @@ -46,7 +47,6 @@ TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(TEvStreamPQReadReques , ClientPath() , Session() , CommitsDisabled(false) - , BalancersInitStarted(false) , InitDone(false) , MaxReadMessagesCount(0) , MaxReadSize(0) @@ -899,26 +899,6 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk SLIBigLatency.Inc(); } - TServerMessage result; - result.set_status(Ydb::StatusIds::SUCCESS); - - result.mutable_init_response()->set_session_id(Session); - if (!WriteResponse(std::move(result))) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed"); - Die(ctx); - return; - } - - if (!Request->GetStreamCtx()->Read()) { - LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); - Die(ctx); - return; - } - - - Y_VERIFY(!BalancersInitStarted); - BalancersInitStarted = true; - for (auto& [name, t] : ev->Get()->TopicAndTablets) { // ToDo - return something from Init and Auth Actor (Full Path - ?) auto internalName = t.TopicNameConverter->GetInternalName(); auto topicGrIter = TopicGroups.find(name); @@ -945,42 +925,79 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk topicHolder.CloudId = t.CloudId; topicHolder.DbId = t.DbId; topicHolder.FolderId = t.FolderId; + topicHolder.MeteringMode = t.MeteringMode; FullPathToConverter[t.TopicNameConverter->GetPrimaryPath()] = t.TopicNameConverter; FullPathToConverter[t.TopicNameConverter->GetSecondaryPath()] = t.TopicNameConverter; - } - for (auto& t : Topics) { - NTabletPipe::TClientConfig clientConfig; - - clientConfig.CheckAliveness = false; - - clientConfig.RetryPolicy = RetryPolicyForPipes; - t.second.PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second.TabletID, clientConfig)); - - Y_VERIFY(t.second.FullConverter); - auto it = TopicGroups.find(t.second.FullConverter->GetInternalName()); - if (it != TopicGroups.end()) { - t.second.Groups = it->second; + if (!GetMeteringMode()) { + SetMeteringMode(t.MeteringMode); + } else if (*GetMeteringMode() != t.MeteringMode) { + return CloseSession("Cannot read from topics with different metering modes", + PersQueue::ErrorCode::BAD_REQUEST, ctx); } } - RegisterSessions(ctx); - - ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup()); + if (IsQuotaRequired()) { + Y_VERIFY(MaybeRequestQuota(1, EWakeupTag::RlInit, ctx)); + } else { + InitSession(ctx); + } } else { for (auto& [name, t] : ev->Get()->TopicAndTablets) { - if (Topics.find(t.TopicNameConverter->GetInternalName()) == Topics.end()) { - CloseSession( + auto it = Topics.find(t.TopicNameConverter->GetInternalName()); + if (it == Topics.end()) { + return CloseSession( TStringBuilder() << "list of topics changed - new topic '" << t.TopicNameConverter->GetPrintableString() << "' found", PersQueue::ErrorCode::BAD_REQUEST, ctx ); - return; + } else if (it->second.MeteringMode != *GetMeteringMode()) { + return CloseSession( + TStringBuilder() << "Metering mode of topic: " << name << " has been changed", + PersQueue::ErrorCode::OVERLOAD, ctx + ); } } } } +template<bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& ctx) { + TServerMessage result; + result.set_status(Ydb::StatusIds::SUCCESS); + + result.mutable_init_response()->set_session_id(Session); + if (!WriteResponse(std::move(result))) { + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed"); + Die(ctx); + return; + } + + if (!Request->GetStreamCtx()->Read()) { + LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start"); + Die(ctx); + return; + } + + for (auto& t : Topics) { + NTabletPipe::TClientConfig clientConfig; + + clientConfig.CheckAliveness = false; + + clientConfig.RetryPolicy = RetryPolicyForPipes; + t.second.PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second.TabletID, clientConfig)); + + Y_VERIFY(t.second.FullConverter); + auto it = TopicGroups.find(t.second.FullConverter->GetInternalName()); + if (it != TopicGroups.end()) { + t.second.Groups = it->second; + } + } + + RegisterSessions(ctx); + + ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl)); +} template<bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext& ctx) { @@ -1390,11 +1407,20 @@ void TReadSessionActor<UseMigrationProtocol>::ReleasePartition(const typename TH DropPartition(it, ctx); //partition will be dropped if (formedResponseToAnswer) { - ProcessAnswer(ctx, formedResponseToAnswer); // returns false if actor died + if (const auto ru = CalcRuConsumption(PrepareResponse(formedResponseToAnswer))) { + formedResponseToAnswer->RequiredQuota = ru; + if (MaybeRequestQuota(ru, EWakeupTag::RlAllowed, ctx)) { + Y_VERIFY(!PendingQuota); + PendingQuota = formedResponseToAnswer; + } else { + WaitingQuota.push_back(formedResponseToAnswer); + } + } else { + ProcessAnswer(ctx, formedResponseToAnswer); // returns false if actor died + } } } - template<bool UseMigrationProtocol> bool TReadSessionActor<UseMigrationProtocol>::ProcessBalancerDead(const ui64 tablet, const TActorContext& ctx) { for (auto& t : Topics) { @@ -1572,7 +1598,17 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadResponse::T } if (formedResponse->RequestsInfly == 0) { - ProcessAnswer(ctx, formedResponse); + if (const auto ru = CalcRuConsumption(PrepareResponse(formedResponse))) { + formedResponse->RequiredQuota = ru; + if (MaybeRequestQuota(ru, EWakeupTag::RlAllowed, ctx)) { + Y_VERIFY(!PendingQuota); + PendingQuota = formedResponse; + } else { + WaitingQuota.push_back(formedResponse); + } + } else { + ProcessAnswer(ctx, formedResponse); + } } } @@ -1589,6 +1625,19 @@ bool TReadSessionActor<UseMigrationProtocol>::WriteResponse(TServerMessage&& res } template<bool UseMigrationProtocol> +ui64 TReadSessionActor<UseMigrationProtocol>::PrepareResponse(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse) { + formedResponse->ByteSizeBeforeFiltering = formedResponse->Response.ByteSize(); + + if constexpr(UseMigrationProtocol) { + formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_data_batch()); + } else { + formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_read_response()); + } + + return formedResponse->HasMessages ? formedResponse->Response.ByteSize() : 0; +} + +template<bool UseMigrationProtocol> void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext& ctx, typename TFormedReadResponse<TServerMessage>::TPtr formedResponse) { ui32 readDurationMs = (ctx.Now() - formedResponse->Start - formedResponse->WaitQuotaTime).MilliSeconds(); if (formedResponse->FromDisk) { @@ -1601,19 +1650,13 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext& } Y_VERIFY(formedResponse->RequestsInfly == 0); - const ui64 diff = formedResponse->Response.ByteSize(); - bool hasMessages; - if constexpr(UseMigrationProtocol) { - hasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_data_batch()); - } else { - hasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_read_response()); - } - ui64 sizeEstimation = hasMessages ? formedResponse->Response.ByteSize() : 0; + const ui64 diff = formedResponse->ByteSizeBeforeFiltering; + ui64 sizeEstimation = formedResponse->HasMessages ? formedResponse->Response.ByteSize() : 0; if constexpr (!UseMigrationProtocol) { formedResponse->Response.mutable_read_response()->set_bytes_size(sizeEstimation); } - if (hasMessages) { + if (formedResponse->HasMessages) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " response to read " << formedResponse->Guid); if (!WriteResponse(std::move(formedResponse->Response))) { @@ -1661,7 +1704,7 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext& AvailablePartitions.insert(formedResponse->PartitionsBecameAvailable.begin(), formedResponse->PartitionsBecameAvailable.end()); if constexpr (UseMigrationProtocol) { - if (!hasMessages) { + if (!formedResponse->HasMessages) { // process new read // Start new reading request with the same guid Reads.emplace_back(new TEvPQProxy::TEvRead(formedResponse->Guid)); @@ -1836,8 +1879,43 @@ void TReadSessionActor<UseMigrationProtocol>::HandlePoison(TEvPQProxy::TEvDieCom template<bool UseMigrationProtocol> -void TReadSessionActor<UseMigrationProtocol>::HandleWakeup(const TActorContext& ctx) { - ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup()); +void TReadSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { + const auto tag = static_cast<EWakeupTag>(ev->Get()->Tag); + OnWakeup(tag); + + switch (tag) { + case EWakeupTag::RlInit: + return InitSession(ctx); + + case EWakeupTag::RecheckAcl: + return RecheckACL(ctx); + + case EWakeupTag::RlAllowed: + ProcessAnswer(ctx, PendingQuota); + if (!WaitingQuota.empty()) { + PendingQuota = WaitingQuota.front(); + WaitingQuota.pop_front(); + } else { + PendingQuota = nullptr; + } + if (PendingQuota) { + Y_VERIFY(MaybeRequestQuota(PendingQuota->RequiredQuota, EWakeupTag::RlAllowed, ctx)); + } + break; + + case EWakeupTag::RlNoResource: + if (PendingQuota) { + Y_VERIFY(MaybeRequestQuota(PendingQuota->RequiredQuota, EWakeupTag::RlAllowed, ctx)); + } else { + return CloseSession("Throughput limit exceeded", PersQueue::ErrorCode::OVERLOAD, ctx); + } + break; + } +} + +template<bool UseMigrationProtocol> +void TReadSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& ctx) { + ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl)); if (Token && !AuthInitActor && (ForceACLCheck || (ctx.Now() - LastACLCheckTimestamp > TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()) && RequestNotChecked))) { ForceACLCheck = false; RequestNotChecked = false; diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index 2952da3970..99b026acee 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -9,12 +9,11 @@ #include <ydb/core/client/server/msgbus_server_pq_metacache.h> #include <ydb/core/grpc_services/grpc_request_proxy.h> #include <ydb/core/kqp/kqp.h> - #include <ydb/core/persqueue/events/global.h> #include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/persqueue/writer/writer.h> - #include <ydb/core/protos/grpc_pq_old.pb.h> +#include <ydb/services/lib/actors/pq_rl_helpers.h> namespace NKikimr::NGRpcProxy::V1 { @@ -24,7 +23,10 @@ inline TActorId GetPQWriteServiceActorID() { } template<bool UseMigrationProtocol> -class TWriteSessionActor : public NActors::TActorBootstrapped<TWriteSessionActor<UseMigrationProtocol>> { +class TWriteSessionActor + : public NActors::TActorBootstrapped<TWriteSessionActor<UseMigrationProtocol>> + , private TRlHelpers +{ using TSelf = TWriteSessionActor<UseMigrationProtocol>; using TClientMessage = std::conditional_t<UseMigrationProtocol, PersQueue::V1::StreamingWriteClientMessage, Topic::StreamWriteMessage::FromClient>; @@ -49,6 +51,33 @@ class TWriteSessionActor : public NActors::TActorBootstrapped<TWriteSessionActor using TEvDescribeTopicsResponse = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse; using TEvDescribeTopicsRequest = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsRequest; + struct TWriteRequestInfo: public TSimpleRefCount<TWriteRequestInfo> { + using TPtr = TIntrusivePtr<TWriteRequestInfo>; + + explicit TWriteRequestInfo(ui64 cookie) + : PartitionWriteRequest(new NPQ::TEvPartitionWriter::TEvWriteRequest(cookie)) + , Cookie(cookie) + , ByteSize(0) + , RequiredQuota(0) + { + } + + // Source requests from user (grpc session object) + std::deque<THolder<TEvWrite>> UserWriteRequests; + + // Partition write request + THolder<NPQ::TEvPartitionWriter::TEvWriteRequest> PartitionWriteRequest; + + // Formed write request's cookie + ui64 Cookie; + + // Formed write request's size + ui64 ByteSize; + + // Quota in term of RUs + ui64 RequiredQuota; + }; + // Codec ID size in bytes static constexpr ui32 CODEC_ID_SIZE = 1; @@ -67,7 +96,7 @@ public: private: STFUNC(StateFunc) { switch (ev->GetTypeRewrite()) { - CFunc(NActors::TEvents::TSystem::Wakeup, HandleWakeup); + HFunc(TEvents::TEvWakeup, Handle); HFunc(IContext::TEvReadFinished, Handle); HFunc(IContext::TEvWriteFinished, Handle); @@ -113,6 +142,7 @@ private: void TryCloseSession(const TActorContext& ctx); void CheckACL(const TActorContext& ctx); + void RecheckACL(const TActorContext& ctx); // Requests fresh ACL from 'SchemeCache' void InitCheckSchema(const TActorContext& ctx, bool needWaitSchema = false); void Handle(typename TEvWriteInit::TPtr& ev, const NActors::TActorContext& ctx); @@ -144,13 +174,14 @@ private: void Handle(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const NActors::TActorContext& ctx); void HandlePoison(TEvPQProxy::TEvDieCommand::TPtr& ev, const NActors::TActorContext& ctx); - void HandleWakeup(const NActors::TActorContext& ctx); + void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx); void CloseSession(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode, const NActors::TActorContext& ctx); void CheckFinish(const NActors::TActorContext& ctx); - void GenerateNextWriteRequest(const NActors::TActorContext& ctx); + void PrepareRequest(THolder<TEvWrite>&& ev, const TActorContext& ctx); + void SendRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx); void SetupCounters(); void SetupCounters(const TString& cloudId, const TString& dbId, const TString& folderId); @@ -190,31 +221,18 @@ private: TString OwnerCookie; TString UserAgent; - ui32 NumReserveBytesRequests; - THolder<TAclWrapper> ACL; - struct TWriteRequestBatchInfo: public TSimpleRefCount<TWriteRequestBatchInfo> { - using TPtr = TIntrusivePtr<TWriteRequestBatchInfo>; - - // Source requests from user (grpc session object) - std::deque<THolder<TEvWrite>> UserWriteRequests; - - // Formed write request's size - ui64 ByteSize = 0; - - // Formed write request's cookie - ui64 Cookie = 0; - }; - - // Nonprocessed source client requests - std::deque<THolder<TEvWrite>> Writes; - - // Formed, but not sent, batch requests to partition actor - std::deque<typename TWriteRequestBatchInfo::TPtr> FormedWrites; - - // Requests that is already sent to partition actor - std::deque<typename TWriteRequestBatchInfo::TPtr> SentMessages; + // Future batch request to partition actor + typename TWriteRequestInfo::TPtr PendingRequest; + // Request that is waiting for quota + typename TWriteRequestInfo::TPtr PendingQuotaRequest; + // Quoted, but not sent requests + std::deque<typename TWriteRequestInfo::TPtr> QuotedRequests; + // Requests that is sent to partition actor, but not accepted + std::deque<typename TWriteRequestInfo::TPtr> SentRequests; + // Accepted requests + std::deque<typename TWriteRequestInfo::TPtr> AcceptedRequests; bool WritesDone; @@ -230,14 +248,15 @@ private: ui64 BytesInflightTotal_; bool NextRequestInited; + ui64 NextRequestCookie; + + ui64 PayloadBytes; NKikimr::NPQ::TMultiCounter SessionsCreated; NKikimr::NPQ::TMultiCounter SessionsActive; NKikimr::NPQ::TMultiCounter Errors; - ui64 NextRequestCookie; - TIntrusivePtr<NACLib::TUserToken> Token; TString Auth; // Got 'update_token_request', authentication or authorization in progress, diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 36d2613676..43af563e85 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -155,6 +155,9 @@ static const ui32 MAX_RESERVE_REQUESTS_INFLIGHT = 5; static const ui32 MAX_BYTES_INFLIGHT = 1_MB; static const TDuration SOURCEID_UPDATE_PERIOD = TDuration::Hours(1); +// metering +static const ui64 WRITE_BLOCK_SIZE = 4_KB; + //TODO: add here tracking of bytes in/out @@ -165,7 +168,8 @@ TWriteSessionActor<UseMigrationProtocol>::TWriteSessionActor( TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const TMaybe<TString> clientDC, const NPersQueue::TTopicsListController& topicsController ) - : Request(request) + : TRlHelpers(request, WRITE_BLOCK_SIZE, TDuration::Minutes(1)) + , Request(request) , State(ES_CREATED) , SchemeCache(schemeCache) , PeerName("") @@ -173,7 +177,6 @@ TWriteSessionActor<UseMigrationProtocol>::TWriteSessionActor( , TopicsController(topicsController) , Partition(0) , PreferedPartition(Max<ui32>()) - , NumReserveBytesRequests(0) , WritesDone(false) , Counters(counters) , BytesInflight_(0) @@ -320,7 +323,7 @@ void TWriteSessionActor<UseMigrationProtocol>::CheckFinish(const TActorContext& CloseSession("out of order Writes done before initialization", PersQueue::ErrorCode::BAD_REQUEST, ctx); return; } - if (Writes.empty() && FormedWrites.empty() && SentMessages.empty()) { + if (!PendingRequest && !PendingQuotaRequest && QuotedRequests.empty() && SentRequests.empty() & AcceptedRequests.empty()) { CloseSession("", PersQueue::ErrorCode::OK, ctx); return; } @@ -545,7 +548,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvDescribeTopicsResponse: return; } Y_VERIFY(entry.PQGroupInfo); // checked at ProcessMetaCacheTopicResponse() - auto& description = entry.PQGroupInfo->Description; + const auto& description = entry.PQGroupInfo->Description; Y_VERIFY(description.PartitionsSize() > 0); Y_VERIFY(description.HasPQTabletConfig()); InitialPQTabletConfig = description.GetPQTabletConfig(); @@ -577,6 +580,13 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvDescribeTopicsResponse: ACL.Reset(new TAclWrapper(entry.SecurityObject)); LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " describe result for acl check"); + const auto meteringMode = description.GetPQTabletConfig().GetMeteringMode(); + if (meteringMode != GetMeteringMode().GetOrElse(meteringMode)) { + return CloseSession("Metering mode has been changed", PersQueue::ErrorCode::OVERLOAD, ctx); + } + + SetMeteringMode(meteringMode); + if (Request->GetInternalToken().empty()) { // session without auth if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { Request->ReplyUnauthenticated("Unauthenticated access is forbidden, please provide credentials"); @@ -589,9 +599,13 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvDescribeTopicsResponse: } else { Y_VERIFY(Request->GetYdbToken()); Auth = *Request->GetYdbToken(); - Token = new NACLib::TUserToken(Request->GetInternalToken()); - CheckACL(ctx); + + if (FirstACLCheck && IsQuotaRequired()) { + Y_VERIFY(MaybeRequestQuota(1, EWakeupTag::RlInit, ctx)); + } else { + CheckACL(ctx); + } } } @@ -966,7 +980,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T State = ES_INITED; - ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup()); + ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl)); //init completed; wait for first data chunk NextRequestInited = true; @@ -983,18 +997,18 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T return CloseSession("got write permission but not wait for it", PersQueue::ErrorCode::ERROR, ctx); } - Y_VERIFY(!FormedWrites.empty()); - typename TWriteRequestBatchInfo::TPtr writeRequest = std::move(FormedWrites.front()); + Y_VERIFY(!SentRequests.empty()); + auto writeRequest = std::move(SentRequests.front()); if (ev->Get()->Cookie != writeRequest->Cookie) { return CloseSession("out of order reserve bytes response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx); } - FormedWrites.pop_front(); + SentRequests.pop_front(); ui64 diff = writeRequest->ByteSize; - SentMessages.emplace_back(std::move(writeRequest)); + AcceptedRequests.emplace_back(std::move(writeRequest)); BytesInflight_ -= diff; BytesInflight.Dec(diff); @@ -1008,9 +1022,12 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T } } - --NumReserveBytesRequests; - if (!Writes.empty()) - GenerateNextWriteRequest(ctx); + if (!IsQuotaRequired() && PendingRequest) { + SendRequest(std::move(PendingRequest), ctx); + } else if (!QuotedRequests.empty()) { + SendRequest(std::move(QuotedRequests.front()), ctx); + QuotedRequests.pop_front(); + } } template<bool UseMigrationProtocol> @@ -1031,13 +1048,13 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T const auto& resp = result.Record.GetPartitionResponse(); - if (SentMessages.empty()) { + if (AcceptedRequests.empty()) { CloseSession("got too many replies from server, internal error", PersQueue::ErrorCode::ERROR, ctx); return; } - typename TWriteRequestBatchInfo::TPtr writeRequest = std::move(SentMessages.front()); - SentMessages.pop_front(); + auto writeRequest = std::move(AcceptedRequests.front()); + AcceptedRequests.pop_front(); if (resp.GetCookie() != writeRequest->Cookie) { return CloseSession("out of order write response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx); @@ -1176,14 +1193,13 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvTabletPipe::TEvClientDe } template<bool UseMigrationProtocol> -void TWriteSessionActor<UseMigrationProtocol>::GenerateNextWriteRequest(const TActorContext& ctx) { - typename TWriteRequestBatchInfo::TPtr writeRequest = new TWriteRequestBatchInfo(); - - auto ev = MakeHolder<NPQ::TEvPartitionWriter::TEvWriteRequest>(++NextRequestCookie); - NKikimrClient::TPersQueueRequest& request = ev->Record; - - writeRequest->UserWriteRequests = std::move(Writes); - Writes.clear(); +void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&& ev, const TActorContext& ctx) { + if (!PendingRequest) { + PendingRequest = new TWriteRequestInfo(++NextRequestCookie); + } + + auto& request = PendingRequest->PartitionWriteRequest->Record; + ui64 payloadSize = 0; auto addDataMigration = [&](const StreamingWriteClientMessage::WriteRequest& writeRequest, const i32 messageIndex) { auto w = request.MutablePartitionRequest()->AddCmdWrite(); @@ -1193,6 +1209,7 @@ void TWriteSessionActor<UseMigrationProtocol>::GenerateNextWriteRequest(const TA w->SetCreateTimeMS(writeRequest.created_at_ms(messageIndex)); w->SetUncompressedSize(writeRequest.blocks_uncompressed_sizes(messageIndex)); w->SetClientDC(ClientDC); + payloadSize += w->GetData().size() + w->GetSourceId().size(); }; auto addData = [&](const Topic::StreamWriteMessage::WriteRequest& writeRequest, const i32 messageIndex) { @@ -1203,39 +1220,54 @@ void TWriteSessionActor<UseMigrationProtocol>::GenerateNextWriteRequest(const TA w->SetCreateTimeMS(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(writeRequest.messages(messageIndex).created_at())); w->SetUncompressedSize(writeRequest.messages(messageIndex).uncompressed_size()); w->SetClientDC(ClientDC); + payloadSize += w->GetData().size() + w->GetSourceId().size(); }; - i64 diff = 0; + const auto& writeRequest = ev->Request.write_request(); + if constexpr (UseMigrationProtocol) { + for (i32 messageIndex = 0; messageIndex != writeRequest.sequence_numbers_size(); ++messageIndex) { + addDataMigration(writeRequest, messageIndex); + } + } else { + for (i32 messageIndex = 0; messageIndex != writeRequest.messages_size(); ++messageIndex) { + addData(writeRequest, messageIndex); + } + } - for (const auto& write : writeRequest->UserWriteRequests) { - diff -= write->Request.ByteSize(); - const auto& writeRequest = write->Request.write_request(); + PendingRequest->UserWriteRequests.push_back(std::move(ev)); + PendingRequest->ByteSize = request.ByteSize(); - if constexpr (UseMigrationProtocol) { - for (i32 messageIndex = 0; messageIndex != writeRequest.sequence_numbers_size(); ++messageIndex) { - addDataMigration(writeRequest, messageIndex); - } - } else { - for (i32 messageIndex = 0; messageIndex != writeRequest.messages_size(); ++messageIndex) { - addData(writeRequest, messageIndex); - } + if (const auto ru = CalcRuConsumption(payloadSize)) { + PendingRequest->RequiredQuota += ru; + if (MaybeRequestQuota(PendingRequest->RequiredQuota, EWakeupTag::RlAllowed, ctx)) { + Y_VERIFY(!PendingQuotaRequest); + PendingQuotaRequest = std::move(PendingRequest); + } + } else { + if (SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) { + SendRequest(std::move(PendingRequest), ctx); } } +} - writeRequest->Cookie = request.GetPartitionRequest().GetCookie(); +template<bool UseMigrationProtocol> +void TWriteSessionActor<UseMigrationProtocol>::SendRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx) { + Y_VERIFY(request->PartitionWriteRequest); + + i64 diff = 0; + for (const auto& w : request->UserWriteRequests) { + diff -= w->Request.ByteSize(); + } Y_VERIFY(-diff <= (i64)BytesInflight_); - diff += request.ByteSize(); + diff += request->PartitionWriteRequest->Record.ByteSize(); BytesInflight_ += diff; BytesInflightTotal_ += diff; BytesInflight.Inc(diff); BytesInflightTotal.Inc(diff); - writeRequest->ByteSize = request.ByteSize(); - FormedWrites.push_back(writeRequest); - - ctx.Send(Writer, std::move(ev)); - ++NumReserveBytesRequests; + ctx.Send(Writer, std::move(request->PartitionWriteRequest)); + SentRequests.push_back(std::move(request)); } template<bool UseMigrationProtocol> @@ -1410,10 +1442,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWrite::TPtr& e } - THolder<TEvWrite> event(ev->Release()); - Writes.push_back(std::move(event)); - - ui64 diff = Writes.back()->Request.ByteSize(); + ui64 diff = ev->Get()->Request.ByteSize(); BytesInflight_ += diff; BytesInflightTotal_ += diff; BytesInflight.Inc(diff); @@ -1431,9 +1460,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWrite::TPtr& e NextRequestInited = false; } - if (NumReserveBytesRequests < MAX_RESERVE_REQUESTS_INFLIGHT) { - GenerateNextWriteRequest(ctx); - } + PrepareRequest(THolder<TEvWrite>(ev->Release()), ctx); } template<bool UseMigrationProtocol> @@ -1462,9 +1489,43 @@ void TWriteSessionActor<UseMigrationProtocol>::LogSession(const TActorContext& c } template<bool UseMigrationProtocol> -void TWriteSessionActor<UseMigrationProtocol>::HandleWakeup(const TActorContext& ctx) { +void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { + const auto tag = static_cast<EWakeupTag>(ev->Get()->Tag); + OnWakeup(tag); + + switch (tag) { + case EWakeupTag::RlInit: + return CheckACL(ctx); + + case EWakeupTag::RecheckAcl: + return RecheckACL(ctx); + + case EWakeupTag::RlAllowed: + if (SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) { + SendRequest(std::move(PendingQuotaRequest), ctx); + } else { + QuotedRequests.push_back(std::move(PendingQuotaRequest)); + } + + if (PendingQuotaRequest = std::move(PendingRequest)) { + Y_VERIFY(MaybeRequestQuota(PendingQuotaRequest->RequiredQuota, EWakeupTag::RlAllowed, ctx)); + } + break; + + case EWakeupTag::RlNoResource: + if (PendingQuotaRequest) { + Y_VERIFY(MaybeRequestQuota(PendingQuotaRequest->RequiredQuota, EWakeupTag::RlAllowed, ctx)); + } else { + return CloseSession("Throughput limit exceeded", PersQueue::ErrorCode::OVERLOAD, ctx); + } + break; + } +} + +template<bool UseMigrationProtocol> +void TWriteSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& ctx) { Y_VERIFY(State == ES_INITED); - ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup()); + ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl)); if (Token && !ACLCheckInProgress && RequestNotChecked && (ctx.Now() - LastACLCheckTimestamp > TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()))) { RequestNotChecked = false; InitCheckSchema(ctx); |