aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-09-07 20:05:51 +0300
committerilnaz <ilnaz@ydb.tech>2022-09-07 20:05:51 +0300
commit16b02fce99d375f4464e3f4fdcca7991e1f67cf2 (patch)
tree52d14e0b1ed8f127ac17e83f0620259f1b376287
parent0c635f47721cc2d5ce2e3b9e748684d4209cefc3 (diff)
downloadydb-16b02fce99d375f4464e3f4fdcca7991e1f67cf2.tar.gz
Quoting & metering using RUs
-rw-r--r--ydb/core/grpc_services/base/base.h1
-rw-r--r--ydb/core/grpc_services/grpc_request_check_actor.h11
-rw-r--r--ydb/core/grpc_services/rpc_calls.h4
-rw-r--r--ydb/core/persqueue/partition.cpp7
-rw-r--r--ydb/core/persqueue/user_info.h14
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp68
-rw-r--r--ydb/services/datastreams/grpc_service.cpp78
-rw-r--r--ydb/services/datastreams/put_records_actor.h59
-rw-r--r--ydb/services/lib/actors/CMakeLists.txt3
-rw-r--r--ydb/services/lib/actors/pq_rl_helpers.cpp78
-rw-r--r--ydb/services/lib/actors/pq_rl_helpers.h41
-rw-r--r--ydb/services/lib/actors/type_definitions.h2
-rw-r--r--ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp3
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.h24
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp190
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.h81
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp167
17 files changed, 622 insertions, 209 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index bf144b5099..3908453123 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -313,6 +313,7 @@ enum class TRateLimiterMode : ui8 {
Rps = 1,
Ru = 2,
RuOnProgress = 3,
+ RuManual = 4,
};
#define RLSWITCH(mode) \
diff --git a/ydb/core/grpc_services/grpc_request_check_actor.h b/ydb/core/grpc_services/grpc_request_check_actor.h
index 869e3ea23e..e073ea2677 100644
--- a/ydb/core/grpc_services/grpc_request_check_actor.h
+++ b/ydb/core/grpc_services/grpc_request_check_actor.h
@@ -145,6 +145,14 @@ public:
}
);
+ // Just set RlPath
+ static NRpcService::TRlConfig ruRlManualConfig(
+ "serverless_rt_coordination_node_path",
+ "serverless_rt_base_resource_ru",
+ {
+ // no actions
+ }
+ );
auto rlMode = Request_->Get()->GetRlMode();
switch (rlMode) {
@@ -157,6 +165,9 @@ public:
case TRateLimiterMode::RuOnProgress:
RlConfig = &ruRlProgressConfig;
break;
+ case TRateLimiterMode::RuManual:
+ RlConfig = &ruRlManualConfig;
+ break;
case TRateLimiterMode::Off:
break;
}
diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h
index 1112e86d23..5216aacb43 100644
--- a/ydb/core/grpc_services/rpc_calls.h
+++ b/ydb/core/grpc_services/rpc_calls.h
@@ -55,8 +55,8 @@ using TEvBiStreamPingRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvBiStr
using TEvExperimentalStreamQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExperimentalStreamQuery, Ydb::Experimental::ExecuteStreamQueryRequest, Ydb::Experimental::ExecuteStreamQueryResponse, false>;
using TEvStreamPQWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQWrite, Ydb::PersQueue::V1::StreamingWriteClientMessage, Ydb::PersQueue::V1::StreamingWriteServerMessage>;
using TEvStreamPQMigrationReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamPQMigrationRead, Ydb::PersQueue::V1::MigrationStreamingReadClientMessage, Ydb::PersQueue::V1::MigrationStreamingReadServerMessage>;
-using TEvStreamTopicWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicWrite, Ydb::Topic::StreamWriteMessage::FromClient, Ydb::Topic::StreamWriteMessage::FromServer, TRateLimiterMode::Ru>;
-using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicRead, Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer, TRateLimiterMode::Ru>;
+using TEvStreamTopicWriteRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicWrite, Ydb::Topic::StreamWriteMessage::FromClient, Ydb::Topic::StreamWriteMessage::FromServer, TRateLimiterMode::RuManual>;
+using TEvStreamTopicReadRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvStreamTopicRead, Ydb::Topic::StreamReadMessage::FromClient, Ydb::Topic::StreamReadMessage::FromServer, TRateLimiterMode::RuManual>;
using TEvPQReadInfoRequest = TGRpcRequestWrapper<TRpcServices::EvPQReadInfo, Ydb::PersQueue::V1::ReadInfoRequest, Ydb::PersQueue::V1::ReadInfoResponse, true>;
using TEvPQDropTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQDropTopic, Ydb::PersQueue::V1::DropTopicRequest, Ydb::PersQueue::V1::DropTopicResponse, true>;
using TEvPQCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvPQCreateTopic, Ydb::PersQueue::V1::CreateTopicRequest, Ydb::PersQueue::V1::CreateTopicResponse, true>;
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index e4f1e6447a..f715e73c9e 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -2757,6 +2757,10 @@ void TPartition::OnReadRequestFinished(TReadInfo&& info, ui64 answerSize) {
auto userInfo = UsersInfoStorage.GetIfExists(info.User);
Y_VERIFY(userInfo);
+ if (Config.GetMeteringMode() == NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS) {
+ return;
+ }
+
if (userInfo->ReadSpeedLimiter) {
Send(
userInfo->ReadSpeedLimiter->Actor,
@@ -4831,6 +4835,9 @@ void TPartition::Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& c
}
size_t TPartition::GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request) {
+ if (Config.GetMeteringMode() == NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS) {
+ return 0;
+ }
if (AppData()->PQConfig.GetQuotingConfig().GetTopicWriteQuotaEntityToLimit() ==
NKikimrPQ::TPQConfig::TQuotingConfig::USER_PAYLOAD_SIZE) {
return WriteNewSize;
diff --git a/ydb/core/persqueue/user_info.h b/ydb/core/persqueue/user_info.h
index d0e4253f59..0a2957ab9c 100644
--- a/ydb/core/persqueue/user_info.h
+++ b/ydb/core/persqueue/user_info.h
@@ -93,8 +93,7 @@ public:
return QuotedTime;
}
- void UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond)
- {
+ void UpdateConfig(const ui64 maxBurst, const ui64 speedPerSecond) {
SpeedPerSecond = speedPerSecond;
MaxBurst = maxBurst;
AvailableSize = maxBurst;
@@ -103,19 +102,24 @@ public:
void Update(const TInstant& timestamp);
bool CanExaust() const {
- return AvailableSize > 0;
+ if (SpeedPerSecond) {
+ return AvailableSize > 0;
+ } else {
+ return true;
+ }
}
void Exaust(const ui64 size, const TInstant& timestamp) {
Update(timestamp);
- AvailableSize -= (i64)size;
+ if (SpeedPerSecond) {
+ AvailableSize -= (i64)size;
+ }
Update(timestamp);
}
ui64 GetAvailableAvgSec(const TInstant& timestamp) {
Update(timestamp);
return AvgSec.GetAvg();
-
}
ui64 GetAvailableAvgMin(const TInstant& timestamp) {
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index dab04f1d85..1a6fa1b645 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -11,6 +11,7 @@
#include <ydb/core/persqueue/write_meta.h>
#include <ydb/public/api/protos/ydb_topic.pb.h>
+#include <ydb/services/lib/actors/pq_rl_helpers.h>
#include <ydb/services/lib/actors/pq_schema_actor.h>
#include <ydb/services/lib/sharding/sharding.h>
#include <ydb/services/persqueue_v1/actors/persqueue_utils.h>
@@ -1275,10 +1276,12 @@ namespace NKikimr::NDataStreams::V1 {
//-----------------------------------------------------------------------------------
class TGetRecordsActor : public TPQGrpcSchemaBase<TGetRecordsActor, TEvDataStreamsGetRecordsRequest>
+ , private TRlHelpers
, public TCdcStreamCompatible
{
using TBase = TPQGrpcSchemaBase<TGetRecordsActor, TEvDataStreamsGetRecordsRequest>;
using TProtoRequest = typename TBase::TProtoRequest;
+ using EWakeupTag = TRlHelpers::EWakeupTag;
static constexpr ui32 READ_TIMEOUT_MS = 150;
static constexpr i32 MAX_LIMIT = 10000;
@@ -1293,25 +1296,28 @@ namespace NKikimr::NDataStreams::V1 {
void Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx);
void Die(const TActorContext& ctx) override;
private:
void SendReadRequest(const TActorContext& ctx);
- void SendResponse(const TActorContext& ctx,
- const std::vector<Ydb::DataStreams::V1::Record>& records,
- ui64 millisBehindLatestMs);
+ void PrepareResponse(const std::vector<Ydb::DataStreams::V1::Record>& records, ui64 millisBehindLatestMs);
+ void SendResponse(const TActorContext& ctx);
+ ui64 GetPayloadSize() const;
TShardIterator ShardIterator;
TString StreamName;
ui64 TabletId;
i32 Limit;
TActorId PipeClient;
+ Ydb::DataStreams::V1::GetRecordsResult Result;
};
TGetRecordsActor::TGetRecordsActor(NKikimr::NGRpcService::IRequestOpCtx* request)
: TBase(request, TShardIterator(GetRequest<TProtoRequest>(request)->shard_iterator()).IsValid()
? TShardIterator(GetRequest<TProtoRequest>(request)->shard_iterator()).GetStreamName()
: "undefined")
+ , TRlHelpers(request, 8_KB, TDuration::Seconds(1))
, ShardIterator{GetRequest<TProtoRequest>(request)->shard_iterator()}
, StreamName{ShardIterator.IsValid() ? ShardIterator.GetStreamName() : "undefined"}
, TabletId{0}
@@ -1373,6 +1379,7 @@ namespace NKikimr::NDataStreams::V1 {
HFunc(TEvPersQueue::TEvResponse, Handle);
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ HFunc(TEvents::TEvWakeup, Handle);
default: TBase::StateWork(ev, ctx);
}
}
@@ -1397,6 +1404,8 @@ namespace NKikimr::NDataStreams::V1 {
}
if (response.Self->Info.GetPathType() == NKikimrSchemeOp::EPathTypePersQueueGroup) {
+ SetMeteringMode(response.PQGroupInfo->Description.GetPQTabletConfig().GetMeteringMode());
+
const auto& partitions = response.PQGroupInfo->Description.GetPartitions();
for (auto& partition : partitions) {
auto partitionId = partition.GetPartitionId();
@@ -1418,7 +1427,13 @@ namespace NKikimr::NDataStreams::V1 {
switch (record.GetErrorCode()) {
case NPersQueue::NErrorCode::READ_ERROR_TOO_SMALL_OFFSET:
case NPersQueue::NErrorCode::READ_ERROR_TOO_BIG_OFFSET:
- return SendResponse(ctx, {}, 0);
+ PrepareResponse({}, 0);
+ if (IsQuotaRequired()) {
+ Y_VERIFY(MaybeRequestQuota(1, EWakeupTag::RlAllowed, ctx));
+ } else {
+ SendResponse(ctx);
+ }
+ return;
default:
return ReplyWithError(ConvertPersQueueInternalCodeToStatus(record.GetErrorCode()),
Ydb::PersQueue::ErrorCode::ERROR,
@@ -1449,7 +1464,13 @@ namespace NKikimr::NDataStreams::V1 {
: 0;
}
- SendResponse(ctx, records, millisBehindLatestMs);
+ PrepareResponse(records, millisBehindLatestMs);
+ if (IsQuotaRequired()) {
+ const auto ru = 1 + CalcRuConsumption(GetPayloadSize());
+ Y_VERIFY(MaybeRequestQuota(ru, EWakeupTag::RlAllowed, ctx));
+ } else {
+ SendResponse(ctx);
+ }
}
void TGetRecordsActor::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
@@ -1464,12 +1485,20 @@ namespace NKikimr::NDataStreams::V1 {
TStringBuilder() << "Cannot connect to tablet " << ev->Get()->TabletId, ctx);
}
- void TGetRecordsActor::SendResponse(const TActorContext& ctx,
- const std::vector<Ydb::DataStreams::V1::Record>& records,
- ui64 millisBehindLatestMs) {
- Ydb::DataStreams::V1::GetRecordsResult result;
+ void TGetRecordsActor::Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) {
+ switch (static_cast<EWakeupTag>(ev->Get()->Tag)) {
+ case EWakeupTag::RlAllowed:
+ return SendResponse(ctx);
+ case EWakeupTag::RlNoResource:
+ return ReplyWithResult(Ydb::StatusIds::OVERLOADED, ctx);
+ default:
+ return HandleWakeup(ev, ctx);
+ }
+ }
+
+ void TGetRecordsActor::PrepareResponse(const std::vector<Ydb::DataStreams::V1::Record>& records, ui64 millisBehindLatestMs) {
for (auto& r : records) {
- auto record = result.add_records();
+ auto record = Result.add_records();
*record = r;
}
@@ -1481,13 +1510,26 @@ namespace NKikimr::NDataStreams::V1 {
ShardIterator.GetStreamArn(),
ShardIterator.GetShardId(),
timestamp, seqNo, ShardIterator.GetKind());
- result.set_next_shard_iterator(shardIterator.Serialize());
- result.set_millis_behind_latest(millisBehindLatestMs);
+ Result.set_next_shard_iterator(shardIterator.Serialize());
+ Result.set_millis_behind_latest(millisBehindLatestMs);
+ }
- Request_->SendResult(result, Ydb::StatusIds::SUCCESS);
+ void TGetRecordsActor::SendResponse(const TActorContext& ctx) {
+ Request_->SendResult(Result, Ydb::StatusIds::SUCCESS);
Die(ctx);
}
+ ui64 TGetRecordsActor::GetPayloadSize() const {
+ ui64 result = 0;
+
+ for (const auto& record : Result.records()) {
+ result += record.data().size()
+ + record.partition_key().size();
+ }
+
+ return result;
+ }
+
void TGetRecordsActor::Die(const TActorContext& ctx) {
NTabletPipe::CloseClient(ctx, PipeClient);
TBase::Die(ctx);
diff --git a/ydb/services/datastreams/grpc_service.cpp b/ydb/services/datastreams/grpc_service.cpp
index 73d350834f..d16040ef59 100644
--- a/ydb/services/datastreams/grpc_service.cpp
+++ b/ydb/services/datastreams/grpc_service.cpp
@@ -46,48 +46,48 @@ void TGRpcDataStreamsService::SetupIncomingRequests(NGrpc::TLoggerPtr logger)
#ifdef ADD_REQUEST
#error ADD_REQUEST macro already defined
#endif
-#define ADD_REQUEST(NAME, CB, ATTR) \
+#define ADD_REQUEST(NAME, CB, ATTR, LIMIT_TYPE) \
MakeIntrusive<TGRpcRequest<Ydb::DataStreams::V1::NAME##Request, Ydb::DataStreams::V1::NAME##Response, TGRpcDataStreamsService>> \
- (this, &Service_, CQ_, \
- [this](NGrpc::IRequestContextBase *ctx) { \
- NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
- ActorSystem_->Send(GRpcRequestProxyId_, \
- new TGrpcRequestOperationCall<Ydb::DataStreams::V1::NAME##Request, Ydb::DataStreams::V1::NAME##Response> \
- (ctx, CB, TRequestAuxSettings{TRateLimiterMode::Off, ATTR})); \
- }, &Ydb::DataStreams::V1::DataStreamsService::AsyncService::Request ## NAME, \
+ (this, &Service_, CQ_, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
+ new TGrpcRequestOperationCall<Ydb::DataStreams::V1::NAME##Request, Ydb::DataStreams::V1::NAME##Response> \
+ (ctx, CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::LIMIT_TYPE), ATTR})); \
+ }, &Ydb::DataStreams::V1::DataStreamsService::AsyncService::Request ## NAME, \
#NAME, logger, getCounterBlock("data_streams", #NAME))->Run();
- ADD_REQUEST(DescribeStream, DoDataStreamsDescribeStreamRequest, nullptr)
- ADD_REQUEST(CreateStream, DoDataStreamsCreateStreamRequest, nullptr)
- ADD_REQUEST(ListStreams, DoDataStreamsListStreamsRequest, nullptr)
- ADD_REQUEST(DeleteStream, DoDataStreamsDeleteStreamRequest, nullptr)
- ADD_REQUEST(ListShards, DoDataStreamsListShardsRequest, nullptr)
- ADD_REQUEST(PutRecord, DoDataStreamsPutRecordRequest, YdsProcessAttr)
- ADD_REQUEST(PutRecords, DoDataStreamsPutRecordsRequest, YdsProcessAttr)
- ADD_REQUEST(GetRecords, DoDataStreamsGetRecordsRequest, nullptr)
- ADD_REQUEST(GetShardIterator, DoDataStreamsGetShardIteratorRequest, nullptr)
- ADD_REQUEST(SubscribeToShard, DoDataStreamsSubscribeToShardRequest, nullptr)
- ADD_REQUEST(DescribeLimits, DoDataStreamsDescribeLimitsRequest, nullptr)
- ADD_REQUEST(DescribeStreamSummary, DoDataStreamsDescribeStreamSummaryRequest, nullptr)
- ADD_REQUEST(DecreaseStreamRetentionPeriod, DoDataStreamsDecreaseStreamRetentionPeriodRequest, nullptr)
- ADD_REQUEST(IncreaseStreamRetentionPeriod, DoDataStreamsIncreaseStreamRetentionPeriodRequest, nullptr)
- ADD_REQUEST(UpdateShardCount, DoDataStreamsUpdateShardCountRequest, nullptr)
- ADD_REQUEST(UpdateStreamMode, DoDataStreamsUpdateStreamModeRequest, nullptr)
- ADD_REQUEST(RegisterStreamConsumer, DoDataStreamsRegisterStreamConsumerRequest, nullptr)
- ADD_REQUEST(DeregisterStreamConsumer, DoDataStreamsDeregisterStreamConsumerRequest, nullptr)
- ADD_REQUEST(DescribeStreamConsumer, DoDataStreamsDescribeStreamConsumerRequest, nullptr)
- ADD_REQUEST(ListStreamConsumers, DoDataStreamsListStreamConsumersRequest, nullptr)
- ADD_REQUEST(AddTagsToStream, DoDataStreamsAddTagsToStreamRequest, nullptr)
- ADD_REQUEST(DisableEnhancedMonitoring, DoDataStreamsDisableEnhancedMonitoringRequest, nullptr)
- ADD_REQUEST(EnableEnhancedMonitoring, DoDataStreamsEnableEnhancedMonitoringRequest, nullptr)
- ADD_REQUEST(ListTagsForStream, DoDataStreamsListTagsForStreamRequest, nullptr)
- ADD_REQUEST(MergeShards, DoDataStreamsMergeShardsRequest, nullptr)
- ADD_REQUEST(RemoveTagsFromStream, DoDataStreamsRemoveTagsFromStreamRequest, nullptr)
- ADD_REQUEST(SplitShard, DoDataStreamsSplitShardRequest, nullptr)
- ADD_REQUEST(StartStreamEncryption, DoDataStreamsStartStreamEncryptionRequest, nullptr)
- ADD_REQUEST(StopStreamEncryption, DoDataStreamsStopStreamEncryptionRequest, nullptr)
- ADD_REQUEST(UpdateStream, DoDataStreamsUpdateStreamRequest, nullptr)
- ADD_REQUEST(SetWriteQuota, DoDataStreamsSetWriteQuotaRequest, nullptr)
+ ADD_REQUEST(DescribeStream, DoDataStreamsDescribeStreamRequest, nullptr, Off)
+ ADD_REQUEST(CreateStream, DoDataStreamsCreateStreamRequest, nullptr, Off)
+ ADD_REQUEST(ListStreams, DoDataStreamsListStreamsRequest, nullptr, Off)
+ ADD_REQUEST(DeleteStream, DoDataStreamsDeleteStreamRequest, nullptr, Off)
+ ADD_REQUEST(ListShards, DoDataStreamsListShardsRequest, nullptr, Off)
+ ADD_REQUEST(PutRecord, DoDataStreamsPutRecordRequest, YdsProcessAttr, RuManual)
+ ADD_REQUEST(PutRecords, DoDataStreamsPutRecordsRequest, YdsProcessAttr, RuManual)
+ ADD_REQUEST(GetRecords, DoDataStreamsGetRecordsRequest, nullptr, RuManual)
+ ADD_REQUEST(GetShardIterator, DoDataStreamsGetShardIteratorRequest, nullptr, Off)
+ ADD_REQUEST(SubscribeToShard, DoDataStreamsSubscribeToShardRequest, nullptr, Off)
+ ADD_REQUEST(DescribeLimits, DoDataStreamsDescribeLimitsRequest, nullptr, Off)
+ ADD_REQUEST(DescribeStreamSummary, DoDataStreamsDescribeStreamSummaryRequest, nullptr, Off)
+ ADD_REQUEST(DecreaseStreamRetentionPeriod, DoDataStreamsDecreaseStreamRetentionPeriodRequest, nullptr, Off)
+ ADD_REQUEST(IncreaseStreamRetentionPeriod, DoDataStreamsIncreaseStreamRetentionPeriodRequest, nullptr, Off)
+ ADD_REQUEST(UpdateShardCount, DoDataStreamsUpdateShardCountRequest, nullptr, Off)
+ ADD_REQUEST(UpdateStreamMode, DoDataStreamsUpdateStreamModeRequest, nullptr, Off)
+ ADD_REQUEST(RegisterStreamConsumer, DoDataStreamsRegisterStreamConsumerRequest, nullptr, Off)
+ ADD_REQUEST(DeregisterStreamConsumer, DoDataStreamsDeregisterStreamConsumerRequest, nullptr, Off)
+ ADD_REQUEST(DescribeStreamConsumer, DoDataStreamsDescribeStreamConsumerRequest, nullptr, Off)
+ ADD_REQUEST(ListStreamConsumers, DoDataStreamsListStreamConsumersRequest, nullptr, Off)
+ ADD_REQUEST(AddTagsToStream, DoDataStreamsAddTagsToStreamRequest, nullptr, Off)
+ ADD_REQUEST(DisableEnhancedMonitoring, DoDataStreamsDisableEnhancedMonitoringRequest, nullptr, Off)
+ ADD_REQUEST(EnableEnhancedMonitoring, DoDataStreamsEnableEnhancedMonitoringRequest, nullptr, Off)
+ ADD_REQUEST(ListTagsForStream, DoDataStreamsListTagsForStreamRequest, nullptr, Off)
+ ADD_REQUEST(MergeShards, DoDataStreamsMergeShardsRequest, nullptr, Off)
+ ADD_REQUEST(RemoveTagsFromStream, DoDataStreamsRemoveTagsFromStreamRequest, nullptr, Off)
+ ADD_REQUEST(SplitShard, DoDataStreamsSplitShardRequest, nullptr, Off)
+ ADD_REQUEST(StartStreamEncryption, DoDataStreamsStartStreamEncryptionRequest, nullptr, Off)
+ ADD_REQUEST(StopStreamEncryption, DoDataStreamsStopStreamEncryptionRequest, nullptr, Off)
+ ADD_REQUEST(UpdateStream, DoDataStreamsUpdateStreamRequest, nullptr, Off)
+ ADD_REQUEST(SetWriteQuota, DoDataStreamsSetWriteQuotaRequest, nullptr, Off)
#undef ADD_REQUEST
}
diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h
index cc8e201b95..c8bb8ca970 100644
--- a/ydb/services/datastreams/put_records_actor.h
+++ b/ydb/services/datastreams/put_records_actor.h
@@ -8,6 +8,7 @@
#include <ydb/core/protos/msgbus_pq.pb.h>
#include <ydb/core/protos/grpc_pq_old.pb.h>
+#include <ydb/services/lib/actors/pq_rl_helpers.h>
#include <ydb/services/lib/actors/pq_schema_actor.h>
#include <ydb/services/lib/sharding/sharding.h>
@@ -212,7 +213,10 @@ namespace NKikimr::NDataStreams::V1 {
}
template<class TDerived, class TProto>
- class TPutRecordsActorBase : public NGRpcProxy::V1::TPQGrpcSchemaBase<TPutRecordsActorBase<TDerived, TProto>, TProto> {
+ class TPutRecordsActorBase
+ : public NGRpcProxy::V1::TPQGrpcSchemaBase<TPutRecordsActorBase<TDerived, TProto>, TProto>
+ , private NGRpcProxy::V1::TRlHelpers
+ {
using TBase = NGRpcProxy::V1::TPQGrpcSchemaBase<TPutRecordsActorBase<TDerived, TProto>, TProto>;
public:
@@ -224,7 +228,9 @@ namespace NKikimr::NDataStreams::V1 {
void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
protected:
+ void Write(const TActorContext& ctx);
void AddRecord(THashMap<ui32, TVector<TPutRecordsItem>>& items, ui32 totalShardsCount, int index);
+ ui64 GetPayloadSize() const;
private:
struct TPartitionTask {
@@ -232,6 +238,7 @@ namespace NKikimr::NDataStreams::V1 {
std::vector<ui32> RecordIndexes;
};
+ TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo;
THashMap<ui32, TPartitionTask> PartitionToActor;
Ydb::DataStreams::V1::PutRecordsResult PutRecordsResult;
@@ -247,6 +254,7 @@ namespace NKikimr::NDataStreams::V1 {
STFUNC(StateFunc) {
switch (ev->GetTypeRewrite()) {
HFunc(NDataStreams::V1::TEvDataStreams::TEvPartitionActorResult, Handle);
+ HFunc(TEvents::TEvWakeup, Handle);
default: TBase::StateWork(ev, ctx);
};
}
@@ -255,6 +263,7 @@ namespace NKikimr::NDataStreams::V1 {
template<class TDerived, class TProto>
TPutRecordsActorBase<TDerived, TProto>::TPutRecordsActorBase(NGRpcService::IRequestOpCtx* request)
: TBase(request, dynamic_cast<const typename TProto::TRequest*>(request->GetRequest())->stream_name())
+ , TRlHelpers(request, 4_KB, TDuration::Seconds(1))
, Ip(request->GetPeerName())
{
Y_ENSURE(request);
@@ -321,8 +330,21 @@ namespace NKikimr::NDataStreams::V1 {
}
}
+ PQGroupInfo = topicInfo->PQGroupInfo;
+ SetMeteringMode(PQGroupInfo->Description.GetPQTabletConfig().GetMeteringMode());
+
+ if (IsQuotaRequired()) {
+ const auto ru = 1 + CalcRuConsumption(GetPayloadSize());
+ Y_VERIFY(MaybeRequestQuota(ru, EWakeupTag::RlAllowed, ctx));
+ } else {
+ Write(ctx);
+ }
+ }
+
+ template<class TDerived, class TProto>
+ void TPutRecordsActorBase<TDerived, TProto>::Write(const TActorContext& ctx) {
THashMap<ui32, TVector<TPutRecordsItem>> items;
- const auto& pqDescription = topicInfo->PQGroupInfo->Description;
+ const auto& pqDescription = PQGroupInfo->Description;
ui32 totalShardsCount = pqDescription.GetPartitions().size();
for (int i = 0; i < static_cast<TDerived*>(this)->GetPutRecordsRequest().records_size(); ++i) {
PutRecordsResult.add_records();
@@ -380,6 +402,22 @@ namespace NKikimr::NDataStreams::V1 {
}
template<class TDerived, class TProto>
+ void TPutRecordsActorBase<TDerived, TProto>::Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) {
+ switch (static_cast<EWakeupTag>(ev->Get()->Tag)) {
+ case EWakeupTag::RlAllowed:
+ return Write(ctx);
+ case EWakeupTag::RlNoResource:
+ PutRecordsResult.set_failed_record_count(static_cast<TDerived*>(this)->GetPutRecordsRequest().records_size());
+ for (int i = 0; i < PutRecordsResult.failed_record_count(); ++i) {
+ PutRecordsResult.add_records()->set_error_code("ThrottlingException");
+ }
+ this->CheckFinish(ctx);
+ default:
+ return this->HandleWakeup(ev, ctx);
+ }
+ }
+
+ template<class TDerived, class TProto>
void TPutRecordsActorBase<TDerived, TProto>::AddRecord(THashMap<ui32, TVector<TPutRecordsItem>>& items, ui32 totalShardsCount, int index) {
const auto& record = static_cast<TDerived*>(this)->GetPutRecordsRequest().records(index);
ui32 shard = 0;
@@ -394,6 +432,19 @@ namespace NKikimr::NDataStreams::V1 {
PartitionToActor[shard].RecordIndexes.push_back(index);
}
+ template<class TDerived, class TProto>
+ ui64 TPutRecordsActorBase<TDerived, TProto>::GetPayloadSize() const {
+ ui64 result = 0;
+
+ for (const auto& record : static_cast<const TDerived*>(this)->GetPutRecordsRequest().records()) {
+ result += record.data().size()
+ + record.explicit_hash_key().size()
+ + record.partition_key().size();
+ }
+
+ return result;
+ }
+
class TPutRecordsActor : public TPutRecordsActorBase<TPutRecordsActor, NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest> {
public:
using TBase = TPutRecordsActorBase<TPutRecordsActor, NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest>;
@@ -451,7 +502,9 @@ namespace NKikimr::NDataStreams::V1 {
result.set_encryption_type(Ydb::DataStreams::V1::EncryptionType::NONE);
return ReplyWithResult(Ydb::StatusIds::SUCCESS, result, ctx);
} else {
- if (putRecordsResult.records(0).error_code() == "ProvisionedThroughputExceededException") {
+ if (putRecordsResult.records(0).error_code() == "ProvisionedThroughputExceededException"
+ || putRecordsResult.records(0).error_code() == "ThrottlingException")
+ {
return ReplyWithResult(Ydb::StatusIds::OVERLOADED, ctx);
}
//TODO: other codes - access denied and so on
diff --git a/ydb/services/lib/actors/CMakeLists.txt b/ydb/services/lib/actors/CMakeLists.txt
index 0682dcbf46..ff1baa06b0 100644
--- a/ydb/services/lib/actors/CMakeLists.txt
+++ b/ydb/services/lib/actors/CMakeLists.txt
@@ -14,7 +14,9 @@ target_link_libraries(services-lib-actors PUBLIC
cpp-grpc-server
cpp-digest-md5
ydb-core-grpc_services
+ core-grpc_services-base
ydb-core-mind
+ ydb-core-protos
library-persqueue-obfuscate
library-persqueue-topic_parser
api-grpc
@@ -24,4 +26,5 @@ target_link_libraries(services-lib-actors PUBLIC
)
target_sources(services-lib-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_schema_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_rl_helpers.cpp
)
diff --git a/ydb/services/lib/actors/pq_rl_helpers.cpp b/ydb/services/lib/actors/pq_rl_helpers.cpp
new file mode 100644
index 0000000000..2ee5df7238
--- /dev/null
+++ b/ydb/services/lib/actors/pq_rl_helpers.cpp
@@ -0,0 +1,78 @@
+#include "pq_rl_helpers.h"
+
+#include <ydb/core/grpc_services/base/base.h>
+
+namespace NKikimr::NGRpcProxy::V1 {
+
+TRlHelpers::TRlHelpers(NGRpcService::IRequestCtxBase* reqCtx, ui64 blockSize, const TDuration& waitDuration)
+ : Request(reqCtx)
+ , BlockSize(blockSize)
+ , WaitDuration(waitDuration)
+ , PayloadBytes(0)
+{
+ Y_VERIFY(Request);
+}
+
+bool TRlHelpers::IsQuotaRequired() const {
+ Y_VERIFY(MeteringMode.Defined());
+ return MeteringMode == NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS && Request->GetRlPath();
+}
+
+bool TRlHelpers::MaybeRequestQuota(ui64 amount, EWakeupTag tag, const TActorContext& ctx) {
+ if (RlActor) {
+ return false;
+ }
+
+ const auto selfId = ctx.SelfID;
+ const auto as = ctx.ActorSystem();
+
+ auto onSendAllowed = [selfId, as, tag]() {
+ as->Send(selfId, new TEvents::TEvWakeup(tag));
+ };
+
+ auto onSendTimeout = [selfId, as]() {
+ as->Send(selfId, new TEvents::TEvWakeup(EWakeupTag::RlNoResource));
+ };
+
+ RlActor = NRpcService::RateLimiterAcquireUseSameMailbox(
+ *Request, amount, WaitDuration,
+ std::move(onSendAllowed), std::move(onSendTimeout), ctx);
+ return true;
+}
+
+void TRlHelpers::OnWakeup(EWakeupTag tag) {
+ switch (tag) {
+ case EWakeupTag::RlInit:
+ case EWakeupTag::RlAllowed:
+ case EWakeupTag::RlNoResource:
+ RlActor = {};
+ break;
+ default:
+ break;
+ }
+}
+
+const TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode>& TRlHelpers::GetMeteringMode() const {
+ return MeteringMode;
+}
+
+void TRlHelpers::SetMeteringMode(NKikimrPQ::TPQTabletConfig::EMeteringMode mode) {
+ MeteringMode = mode;
+}
+
+ui64 TRlHelpers::CalcRuConsumption(ui64 payloadSize) {
+ if (!IsQuotaRequired()) {
+ return 0;
+ }
+
+ const ui64 remainder = BlockSize - (PayloadBytes % BlockSize);
+ PayloadBytes += payloadSize;
+
+ if (payloadSize > remainder) {
+ return Max<ui64>(1, (payloadSize - remainder) / BlockSize);
+ }
+
+ return 0;
+}
+
+}
diff --git a/ydb/services/lib/actors/pq_rl_helpers.h b/ydb/services/lib/actors/pq_rl_helpers.h
new file mode 100644
index 0000000000..02b1f43698
--- /dev/null
+++ b/ydb/services/lib/actors/pq_rl_helpers.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#include <ydb/core/grpc_services/local_rate_limiter.h>
+#include <ydb/core/protos/pqconfig.pb.h>
+
+#include <util/datetime/base.h>
+
+namespace NKikimr::NGRpcProxy::V1 {
+
+class TRlHelpers {
+public:
+ explicit TRlHelpers(NGRpcService::IRequestCtxBase* reqCtx, ui64 blockSize, const TDuration& waitDuration);
+
+protected:
+ enum EWakeupTag: ui64 {
+ RlInit = 0,
+ RlAllowed = 1,
+ RlNoResource = 2,
+ RecheckAcl = 3,
+ };
+
+ bool IsQuotaRequired() const;
+ bool MaybeRequestQuota(ui64 amount, EWakeupTag tag, const TActorContext& ctx);
+ void OnWakeup(EWakeupTag tag);
+
+ const TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode>& GetMeteringMode() const;
+ void SetMeteringMode(NKikimrPQ::TPQTabletConfig::EMeteringMode mode);
+
+ ui64 CalcRuConsumption(ui64 payloadSize);
+
+private:
+ NGRpcService::IRequestCtxBase* const Request;
+ const ui64 BlockSize;
+ const TDuration WaitDuration;
+
+ ui64 PayloadBytes;
+ TActorId RlActor;
+ TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode> MeteringMode;
+};
+
+}
diff --git a/ydb/services/lib/actors/type_definitions.h b/ydb/services/lib/actors/type_definitions.h
index a942b32f8d..89e0168a71 100644
--- a/ydb/services/lib/actors/type_definitions.h
+++ b/ydb/services/lib/actors/type_definitions.h
@@ -14,6 +14,7 @@ namespace NKikimr::NGRpcProxy {
TString CloudId;
TString DbId;
TString FolderId;
+ NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode;
NPersQueue::TDiscoveryConverterPtr DiscoveryConverter;
NPersQueue::TTopicConverterPtr FullConverter;
TMaybe<TString> CdcStreamPath;
@@ -34,6 +35,7 @@ namespace NKikimr::NGRpcProxy {
TString CloudId;
TString DbId;
TString FolderId;
+ NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode;
};
using TTopicInitInfoMap = THashMap<TString, TTopicInitInfo>;
diff --git a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
index 99bb8f1751..34e6522250 100644
--- a/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
@@ -97,6 +97,7 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse(
topicsIter->second.CloudId = pqDescr.GetPQTabletConfig().GetYcCloudId();
topicsIter->second.DbId = pqDescr.GetPQTabletConfig().GetYdbDatabaseId();
topicsIter->second.FolderId = pqDescr.GetPQTabletConfig().GetYcFolderId();
+ topicsIter->second.MeteringMode = pqDescr.GetPQTabletConfig().GetMeteringMode();
if (!topicsIter->second.DiscoveryConverter->IsValid()) {
TString errorReason = Sprintf("Internal server error with topic '%s', Marker# PQ503",
topicsIter->second.DiscoveryConverter->GetPrintableString().c_str());
@@ -261,7 +262,7 @@ void TReadInitAndAuthActor::FinishInitialization(const TActorContext& ctx) {
TTopicInitInfoMap res;
for (auto& [name, holder] : Topics) {
res.insert(std::make_pair(name, TTopicInitInfo{
- holder.FullConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.FolderId
+ holder.FullConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.FolderId, holder.MeteringMode
}));
}
ctx.Send(ParentId, new TEvPQProxy::TEvAuthResultOk(std::move(res)));
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.h b/ydb/services/persqueue_v1/actors/read_session_actor.h
index de20aee7bd..d9e048394d 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.h
@@ -1,9 +1,8 @@
#pragma once
#include "events.h"
-#include "persqueue_utils.h"
-
#include "partition_actor.h"
+#include "persqueue_utils.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h>
@@ -11,6 +10,7 @@
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/grpc_services/grpc_request_proxy.h>
#include <ydb/core/persqueue/events/global.h>
+#include <ydb/services/lib/actors/pq_rl_helpers.h>
#include <util/generic/guid.h>
#include <util/system/compiler.h>
@@ -90,6 +90,10 @@ struct TFormedReadResponse: public TSimpleRefCount<TFormedReadResponse<TServerMe
i64 ByteSize = 0;
ui64 RequestedBytes = 0;
+ bool HasMessages = false;
+ i64 ByteSizeBeforeFiltering = 0;
+ ui64 RequiredQuota = 0;
+
//returns byteSize diff
i64 ApplyResponse(TServerMessage&& resp);
@@ -108,7 +112,10 @@ struct TFormedReadResponse: public TSimpleRefCount<TFormedReadResponse<TServerMe
template<bool UseMigrationProtocol>
-class TReadSessionActor : public TActorBootstrapped<TReadSessionActor<UseMigrationProtocol>> {
+class TReadSessionActor
+ : public TActorBootstrapped<TReadSessionActor<UseMigrationProtocol>>
+ , private TRlHelpers
+{
using TClientMessage = typename std::conditional_t<UseMigrationProtocol, PersQueue::V1::MigrationStreamingReadClientMessage, Topic::StreamReadMessage::FromClient>;
using TServerMessage = typename std::conditional_t<UseMigrationProtocol, PersQueue::V1::MigrationStreamingReadServerMessage, Topic::StreamReadMessage::FromServer>;
@@ -132,6 +139,7 @@ private:
static constexpr ui32 MAX_INFLY_READS = 10;
static constexpr ui64 MAX_READ_SIZE = 100 << 20; //100mb;
+ static constexpr ui64 READ_BLOCK_SIZE = 8_KB; // metering
static constexpr double LAG_GROW_MULTIPLIER = 1.2; //assume that 20% more data arrived to partitions
@@ -152,7 +160,7 @@ public:
private:
STFUNC(StateFunc) {
switch (ev->GetTypeRewrite()) {
- CFunc(NActors::TEvents::TSystem::Wakeup, HandleWakeup)
+ HFunc(TEvents::TEvWakeup, Handle);
HFunc(IContext::TEvReadFinished, Handle);
HFunc(IContext::TEvWriteFinished, Handle);
@@ -194,6 +202,7 @@ private:
};
}
+ ui64 PrepareResponse(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse); // returns estimated response's size
bool WriteResponse(TServerMessage&& response, bool finish = false);
void Handle(typename IContext::TEvReadFinished::TPtr& ev, const TActorContext &ctx);
@@ -231,9 +240,11 @@ private:
[[nodiscard]] bool ProcessBalancerDead(const ui64 tabletId, const NActors::TActorContext& ctx); // returns false if actor died
void HandlePoison(TEvPQProxy::TEvDieCommand::TPtr& ev, const NActors::TActorContext& ctx);
- void HandleWakeup(const NActors::TActorContext& ctx);
+ void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQProxy::TEvAuthResultOk::TPtr& ev, const NActors::TActorContext& ctx);
+ void RecheckACL(const TActorContext& ctx);
+ void InitSession(const TActorContext& ctx);
void CloseSession(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode,
const NActors::TActorContext& ctx);
@@ -280,7 +291,6 @@ private:
TString PeerName;
bool CommitsDisabled;
- bool BalancersInitStarted;
bool InitDone;
bool RangesMode = false;
@@ -316,6 +326,8 @@ private:
THashMap<TActorId, typename TFormedReadResponse<TServerMessage>::TPtr> PartitionToReadResponse; // Partition actor -> TFormedReadResponse answer that has this partition.
// PartitionsTookPartInRead in formed read response contain this actor id.
+    typename TFormedReadResponse<TServerMessage>::TPtr PendingQuota; // response that is currently pending quota
+ std::deque<typename TFormedReadResponse<TServerMessage>::TPtr> WaitingQuota; // responses that will be quoted next
struct TControlMessages {
TVector<TServerMessage> ControlMessages;
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index 0e175427c2..a71056406a 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -36,7 +36,8 @@ TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(TEvStreamPQReadReques
TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
const TMaybe<TString> clientDC,
const NPersQueue::TTopicsListController& topicsHandler)
- : Request(request)
+ : TRlHelpers(request, READ_BLOCK_SIZE, TDuration::Minutes(1))
+ , Request(request)
, ClientDC(clientDC ? *clientDC : "other")
, StartTimestamp(TInstant::Now())
, SchemeCache(schemeCache)
@@ -46,7 +47,6 @@ TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(TEvStreamPQReadReques
, ClientPath()
, Session()
, CommitsDisabled(false)
- , BalancersInitStarted(false)
, InitDone(false)
, MaxReadMessagesCount(0)
, MaxReadSize(0)
@@ -899,26 +899,6 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk
SLIBigLatency.Inc();
}
- TServerMessage result;
- result.set_status(Ydb::StatusIds::SUCCESS);
-
- result.mutable_init_response()->set_session_id(Session);
- if (!WriteResponse(std::move(result))) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed");
- Die(ctx);
- return;
- }
-
- if (!Request->GetStreamCtx()->Read()) {
- LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
- Die(ctx);
- return;
- }
-
-
- Y_VERIFY(!BalancersInitStarted);
- BalancersInitStarted = true;
-
for (auto& [name, t] : ev->Get()->TopicAndTablets) { // ToDo - return something from Init and Auth Actor (Full Path - ?)
auto internalName = t.TopicNameConverter->GetInternalName();
auto topicGrIter = TopicGroups.find(name);
@@ -945,42 +925,79 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPQProxy::TEvAuthResultOk
topicHolder.CloudId = t.CloudId;
topicHolder.DbId = t.DbId;
topicHolder.FolderId = t.FolderId;
+ topicHolder.MeteringMode = t.MeteringMode;
FullPathToConverter[t.TopicNameConverter->GetPrimaryPath()] = t.TopicNameConverter;
FullPathToConverter[t.TopicNameConverter->GetSecondaryPath()] = t.TopicNameConverter;
- }
- for (auto& t : Topics) {
- NTabletPipe::TClientConfig clientConfig;
-
- clientConfig.CheckAliveness = false;
-
- clientConfig.RetryPolicy = RetryPolicyForPipes;
- t.second.PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second.TabletID, clientConfig));
-
- Y_VERIFY(t.second.FullConverter);
- auto it = TopicGroups.find(t.second.FullConverter->GetInternalName());
- if (it != TopicGroups.end()) {
- t.second.Groups = it->second;
+ if (!GetMeteringMode()) {
+ SetMeteringMode(t.MeteringMode);
+ } else if (*GetMeteringMode() != t.MeteringMode) {
+ return CloseSession("Cannot read from topics with different metering modes",
+ PersQueue::ErrorCode::BAD_REQUEST, ctx);
}
}
- RegisterSessions(ctx);
-
- ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup());
+ if (IsQuotaRequired()) {
+ Y_VERIFY(MaybeRequestQuota(1, EWakeupTag::RlInit, ctx));
+ } else {
+ InitSession(ctx);
+ }
} else {
for (auto& [name, t] : ev->Get()->TopicAndTablets) {
- if (Topics.find(t.TopicNameConverter->GetInternalName()) == Topics.end()) {
- CloseSession(
+ auto it = Topics.find(t.TopicNameConverter->GetInternalName());
+ if (it == Topics.end()) {
+ return CloseSession(
TStringBuilder() << "list of topics changed - new topic '"
<< t.TopicNameConverter->GetPrintableString() << "' found",
PersQueue::ErrorCode::BAD_REQUEST, ctx
);
- return;
+ } else if (it->second.MeteringMode != *GetMeteringMode()) {
+ return CloseSession(
+ TStringBuilder() << "Metering mode of topic: " << name << " has been changed",
+ PersQueue::ErrorCode::OVERLOAD, ctx
+ );
}
}
}
}
+template<bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::InitSession(const TActorContext& ctx) {
+ TServerMessage result;
+ result.set_status(Ydb::StatusIds::SUCCESS);
+
+ result.mutable_init_response()->set_session_id(Session);
+ if (!WriteResponse(std::move(result))) {
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc write failed");
+ Die(ctx);
+ return;
+ }
+
+ if (!Request->GetStreamCtx()->Read()) {
+ LOG_INFO_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " grpc read failed at start");
+ Die(ctx);
+ return;
+ }
+
+ for (auto& t : Topics) {
+ NTabletPipe::TClientConfig clientConfig;
+
+ clientConfig.CheckAliveness = false;
+
+ clientConfig.RetryPolicy = RetryPolicyForPipes;
+ t.second.PipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, t.second.TabletID, clientConfig));
+
+ Y_VERIFY(t.second.FullConverter);
+ auto it = TopicGroups.find(t.second.FullConverter->GetInternalName());
+ if (it != TopicGroups.end()) {
+ t.second.Groups = it->second;
+ }
+ }
+
+ RegisterSessions(ctx);
+
+ ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl));
+}
template<bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const TActorContext& ctx) {
@@ -1390,11 +1407,20 @@ void TReadSessionActor<UseMigrationProtocol>::ReleasePartition(const typename TH
DropPartition(it, ctx); //partition will be dropped
if (formedResponseToAnswer) {
- ProcessAnswer(ctx, formedResponseToAnswer); // returns false if actor died
+ if (const auto ru = CalcRuConsumption(PrepareResponse(formedResponseToAnswer))) {
+ formedResponseToAnswer->RequiredQuota = ru;
+ if (MaybeRequestQuota(ru, EWakeupTag::RlAllowed, ctx)) {
+ Y_VERIFY(!PendingQuota);
+ PendingQuota = formedResponseToAnswer;
+ } else {
+ WaitingQuota.push_back(formedResponseToAnswer);
+ }
+ } else {
+ ProcessAnswer(ctx, formedResponseToAnswer); // returns false if actor died
+ }
}
}
-
template<bool UseMigrationProtocol>
bool TReadSessionActor<UseMigrationProtocol>::ProcessBalancerDead(const ui64 tablet, const TActorContext& ctx) {
for (auto& t : Topics) {
@@ -1572,7 +1598,17 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadResponse::T
}
if (formedResponse->RequestsInfly == 0) {
- ProcessAnswer(ctx, formedResponse);
+ if (const auto ru = CalcRuConsumption(PrepareResponse(formedResponse))) {
+ formedResponse->RequiredQuota = ru;
+ if (MaybeRequestQuota(ru, EWakeupTag::RlAllowed, ctx)) {
+ Y_VERIFY(!PendingQuota);
+ PendingQuota = formedResponse;
+ } else {
+ WaitingQuota.push_back(formedResponse);
+ }
+ } else {
+ ProcessAnswer(ctx, formedResponse);
+ }
}
}
@@ -1589,6 +1625,19 @@ bool TReadSessionActor<UseMigrationProtocol>::WriteResponse(TServerMessage&& res
}
template<bool UseMigrationProtocol>
+ui64 TReadSessionActor<UseMigrationProtocol>::PrepareResponse(typename TFormedReadResponse<TServerMessage>::TPtr formedResponse) {
+ formedResponse->ByteSizeBeforeFiltering = formedResponse->Response.ByteSize();
+
+ if constexpr(UseMigrationProtocol) {
+ formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_data_batch());
+ } else {
+ formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_read_response());
+ }
+
+ return formedResponse->HasMessages ? formedResponse->Response.ByteSize() : 0;
+}
+
+template<bool UseMigrationProtocol>
void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext& ctx, typename TFormedReadResponse<TServerMessage>::TPtr formedResponse) {
ui32 readDurationMs = (ctx.Now() - formedResponse->Start - formedResponse->WaitQuotaTime).MilliSeconds();
if (formedResponse->FromDisk) {
@@ -1601,19 +1650,13 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext&
}
Y_VERIFY(formedResponse->RequestsInfly == 0);
- const ui64 diff = formedResponse->Response.ByteSize();
- bool hasMessages;
- if constexpr(UseMigrationProtocol) {
- hasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_data_batch());
- } else {
- hasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_read_response());
- }
- ui64 sizeEstimation = hasMessages ? formedResponse->Response.ByteSize() : 0;
+ const ui64 diff = formedResponse->ByteSizeBeforeFiltering;
+ ui64 sizeEstimation = formedResponse->HasMessages ? formedResponse->Response.ByteSize() : 0;
if constexpr (!UseMigrationProtocol) {
formedResponse->Response.mutable_read_response()->set_bytes_size(sizeEstimation);
}
- if (hasMessages) {
+ if (formedResponse->HasMessages) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " response to read " << formedResponse->Guid);
if (!WriteResponse(std::move(formedResponse->Response))) {
@@ -1661,7 +1704,7 @@ void TReadSessionActor<UseMigrationProtocol>::ProcessAnswer(const TActorContext&
AvailablePartitions.insert(formedResponse->PartitionsBecameAvailable.begin(), formedResponse->PartitionsBecameAvailable.end());
if constexpr (UseMigrationProtocol) {
- if (!hasMessages) {
+ if (!formedResponse->HasMessages) {
// process new read
// Start new reading request with the same guid
Reads.emplace_back(new TEvPQProxy::TEvRead(formedResponse->Guid));
@@ -1836,8 +1879,43 @@ void TReadSessionActor<UseMigrationProtocol>::HandlePoison(TEvPQProxy::TEvDieCom
template<bool UseMigrationProtocol>
-void TReadSessionActor<UseMigrationProtocol>::HandleWakeup(const TActorContext& ctx) {
- ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup());
+void TReadSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) {
+ const auto tag = static_cast<EWakeupTag>(ev->Get()->Tag);
+ OnWakeup(tag);
+
+ switch (tag) {
+ case EWakeupTag::RlInit:
+ return InitSession(ctx);
+
+ case EWakeupTag::RecheckAcl:
+ return RecheckACL(ctx);
+
+ case EWakeupTag::RlAllowed:
+ ProcessAnswer(ctx, PendingQuota);
+ if (!WaitingQuota.empty()) {
+ PendingQuota = WaitingQuota.front();
+ WaitingQuota.pop_front();
+ } else {
+ PendingQuota = nullptr;
+ }
+ if (PendingQuota) {
+ Y_VERIFY(MaybeRequestQuota(PendingQuota->RequiredQuota, EWakeupTag::RlAllowed, ctx));
+ }
+ break;
+
+ case EWakeupTag::RlNoResource:
+ if (PendingQuota) {
+ Y_VERIFY(MaybeRequestQuota(PendingQuota->RequiredQuota, EWakeupTag::RlAllowed, ctx));
+ } else {
+ return CloseSession("Throughput limit exceeded", PersQueue::ErrorCode::OVERLOAD, ctx);
+ }
+ break;
+ }
+}
+
+template<bool UseMigrationProtocol>
+void TReadSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& ctx) {
+ ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl));
if (Token && !AuthInitActor && (ForceACLCheck || (ctx.Now() - LastACLCheckTimestamp > TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()) && RequestNotChecked))) {
ForceACLCheck = false;
RequestNotChecked = false;
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h
index 2952da3970..99b026acee 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.h
@@ -9,12 +9,11 @@
#include <ydb/core/client/server/msgbus_server_pq_metacache.h>
#include <ydb/core/grpc_services/grpc_request_proxy.h>
#include <ydb/core/kqp/kqp.h>
-
#include <ydb/core/persqueue/events/global.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
-
#include <ydb/core/protos/grpc_pq_old.pb.h>
+#include <ydb/services/lib/actors/pq_rl_helpers.h>
namespace NKikimr::NGRpcProxy::V1 {
@@ -24,7 +23,10 @@ inline TActorId GetPQWriteServiceActorID() {
}
template<bool UseMigrationProtocol>
-class TWriteSessionActor : public NActors::TActorBootstrapped<TWriteSessionActor<UseMigrationProtocol>> {
+class TWriteSessionActor
+ : public NActors::TActorBootstrapped<TWriteSessionActor<UseMigrationProtocol>>
+ , private TRlHelpers
+{
using TSelf = TWriteSessionActor<UseMigrationProtocol>;
using TClientMessage = std::conditional_t<UseMigrationProtocol, PersQueue::V1::StreamingWriteClientMessage,
Topic::StreamWriteMessage::FromClient>;
@@ -49,6 +51,33 @@ class TWriteSessionActor : public NActors::TActorBootstrapped<TWriteSessionActor
using TEvDescribeTopicsResponse = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse;
using TEvDescribeTopicsRequest = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsRequest;
+ struct TWriteRequestInfo: public TSimpleRefCount<TWriteRequestInfo> {
+ using TPtr = TIntrusivePtr<TWriteRequestInfo>;
+
+ explicit TWriteRequestInfo(ui64 cookie)
+ : PartitionWriteRequest(new NPQ::TEvPartitionWriter::TEvWriteRequest(cookie))
+ , Cookie(cookie)
+ , ByteSize(0)
+ , RequiredQuota(0)
+ {
+ }
+
+ // Source requests from user (grpc session object)
+ std::deque<THolder<TEvWrite>> UserWriteRequests;
+
+ // Partition write request
+ THolder<NPQ::TEvPartitionWriter::TEvWriteRequest> PartitionWriteRequest;
+
+ // Formed write request's cookie
+ ui64 Cookie;
+
+ // Formed write request's size
+ ui64 ByteSize;
+
+ // Quota in term of RUs
+ ui64 RequiredQuota;
+ };
+
// Codec ID size in bytes
static constexpr ui32 CODEC_ID_SIZE = 1;
@@ -67,7 +96,7 @@ public:
private:
STFUNC(StateFunc) {
switch (ev->GetTypeRewrite()) {
- CFunc(NActors::TEvents::TSystem::Wakeup, HandleWakeup);
+ HFunc(TEvents::TEvWakeup, Handle);
HFunc(IContext::TEvReadFinished, Handle);
HFunc(IContext::TEvWriteFinished, Handle);
@@ -113,6 +142,7 @@ private:
void TryCloseSession(const TActorContext& ctx);
void CheckACL(const TActorContext& ctx);
+ void RecheckACL(const TActorContext& ctx);
// Requests fresh ACL from 'SchemeCache'
void InitCheckSchema(const TActorContext& ctx, bool needWaitSchema = false);
void Handle(typename TEvWriteInit::TPtr& ev, const NActors::TActorContext& ctx);
@@ -144,13 +174,14 @@ private:
void Handle(TEvPersQueue::TEvGetPartitionIdForWriteResponse::TPtr& ev, const NActors::TActorContext& ctx);
void HandlePoison(TEvPQProxy::TEvDieCommand::TPtr& ev, const NActors::TActorContext& ctx);
- void HandleWakeup(const NActors::TActorContext& ctx);
+ void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx);
void CloseSession(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode, const NActors::TActorContext& ctx);
void CheckFinish(const NActors::TActorContext& ctx);
- void GenerateNextWriteRequest(const NActors::TActorContext& ctx);
+ void PrepareRequest(THolder<TEvWrite>&& ev, const TActorContext& ctx);
+ void SendRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx);
void SetupCounters();
void SetupCounters(const TString& cloudId, const TString& dbId, const TString& folderId);
@@ -190,31 +221,18 @@ private:
TString OwnerCookie;
TString UserAgent;
- ui32 NumReserveBytesRequests;
-
THolder<TAclWrapper> ACL;
- struct TWriteRequestBatchInfo: public TSimpleRefCount<TWriteRequestBatchInfo> {
- using TPtr = TIntrusivePtr<TWriteRequestBatchInfo>;
-
- // Source requests from user (grpc session object)
- std::deque<THolder<TEvWrite>> UserWriteRequests;
-
- // Formed write request's size
- ui64 ByteSize = 0;
-
- // Formed write request's cookie
- ui64 Cookie = 0;
- };
-
- // Nonprocessed source client requests
- std::deque<THolder<TEvWrite>> Writes;
-
- // Formed, but not sent, batch requests to partition actor
- std::deque<typename TWriteRequestBatchInfo::TPtr> FormedWrites;
-
- // Requests that is already sent to partition actor
- std::deque<typename TWriteRequestBatchInfo::TPtr> SentMessages;
+ // Future batch request to partition actor
+ typename TWriteRequestInfo::TPtr PendingRequest;
+ // Request that is waiting for quota
+ typename TWriteRequestInfo::TPtr PendingQuotaRequest;
+ // Quoted, but not sent requests
+ std::deque<typename TWriteRequestInfo::TPtr> QuotedRequests;
+    // Requests that are sent to the partition actor, but not yet accepted
+ std::deque<typename TWriteRequestInfo::TPtr> SentRequests;
+ // Accepted requests
+ std::deque<typename TWriteRequestInfo::TPtr> AcceptedRequests;
bool WritesDone;
@@ -230,14 +248,15 @@ private:
ui64 BytesInflightTotal_;
bool NextRequestInited;
+ ui64 NextRequestCookie;
+
+ ui64 PayloadBytes;
NKikimr::NPQ::TMultiCounter SessionsCreated;
NKikimr::NPQ::TMultiCounter SessionsActive;
NKikimr::NPQ::TMultiCounter Errors;
- ui64 NextRequestCookie;
-
TIntrusivePtr<NACLib::TUserToken> Token;
TString Auth;
// Got 'update_token_request', authentication or authorization in progress,
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 36d2613676..43af563e85 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -155,6 +155,9 @@ static const ui32 MAX_RESERVE_REQUESTS_INFLIGHT = 5;
static const ui32 MAX_BYTES_INFLIGHT = 1_MB;
static const TDuration SOURCEID_UPDATE_PERIOD = TDuration::Hours(1);
+// metering
+static const ui64 WRITE_BLOCK_SIZE = 4_KB;
+
//TODO: add here tracking of bytes in/out
@@ -165,7 +168,8 @@ TWriteSessionActor<UseMigrationProtocol>::TWriteSessionActor(
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const TMaybe<TString> clientDC,
const NPersQueue::TTopicsListController& topicsController
)
- : Request(request)
+ : TRlHelpers(request, WRITE_BLOCK_SIZE, TDuration::Minutes(1))
+ , Request(request)
, State(ES_CREATED)
, SchemeCache(schemeCache)
, PeerName("")
@@ -173,7 +177,6 @@ TWriteSessionActor<UseMigrationProtocol>::TWriteSessionActor(
, TopicsController(topicsController)
, Partition(0)
, PreferedPartition(Max<ui32>())
- , NumReserveBytesRequests(0)
, WritesDone(false)
, Counters(counters)
, BytesInflight_(0)
@@ -320,7 +323,7 @@ void TWriteSessionActor<UseMigrationProtocol>::CheckFinish(const TActorContext&
CloseSession("out of order Writes done before initialization", PersQueue::ErrorCode::BAD_REQUEST, ctx);
return;
}
- if (Writes.empty() && FormedWrites.empty() && SentMessages.empty()) {
+    if (!PendingRequest && !PendingQuotaRequest && QuotedRequests.empty() && SentRequests.empty() && AcceptedRequests.empty()) {
CloseSession("", PersQueue::ErrorCode::OK, ctx);
return;
}
@@ -545,7 +548,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvDescribeTopicsResponse:
return;
}
Y_VERIFY(entry.PQGroupInfo); // checked at ProcessMetaCacheTopicResponse()
- auto& description = entry.PQGroupInfo->Description;
+ const auto& description = entry.PQGroupInfo->Description;
Y_VERIFY(description.PartitionsSize() > 0);
Y_VERIFY(description.HasPQTabletConfig());
InitialPQTabletConfig = description.GetPQTabletConfig();
@@ -577,6 +580,13 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvDescribeTopicsResponse:
ACL.Reset(new TAclWrapper(entry.SecurityObject));
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " describe result for acl check");
+ const auto meteringMode = description.GetPQTabletConfig().GetMeteringMode();
+ if (meteringMode != GetMeteringMode().GetOrElse(meteringMode)) {
+ return CloseSession("Metering mode has been changed", PersQueue::ErrorCode::OVERLOAD, ctx);
+ }
+
+ SetMeteringMode(meteringMode);
+
if (Request->GetInternalToken().empty()) { // session without auth
if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
Request->ReplyUnauthenticated("Unauthenticated access is forbidden, please provide credentials");
@@ -589,9 +599,13 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvDescribeTopicsResponse:
} else {
Y_VERIFY(Request->GetYdbToken());
Auth = *Request->GetYdbToken();
-
Token = new NACLib::TUserToken(Request->GetInternalToken());
- CheckACL(ctx);
+
+ if (FirstACLCheck && IsQuotaRequired()) {
+ Y_VERIFY(MaybeRequestQuota(1, EWakeupTag::RlInit, ctx));
+ } else {
+ CheckACL(ctx);
+ }
}
}
@@ -966,7 +980,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
State = ES_INITED;
- ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup());
+ ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl));
//init completed; wait for first data chunk
NextRequestInited = true;
@@ -983,18 +997,18 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
return CloseSession("got write permission but not wait for it", PersQueue::ErrorCode::ERROR, ctx);
}
- Y_VERIFY(!FormedWrites.empty());
- typename TWriteRequestBatchInfo::TPtr writeRequest = std::move(FormedWrites.front());
+ Y_VERIFY(!SentRequests.empty());
+ auto writeRequest = std::move(SentRequests.front());
if (ev->Get()->Cookie != writeRequest->Cookie) {
return CloseSession("out of order reserve bytes response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx);
}
- FormedWrites.pop_front();
+ SentRequests.pop_front();
ui64 diff = writeRequest->ByteSize;
- SentMessages.emplace_back(std::move(writeRequest));
+ AcceptedRequests.emplace_back(std::move(writeRequest));
BytesInflight_ -= diff;
BytesInflight.Dec(diff);
@@ -1008,9 +1022,12 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
}
}
- --NumReserveBytesRequests;
- if (!Writes.empty())
- GenerateNextWriteRequest(ctx);
+ if (!IsQuotaRequired() && PendingRequest) {
+ SendRequest(std::move(PendingRequest), ctx);
+ } else if (!QuotedRequests.empty()) {
+ SendRequest(std::move(QuotedRequests.front()), ctx);
+ QuotedRequests.pop_front();
+ }
}
template<bool UseMigrationProtocol>
@@ -1031,13 +1048,13 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
const auto& resp = result.Record.GetPartitionResponse();
- if (SentMessages.empty()) {
+ if (AcceptedRequests.empty()) {
CloseSession("got too many replies from server, internal error", PersQueue::ErrorCode::ERROR, ctx);
return;
}
- typename TWriteRequestBatchInfo::TPtr writeRequest = std::move(SentMessages.front());
- SentMessages.pop_front();
+ auto writeRequest = std::move(AcceptedRequests.front());
+ AcceptedRequests.pop_front();
if (resp.GetCookie() != writeRequest->Cookie) {
return CloseSession("out of order write response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx);
@@ -1176,14 +1193,13 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvTabletPipe::TEvClientDe
}
template<bool UseMigrationProtocol>
-void TWriteSessionActor<UseMigrationProtocol>::GenerateNextWriteRequest(const TActorContext& ctx) {
- typename TWriteRequestBatchInfo::TPtr writeRequest = new TWriteRequestBatchInfo();
-
- auto ev = MakeHolder<NPQ::TEvPartitionWriter::TEvWriteRequest>(++NextRequestCookie);
- NKikimrClient::TPersQueueRequest& request = ev->Record;
-
- writeRequest->UserWriteRequests = std::move(Writes);
- Writes.clear();
+void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&& ev, const TActorContext& ctx) {
+ if (!PendingRequest) {
+ PendingRequest = new TWriteRequestInfo(++NextRequestCookie);
+ }
+
+ auto& request = PendingRequest->PartitionWriteRequest->Record;
+ ui64 payloadSize = 0;
auto addDataMigration = [&](const StreamingWriteClientMessage::WriteRequest& writeRequest, const i32 messageIndex) {
auto w = request.MutablePartitionRequest()->AddCmdWrite();
@@ -1193,6 +1209,7 @@ void TWriteSessionActor<UseMigrationProtocol>::GenerateNextWriteRequest(const TA
w->SetCreateTimeMS(writeRequest.created_at_ms(messageIndex));
w->SetUncompressedSize(writeRequest.blocks_uncompressed_sizes(messageIndex));
w->SetClientDC(ClientDC);
+ payloadSize += w->GetData().size() + w->GetSourceId().size();
};
auto addData = [&](const Topic::StreamWriteMessage::WriteRequest& writeRequest, const i32 messageIndex) {
@@ -1203,39 +1220,54 @@ void TWriteSessionActor<UseMigrationProtocol>::GenerateNextWriteRequest(const TA
w->SetCreateTimeMS(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(writeRequest.messages(messageIndex).created_at()));
w->SetUncompressedSize(writeRequest.messages(messageIndex).uncompressed_size());
w->SetClientDC(ClientDC);
+ payloadSize += w->GetData().size() + w->GetSourceId().size();
};
- i64 diff = 0;
+ const auto& writeRequest = ev->Request.write_request();
+ if constexpr (UseMigrationProtocol) {
+ for (i32 messageIndex = 0; messageIndex != writeRequest.sequence_numbers_size(); ++messageIndex) {
+ addDataMigration(writeRequest, messageIndex);
+ }
+ } else {
+ for (i32 messageIndex = 0; messageIndex != writeRequest.messages_size(); ++messageIndex) {
+ addData(writeRequest, messageIndex);
+ }
+ }
- for (const auto& write : writeRequest->UserWriteRequests) {
- diff -= write->Request.ByteSize();
- const auto& writeRequest = write->Request.write_request();
+ PendingRequest->UserWriteRequests.push_back(std::move(ev));
+ PendingRequest->ByteSize = request.ByteSize();
- if constexpr (UseMigrationProtocol) {
- for (i32 messageIndex = 0; messageIndex != writeRequest.sequence_numbers_size(); ++messageIndex) {
- addDataMigration(writeRequest, messageIndex);
- }
- } else {
- for (i32 messageIndex = 0; messageIndex != writeRequest.messages_size(); ++messageIndex) {
- addData(writeRequest, messageIndex);
- }
+ if (const auto ru = CalcRuConsumption(payloadSize)) {
+ PendingRequest->RequiredQuota += ru;
+ if (MaybeRequestQuota(PendingRequest->RequiredQuota, EWakeupTag::RlAllowed, ctx)) {
+ Y_VERIFY(!PendingQuotaRequest);
+ PendingQuotaRequest = std::move(PendingRequest);
+ }
+ } else {
+ if (SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) {
+ SendRequest(std::move(PendingRequest), ctx);
}
}
+}
- writeRequest->Cookie = request.GetPartitionRequest().GetCookie();
+template<bool UseMigrationProtocol>
+void TWriteSessionActor<UseMigrationProtocol>::SendRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx) {
+ Y_VERIFY(request->PartitionWriteRequest);
+
+ i64 diff = 0;
+ for (const auto& w : request->UserWriteRequests) {
+ diff -= w->Request.ByteSize();
+ }
Y_VERIFY(-diff <= (i64)BytesInflight_);
- diff += request.ByteSize();
+ diff += request->PartitionWriteRequest->Record.ByteSize();
BytesInflight_ += diff;
BytesInflightTotal_ += diff;
BytesInflight.Inc(diff);
BytesInflightTotal.Inc(diff);
- writeRequest->ByteSize = request.ByteSize();
- FormedWrites.push_back(writeRequest);
-
- ctx.Send(Writer, std::move(ev));
- ++NumReserveBytesRequests;
+ ctx.Send(Writer, std::move(request->PartitionWriteRequest));
+ SentRequests.push_back(std::move(request));
}
template<bool UseMigrationProtocol>
@@ -1410,10 +1442,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWrite::TPtr& e
}
- THolder<TEvWrite> event(ev->Release());
- Writes.push_back(std::move(event));
-
- ui64 diff = Writes.back()->Request.ByteSize();
+ ui64 diff = ev->Get()->Request.ByteSize();
BytesInflight_ += diff;
BytesInflightTotal_ += diff;
BytesInflight.Inc(diff);
@@ -1431,9 +1460,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWrite::TPtr& e
NextRequestInited = false;
}
- if (NumReserveBytesRequests < MAX_RESERVE_REQUESTS_INFLIGHT) {
- GenerateNextWriteRequest(ctx);
- }
+ PrepareRequest(THolder<TEvWrite>(ev->Release()), ctx);
}
template<bool UseMigrationProtocol>
@@ -1462,9 +1489,43 @@ void TWriteSessionActor<UseMigrationProtocol>::LogSession(const TActorContext& c
}
template<bool UseMigrationProtocol>
-void TWriteSessionActor<UseMigrationProtocol>::HandleWakeup(const TActorContext& ctx) {
+void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) {
+ const auto tag = static_cast<EWakeupTag>(ev->Get()->Tag);
+ OnWakeup(tag);
+
+ switch (tag) {
+ case EWakeupTag::RlInit:
+ return CheckACL(ctx);
+
+ case EWakeupTag::RecheckAcl:
+ return RecheckACL(ctx);
+
+ case EWakeupTag::RlAllowed:
+ if (SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) {
+ SendRequest(std::move(PendingQuotaRequest), ctx);
+ } else {
+ QuotedRequests.push_back(std::move(PendingQuotaRequest));
+ }
+
+ if (PendingQuotaRequest = std::move(PendingRequest)) {
+ Y_VERIFY(MaybeRequestQuota(PendingQuotaRequest->RequiredQuota, EWakeupTag::RlAllowed, ctx));
+ }
+ break;
+
+ case EWakeupTag::RlNoResource:
+ if (PendingQuotaRequest) {
+ Y_VERIFY(MaybeRequestQuota(PendingQuotaRequest->RequiredQuota, EWakeupTag::RlAllowed, ctx));
+ } else {
+ return CloseSession("Throughput limit exceeded", PersQueue::ErrorCode::OVERLOAD, ctx);
+ }
+ break;
+ }
+}
+
+template<bool UseMigrationProtocol>
+void TWriteSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& ctx) {
Y_VERIFY(State == ES_INITED);
- ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup());
+ ctx.Schedule(CHECK_ACL_DELAY, new TEvents::TEvWakeup(EWakeupTag::RecheckAcl));
if (Token && !ACLCheckInProgress && RequestNotChecked && (ctx.Now() - LastACLCheckTimestamp > TDuration::Seconds(AppData(ctx)->PQConfig.GetACLRetryTimeoutSec()))) {
RequestNotChecked = false;
InitCheckSchema(ctx);