diff options
author | tesseract <tesseract@yandex-team.com> | 2023-08-29 15:09:38 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-08-29 18:44:48 +0300 |
commit | ccd68bc7ecf30f86692670b1c303d02f8758f600 (patch) | |
tree | 2293f328608eb5b2d37a49b65704d2ddd79f0821 | |
parent | b906b72cdf260008a312be3b4717a4593dc26786 (diff) | |
download | ydb-ccd68bc7ecf30f86692670b1c303d02f8758f600.tar.gz |
Count RequestUnits for Kafka protocol
35 files changed, 714 insertions, 203 deletions
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index e10d52aaf60..25a270d5666 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -480,12 +480,7 @@ void TGRpcRequestProxyImpl::ForgetDatabase(const TString& database) { void TGRpcRequestProxyImpl::SubscribeToDatabase(const TString& database) { LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "Subscribe to " << database); - Y_VERIFY(!AppData()->DomainsInfo->Domains.empty()); - auto& domain = AppData()->DomainsInfo->Domains.begin()->second; - ui64 domainOwnerId = domain->SchemeRoot; - ui32 schemeBoardGroup = domain->DefaultSchemeBoardGroup; - THolder<IActor> subscriber{CreateSchemeBoardSubscriber(SelfId(), database, schemeBoardGroup, domainOwnerId)}; - TActorId subscriberId = Register(subscriber.Release()); + TActorId subscriberId = Register(CreateSchemeBoardSubscriber(SelfId(), database)); auto itSubscriber = Subscribers.emplace(database, subscriberId); if (!itSubscriber.second) { Send(itSubscriber.first->second, new TEvents::TEvPoisonPill()); diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h index 1b98abe4d9f..526328bddb9 100644 --- a/ydb/core/kafka_proxy/actors/actors.h +++ b/ydb/core/kafka_proxy/actors/actors.h @@ -1,5 +1,6 @@ #pragma once +#include <ydb/core/persqueue/pq_rl_helpers.h> #include <ydb/core/protos/config.pb.h> #include <ydb/library/aclib/aclib.h> @@ -33,6 +34,8 @@ struct TContext { TIntrusiveConstPtr<NACLib::TUserToken> UserToken; TString ClientDC; + NKikimr::NPQ::TRlContext RlContext; + bool Authenticated() { return AuthenticationStep == SUCCESS; } }; diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index 908446b0326..11b3cd582f6 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -19,6 +19,13 @@ NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context) { return new TKafkaProduceActor(context); } +TString NormalizePath(const TString& database, const TString& topic) { + if (topic.Size() > database.Size() && topic.at(database.Size()) == '/' && topic.StartsWith(database)) { + return topic; + } + return CanonizePath(database + "/" + topic); +} + TString TKafkaProduceActor::LogPrefix() { TStringBuilder sb; sb << "TKafkaProduceActor " << SelfId() << " State: "; @@ -115,12 +122,14 @@ void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResu auto* navigate = ev.Get()->Get()->Request.Get(); for (auto& info : navigate->ResultSet) { if (NSchemeCache::TSchemeCacheNavigate::EStatus::Ok == info.Status) { - auto topicPath = "/" + NKikimr::JoinPath(info.Path); + auto topicPath = CanonizePath(NKikimr::JoinPath(info.Path)); KAFKA_LOG_D("Received topic '" << topicPath << "' description"); TopicsForInitialization.erase(topicPath); auto& topic = Topics[topicPath]; + topic.MeteringMode = info.PQGroupInfo->Description.GetPQTabletConfig().GetMeteringMode(); + if (info.SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, *Context->UserToken)) { topic.Status = OK; topic.ExpirationTime = now + TOPIC_OK_EXPIRATION_INTERVAL; @@ -222,7 +231,7 @@ size_t TKafkaProduceActor::EnqueueInitialization() { for(const auto& e : Requests) { auto* r = e->Get()->Request; for(const auto& topicData : r->TopicData) { - const auto& topicPath = *topicData.Name; + const auto& topicPath = NormalizePath(Context->Database, *topicData.Name); if (!Topics.contains(topicPath)) { requireInitialization = true; TopicsForInitialization.insert(topicPath); @@ -315,7 +324,7 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest& pendingRequest, const T size_t position = 0; for(const auto& topicData : r->TopicData) { - const TString& topicPath = *topicData.Name; + const TString& topicPath = NormalizePath(Context->Database, *topicData.Name); for(const auto& partitionData : topicData.PartitionData) { const auto partitionId = partitionData.Index; @@ -470,11 +479,13 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) { partitionResponse.Index = partitionData.Index; if (EKafkaErrors::NONE_ERROR != result.ErrorCode) { + KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(result.ErrorCode) << ", ErrorMessage=" << result.ErrorMessage); partitionResponse.ErrorCode = result.ErrorCode; partitionResponse.ErrorMessage = result.ErrorMessage; } else { auto* msg = result.Value->Get(); if (msg->IsSuccess()) { + KAFKA_LOG_T("Partition result success."); partitionResponse.ErrorCode = EKafkaErrors::NONE_ERROR; auto& writeResults = msg->Record.GetPartitionResponse().GetCmdWriteResult(); @@ -484,6 +495,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) { partitionResponse.BaseOffset = lastResult.GetSeqNo(); } } else { + KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(Convert(msg->GetError().Code)) << ", ErrorMessage=" << msg->GetError().Reason); partitionResponse.ErrorCode = Convert(msg->GetError().Code); partitionResponse.ErrorMessage = msg->GetError().Reason; } @@ -560,7 +572,10 @@ std::pair<TKafkaProduceActor::ETopicStatus, TActorId> TKafkaProduceActor::Partit } auto tabletId = pit->second; - auto* writerActor = CreatePartitionWriter(SelfId(), tabletId, partitionId, {}, SourceId, TPartitionWriterOpts().WithDeduplication(false)); + TPartitionWriterOpts opts; + opts.WithDeduplication(false) + .WithCheckRequestUnits(topicInfo.MeteringMode, Context->RlContext); + auto* writerActor = CreatePartitionWriter(SelfId(), topicPath, tabletId, partitionId, {/*expectedGeneration*/}, SourceId, opts); auto& writerInfo = partitionWriters[partitionId]; writerInfo.ActorId = ctx.RegisterWithSameMailbox(writerActor); diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h index e6ee6990009..b37c6890963 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h @@ -177,6 +177,8 @@ private: ETopicStatus Status = OK; TInstant ExpirationTime; + NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode; + // partitioId -> tabletId std::unordered_map<ui32, ui64> partitions; }; diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp index 0ef26fed5f9..e0c3bab0ce8 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp @@ -1,7 +1,9 @@ #include <ydb/core/grpc_services/local_rpc/local_rpc.h> #include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h> +#include <ydb/core/base/path.h> #include <ydb/core/base/ticket_parser.h> #include <ydb/core/kafka_proxy/kafka_events.h> +#include <ydb/core/tx/scheme_board/subscriber.h> #include <library/cpp/actors/core/actor.h> #include "kafka_sasl_auth_actor.h" @@ -33,13 +35,21 @@ void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) { StartPlainAuth(ctx); } +void TKafkaSaslAuthActor::PassAway() { + if (SubscriberId) { + Send(SubscriberId, new TEvents::TEvPoison()); + } + TActorBootstrapped::PassAway(); +} + void TKafkaSaslAuthActor::StartPlainAuth(const NActors::TActorContext& ctx) { TAuthData authData; if (!TryParseAuthDataTo(authData, ctx)) { return; } - Database = authData.Database; + Database = CanonizePath(authData.Database); SendLoginRequest(authData, ctx); + SendDescribeRequest(ctx); } void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult::TPtr& ev, const NActors::TActorContext& ctx) { @@ -48,17 +58,11 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketRes return; } - auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>(); - responseToClient->ErrorCode = EKafkaErrors::NONE_ERROR; - responseToClient->ErrorMessage = ""; - responseToClient->AuthBytes = TKafkaRawBytes(ERROR_AUTH_BYTES, sizeof(ERROR_AUTH_BYTES)); + Authentificated = true; - auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient); + Token = ev->Get()->Token; - auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, ev->Get()->Token, Database); - Send(Context->ConnectionId, authResult); - - Die(ctx); + ReplyIfReady(ctx); } void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NActors::TActorContext& /*ctx*/) { @@ -69,6 +73,31 @@ void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NAct })); } +void TKafkaSaslAuthActor::ReplyIfReady(const NActors::TActorContext& ctx) { + if (!Authentificated || !Described) { + return; + } + + KAFKA_LOG_D("Authentificated success. Database='" << Database << "', " + << "FolderId='" << FolderId << "', " + << "ServiceAccountId='" << ServiceAccountId << "', " + << "DatabaseId='" << DatabaseId << "', " + << "Coordinator='" << Coordinator << "', " + << "ResourcePath='" << ResourcePath << "'"); + + auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>(); + responseToClient->ErrorCode = EKafkaErrors::NONE_ERROR; + responseToClient->ErrorMessage = ""; + responseToClient->AuthBytes = TKafkaRawBytes(ERROR_AUTH_BYTES, sizeof(ERROR_AUTH_BYTES)); + + auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient); + + auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, Token, Database, FolderId, ServiceAccountId, DatabaseId, Coordinator, ResourcePath); + Send(Context->ConnectionId, authResult); + + Die(ctx); +} + void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvAuthFailed::TPtr& ev, const NActors::TActorContext& ctx) { SendAuthFailedAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", ev->Get()->ErrorMessage, ctx); } @@ -111,7 +140,7 @@ void TKafkaSaslAuthActor::SendAuthFailedAndDie(EKafkaErrors errorCode, const TSt responseToClient->AuthBytes = TKafkaRawBytes(ERROR_AUTH_BYTES, sizeof(ERROR_AUTH_BYTES)); auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient); - auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::FAILED, evResponse, nullptr, "", errorMessage); + auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::FAILED, evResponse, errorMessage); Send(Context->ConnectionId, authResult); Die(ctx); @@ -125,7 +154,6 @@ void TKafkaSaslAuthActor::SendLoginRequest(TKafkaSaslAuthActor::TAuthData authDa using TRpcEv = NKikimr::NGRpcService::TGRpcRequestWrapperNoAuth<NKikimr::NGRpcService::TRpcServices::EvLogin, Ydb::Auth::LoginRequest, Ydb::Auth::LoginResponse>; auto rpcFuture = NKikimr::NRpcService::DoLocalRpc<TRpcEv>(std::move(request), authData.Database, {}, actorSystem); - rpcFuture.Subscribe([authData, actorSystem, selfId = SelfId()](const NThreading::TFuture<Ydb::Auth::LoginResponse>& future) { auto& response = future.GetValueSync(); @@ -146,4 +174,48 @@ void TKafkaSaslAuthActor::SendLoginRequest(TKafkaSaslAuthActor::TAuthData authDa }); } +void TKafkaSaslAuthActor::SendDescribeRequest(const NActors::TActorContext& /*ctx*/) { + if (Database.Empty()) { + Described = true; + return; + } + + KAFKA_LOG_D("Describe database '" << Database << "'"); + SubscriberId = Register(CreateSchemeBoardSubscriber(SelfId(), Database)); +} + +void TKafkaSaslAuthActor::Handle(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev, const TActorContext& ctx) { + Described = true; + + auto* result = ev->Get(); + auto status = result->DescribeSchemeResult.GetStatus(); + if (status != NKikimrScheme::EStatus::StatusSuccess) { + KAFKA_LOG_ERROR("Describe database '" << Database << "' error: " << status); + ReplyIfReady(ctx); + return; + } + + for(const auto& attr : result->DescribeSchemeResult.GetPathDescription().GetUserAttributes()) { + const auto& key = attr.GetKey(); + const auto& value = attr.GetValue(); + + KAFKA_LOG_D("Database attribute key=" << key << ", value=" << value); + + if (key == "folder_id") { + FolderId = value; + } else if (key == "service_account_id") { + ServiceAccountId = value; + } else if (key == "database_id") { + DatabaseId = value; + } else if (key == "serverless_rt_coordination_node_path") { + Coordinator = value; + } else if (key == "serverless_rt_base_resource_ru") { + ResourcePath = value; + } + } + + ReplyIfReady(ctx); +} + + } // NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h index 85d516b3759..8c3007096f5 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h @@ -2,6 +2,8 @@ #include "ydb/library/aclib/aclib.h" #include <ydb/core/base/ticket_parser.h> +#include <ydb/core/tx/scheme_board/events.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -9,6 +11,8 @@ namespace NKafka { +using namespace NKikimr; + class TKafkaSaslAuthActor: public NActors::TActorBootstrapped<TKafkaSaslAuthActor> { struct TEvPrivate { @@ -48,22 +52,31 @@ public: private: STATEFN(StateWork) { + KAFKA_LOG_T("Received event: " << (*ev.Get()).GetTypeName()); switch (ev->GetTypeRewrite()) { HFunc(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult, Handle); HFunc(TEvPrivate::TEvTokenReady, Handle); HFunc(TEvPrivate::TEvAuthFailed, Handle); + HFunc(TSchemeBoardEvents::TEvNotifyUpdate, Handle); } } void Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult::TPtr& ev, const NActors::TActorContext& ctx); void Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NActors::TActorContext& ctx); void Handle(TEvPrivate::TEvAuthFailed::TPtr& ev, const NActors::TActorContext& ctx); + void Handle(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev, const TActorContext& ctx); void StartPlainAuth(const NActors::TActorContext& ctx); void SendLoginRequest(TKafkaSaslAuthActor::TAuthData authData, const NActors::TActorContext& ctx); + void SendDescribeRequest(const NActors::TActorContext& ctx); void SendAuthFailedAndDie(EKafkaErrors errorCode, const TString& errorMessage, const TString& details, const NActors::TActorContext& ctx); bool TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& authData, const NActors::TActorContext& ctx); + void ReplyIfReady(const NActors::TActorContext& ctx); + +protected: + void PassAway() override; + private: const TContext::TPtr Context; const ui64 CorrelationId; @@ -72,6 +85,16 @@ private: const NKikimr::NRawSocket::TNetworkConfig::TSocketAddressType Address; TString Database; + TIntrusiveConstPtr<NACLib::TUserToken> Token; + TString FolderId; + TString ServiceAccountId; + TString DatabaseId; + TString Coordinator; + TString ResourcePath; + + bool Authentificated = false; + bool Described = false; + TActorId SubscriberId; }; } // NKafka diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index bc951d1b95f..696d0799814 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -290,6 +290,7 @@ protected: Context->UserToken = event->UserToken; Context->Database = event->Database; Context->AuthenticationStep = authStep; + Context->RlContext = {event->Coordinator, event->ResourcePath, event->Database, event->UserToken->GetSerializedToken()}; KAFKA_LOG_D("Authentificated successful. SID=" << Context->UserToken->GetUserSID()); } diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h index 8b70b059308..f9577438dfb 100644 --- a/ydb/core/kafka_proxy/kafka_events.h +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -49,17 +49,35 @@ struct TEvKafka { }; struct TEvAuthResult : public TEventLocal<TEvAuthResult, EvAuthResult> { - TEvAuthResult(EAuthSteps authStep, std::shared_ptr<TEvKafka::TEvResponse> clientResponse, TIntrusiveConstPtr<NACLib::TUserToken> token, TString database, TString error = "") - : AuthStep(authStep), - UserToken(token), - Database(database), - Error(error), - ClientResponse(std::move(clientResponse)) - {} + TEvAuthResult(EAuthSteps authStep, std::shared_ptr<TEvKafka::TEvResponse> clientResponse, TString error = "") + : AuthStep(authStep) + , Error(error) + , ClientResponse(clientResponse) { + } + + TEvAuthResult(EAuthSteps authStep, std::shared_ptr<TEvKafka::TEvResponse> clientResponse, TIntrusiveConstPtr<NACLib::TUserToken> token, TString database, + TString folderId, TString serviceAccountId, TString databaseId, TString coordinator, TString resourcePath, TString error = "") + : AuthStep(authStep) + , UserToken(token) + , Database(database) + , FolderId(folderId) + , ServiceAccountId(serviceAccountId) + , DatabaseId(databaseId) + , Coordinator(coordinator) + , ResourcePath(resourcePath) + , Error(error) + , ClientResponse(std::move(clientResponse)) { + } EAuthSteps AuthStep; TIntrusiveConstPtr<NACLib::TUserToken> UserToken; TString Database; + TString FolderId; + TString ServiceAccountId; + TString DatabaseId; + TString Coordinator; + TString ResourcePath; + TString Error; TString SaslMechanism; std::shared_ptr<TEvKafka::TEvResponse> ClientResponse; diff --git a/ydb/core/kafka_proxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h index 890b19ede5f..7461d0c78d1 100644 --- a/ydb/core/kafka_proxy/kafka_messages_int.h +++ b/ydb/core/kafka_proxy/kafka_messages_int.h @@ -356,7 +356,7 @@ public: ythrow yexception() << "string field " << Meta::Name << " is too long to be serialized " << v.length(); } if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) { - return v.length() + SizeOfUnsignedVarint(v.length() + sizeof(TKafkaInt8)); + return v.length() + SizeOfUnsignedVarint(v.length() + 1); } else { return v.length() + sizeof(TKafkaInt16); } @@ -450,7 +450,7 @@ class TypeStrategy<Meta, TKafkaRecords, TKafkaRecordsDesc> { public: inline static void DoWrite(TKafkaWritable& writable, TKafkaVersion version, const TKafkaRecords& value) { if (value) { - WriteArraySize<Meta>(writable, version, DoSize(CURRENT_RECORD_VERSION, value)); + WriteArraySize<Meta>(writable, version, value->Size(CURRENT_RECORD_VERSION)); (*value).Write(writable, CURRENT_RECORD_VERSION); } else { WriteArraySize<Meta>(writable, version, 0); @@ -490,11 +490,21 @@ public: } } - inline static i64 DoSize(TKafkaVersion /*version*/, const TKafkaRecords& value) { + inline static i64 DoSize(TKafkaVersion version, const TKafkaRecords& value) { if (value) { - return (*value).Size(CURRENT_RECORD_VERSION); + const auto& v = *value; + const auto size = v.Size(CURRENT_RECORD_VERSION); + if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) { + return size + SizeOfUnsignedVarint(size + 1); + } else { + return size + sizeof(TKafkaInt32); + } } else { - return 0; + if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) { + return 1; + } else { + return sizeof(TKafkaInt32); + } } } @@ -614,13 +624,24 @@ inline void Size(TSizeCollector& collector, TKafkaInt16 version, const typename i64 size = TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value); collector.Size += size + SizeOfUnsignedVarint(Meta::Tag) + SizeOfUnsignedVarint(size); + if constexpr (DEBUG_ENABLED) { + Cerr << "Size of field '" << Meta::Name << "' " << size << " + " << SizeOfUnsignedVarint(Meta::Tag) << " + " << SizeOfUnsignedVarint(size) << Endl; + } } } else if (VersionCheck<Meta::PresentVersions.Min, Meta::PresentVersions.Max>(version)) { - collector.Size += TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value); + i64 size = TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value); + collector.Size += size; + if constexpr (DEBUG_ENABLED) { + Cerr << "Size of field '" << Meta::Name << "' " << size << Endl; + } } } else { if (VersionCheck<Meta::PresentVersions.Min, Meta::PresentVersions.Max>(version)) { - collector.Size += TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value); + i64 size = TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value); + collector.Size += size; + if constexpr (DEBUG_ENABLED) { + Cerr << "Size of field '" << Meta::Name << "' " << size << Endl; + } } } } diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index 4d4c5724e9e..ac633aa3cc5 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -125,7 +125,7 @@ public: nullptr, 0) ); - KikimrServer->GetRuntime()->SetLogPriority(NKikimrServices::KAFKA_PROXY, NActors::NLog::PRI_DEBUG); + KikimrServer->GetRuntime()->SetLogPriority(NKikimrServices::KAFKA_PROXY, NActors::NLog::PRI_TRACE); ui16 grpc = KikimrServer->GetPort(); TString location = TStringBuilder() << "localhost:" << grpc; @@ -169,7 +169,7 @@ using TInsecureTestServer = TTestServer<TKikimrWithGrpcAndRootSchema, false>; using TSecureTestServer = TTestServer<TKikimrWithGrpcAndRootSchemaSecure, true>; void Write(TSocketOutput& so, TApiMessage* request, TKafkaVersion version) { - TWritableBuf sb(nullptr, request->Size(version)); + TWritableBuf sb(nullptr, request->Size(version) + 1000); TKafkaWritable writable(sb); request->Write(writable, version); so.Write(sb.Data(), sb.Size()); @@ -221,7 +221,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { Y_UNIT_TEST(ProduceScenario) { TInsecureTestServer testServer; - TString topicName = "topic-0-test"; + TString topicName = "/Root/topic-0-test"; { NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); @@ -235,13 +235,15 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TSocketOutput so(s); TSocketInput si(s); + size_t correlationId = 0; + { Cerr << ">>>>> ApiVersionsRequest\n"; TRequestHeaderData header; header.RequestApiKey = NKafka::EApiKey::API_VERSIONS; header.RequestApiVersion = 2; - header.CorrelationId = 0; + header.CorrelationId = correlationId++; header.ClientId = "test"; TApiVersionsRequestData request; @@ -263,7 +265,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { TRequestHeaderData header; header.RequestApiKey = NKafka::EApiKey::SASL_HANDSHAKE; header.RequestApiVersion = 1; - header.CorrelationId = 1; + header.CorrelationId = correlationId++; header.ClientId = "test"; TSaslHandshakeRequestData request; @@ -280,13 +282,13 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { } { - Cerr << ">>>>> SaslAuthenticateRequestData"; + Cerr << ">>>>> SaslAuthenticateRequestData\n"; char authBytes[] = "ignored\0ouruser@/Root\0ourUserPassword"; TRequestHeaderData header; header.RequestApiKey = NKafka::EApiKey::SASL_AUTHENTICATE; header.RequestApiVersion = 2; - header.CorrelationId = 2; + header.CorrelationId = correlationId++; header.ClientId = "test"; TSaslAuthenticateRequestData request; @@ -295,10 +297,61 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { Write(so, &header, &request); auto response = Read(si, &header); - Y_UNUSED(response); auto* msg = dynamic_cast<TSaslAuthenticateResponseData*>(response.get()); UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); } + + { + Cerr << ">>>>> TInitProducerIdRequestData\n"; + + TRequestHeaderData header; + header.RequestApiKey = NKafka::EApiKey::INIT_PRODUCER_ID; + header.RequestApiVersion = 4; + header.CorrelationId = correlationId++; + header.ClientId = "test"; + + TInitProducerIdRequestData request; + request.TransactionTimeoutMs = 5000; + + Write(so, &header, &request); + + auto response = Read(si, &header); + auto* msg = dynamic_cast<TInitProducerIdResponseData*>(response.get()); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } + + { + Cerr << ">>>>> TProduceRequestData\n"; + + TRequestHeaderData header; + header.RequestApiKey = NKafka::EApiKey::PRODUCE; + header.RequestApiVersion = 9; + header.CorrelationId = correlationId++; + header.ClientId = "test-client-random-string"; + + TProduceRequestData request; + request.TopicData.resize(1); + request.TopicData[0].Name = topicName; + request.TopicData[0].PartitionData.resize(1); + request.TopicData[0].PartitionData[0].Index = 0; // Partition id + request.TopicData[0].PartitionData[0].Records.emplace(); + request.TopicData[0].PartitionData[0].Records->BaseOffset = 3; + request.TopicData[0].PartitionData[0].Records->BaseSequence = 5; + request.TopicData[0].PartitionData[0].Records->Magic = 2; // Current supported + request.TopicData[0].PartitionData[0].Records->Records.resize(1); + request.TopicData[0].PartitionData[0].Records->Records[0].Key = "record-key"; + request.TopicData[0].PartitionData[0].Records->Records[0].Value = "record-value"; + + Write(so, &header, &request); + + auto response = Read(si, &header); + auto* msg = dynamic_cast<TProduceResponseData*>(response.get()); + + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, topicName); + UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0); + // UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } } }
\ No newline at end of file diff --git a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt index 0009a60a21d..95b410b0854 100644 --- a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt @@ -62,6 +62,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_database.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_rl_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt index 61db845d639..c81de2e4f04 100644 --- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt +++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt @@ -63,6 +63,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_database.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_rl_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp diff --git a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt index 61db845d639..c81de2e4f04 100644 --- a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt @@ -63,6 +63,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_database.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_rl_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp diff --git a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt index 0009a60a21d..95b410b0854 100644 --- a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt +++ b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt @@ -62,6 +62,7 @@ target_sources(ydb-core-persqueue PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_database.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_rl_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp diff --git a/ydb/core/persqueue/pq_rl_helpers.cpp b/ydb/core/persqueue/pq_rl_helpers.cpp new file mode 100644 index 00000000000..f3eb3348299 --- /dev/null +++ b/ydb/core/persqueue/pq_rl_helpers.cpp @@ -0,0 +1,117 @@ +#include "pq_rl_helpers.h" + +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/tx/scheme_board/subscriber.h> + +namespace NKikimr::NPQ { + +TRlHelpers::TRlHelpers(const std::optional<TString>& topicPath, const TRlContext ctx, ui64 blockSize, bool subscribe, const TDuration& waitDuration) + : TStreamRequestUnitsCalculator(blockSize) + , TopicPath(topicPath) + , Ctx(ctx) + , Subscribe(subscribe) + , WaitDuration(waitDuration) +{ + Y_VERIFY_DEBUG(!subscribe || (topicPath && subscribe)); +} + +void TRlHelpers::Bootstrap(const TActorId selfId, const NActors::TActorContext& ctx) { + if (Subscribe) { + SubscriberId = ctx.Register(CreateSchemeBoardSubscriber(selfId, *TopicPath)); + } +} + +void TRlHelpers::PassAway(const TActorId selfId) { + TActorIdentity id(selfId); + if (RlActor) { + id.Send(RlActor, new TEvents::TEvPoison()); + } + if (SubscriberId) { + id.Send(SubscriberId, new TEvents::TEvPoison()); + } +} + +bool TRlHelpers::IsQuotaInflight() const { + return !!RlActor; +} + +bool TRlHelpers::IsQuotaRequired() const { + Y_VERIFY(MeteringMode.Defined()); + return MeteringMode == NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS && Ctx; +} + +void TRlHelpers::RequestInitQuota(ui64 amount, const TActorContext& ctx) { + RequestQuota(amount, EWakeupTag::RlInit, EWakeupTag::RlInitNoResource, ctx); +} + +void TRlHelpers::RequestDataQuota(ui64 amount, const TActorContext& ctx) { + RequestQuota(amount, EWakeupTag::RlAllowed, EWakeupTag::RlNoResource, ctx); +} + +bool TRlHelpers::MaybeRequestQuota(ui64 amount, EWakeupTag tag, const TActorContext& ctx) { + if (IsQuotaInflight()) { + return false; + } + + RequestQuota(amount, tag, EWakeupTag::RlNoResource, ctx); + return true; +} + +void TRlHelpers::RequestQuota(ui64 amount, EWakeupTag success, EWakeupTag timeout, const TActorContext& ctx) { + const auto selfId = ctx.SelfID; + const auto as = ctx.ActorSystem(); + + auto onSendAllowed = [selfId, as, success]() { + as->Send(selfId, new TEvents::TEvWakeup(success)); + }; + + auto onSendTimeout = [selfId, as, timeout]() { + as->Send(selfId, new TEvents::TEvWakeup(timeout)); + }; + + RlActor = NRpcService::RateLimiterAcquireUseSameMailbox( + Ctx.GetPath(), amount, WaitDuration, + std::move(onSendAllowed), std::move(onSendTimeout), ctx); +} + +void TRlHelpers::OnWakeup(EWakeupTag tag) { + switch (tag) { + case EWakeupTag::RlInit: + case EWakeupTag::RlInitNoResource: + case EWakeupTag::RlAllowed: + case EWakeupTag::RlNoResource: + RlActor = {}; + break; + default: + break; + } +} + +const TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode>& TRlHelpers::GetMeteringMode() const { + return MeteringMode; +} + +void TRlHelpers::SetMeteringMode(NKikimrPQ::TPQTabletConfig::EMeteringMode mode) { + MeteringMode = mode; +} + +ui64 TRlHelpers::CalcRuConsumption(ui64 payloadSize) { + if (!IsQuotaRequired()) { + return 0; + } + + return CalcConsumption(payloadSize); +} + +void TRlHelpers::Handle(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev) { + auto* result = ev->Get(); + auto status = result->DescribeSchemeResult.GetStatus(); + if (status != NKikimrScheme::EStatus::StatusSuccess) { + //LOG_ERROR("Describe database '" << Database << "' error: " << status); + return; + } + + MeteringMode = result->DescribeSchemeResult.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetMeteringMode(); +} + +} diff --git a/ydb/core/persqueue/pq_rl_helpers.h b/ydb/core/persqueue/pq_rl_helpers.h new file mode 100644 index 00000000000..836808c012a --- /dev/null +++ b/ydb/core/persqueue/pq_rl_helpers.h @@ -0,0 +1,88 @@ +#pragma once + +#include <ydb/core/grpc_services/local_rate_limiter.h> +#include <ydb/core/metering/stream_ru_calculator.h> +#include <ydb/core/protos/pqconfig.pb.h> + +#include <util/datetime/base.h> + +namespace NKikimr::NPQ { + +class TRlContext { +public: + TRlContext() { + } + + TRlContext(const NGRpcService::IRequestCtxBase* reqCtx) { + Path.DatabaseName = reqCtx->GetDatabaseName().GetOrElse(""); + Path.Token = reqCtx->GetSerializedToken(); + + if (auto rlPath = reqCtx->GetRlPath()) { + Path.ResourcePath = rlPath->ResourcePath; + Path.CoordinationNode = rlPath->CoordinationNode; + } + } + + TRlContext(const TString& coordinationNode, + const TString& resourcePath, + const TString& databaseName, + const TString& token) + : Path({coordinationNode, resourcePath, databaseName, token}) { + } + + operator bool() const { return !Path.ResourcePath.Empty() && !Path.CoordinationNode.Empty(); }; + const NRpcService::TRlFullPath GetPath() const { return Path; } + +private: + NRpcService::TRlFullPath Path; +}; + +class TRlHelpers: public NMetering::TStreamRequestUnitsCalculator { +public: + TRlHelpers(const std::optional<TString>& topicPath, const TRlContext ctx, ui64 blockSize, bool subscribe, const TDuration& waitDuration = TDuration::Minutes(1)); + +protected: + enum EWakeupTag: ui64 { + RlInit = 0, + RlInitNoResource = 1, + + RlAllowed = 2, + RlNoResource = 3, + + RecheckAcl = 4, + }; + + void Bootstrap(const TActorId selfId, const NActors::TActorContext& ctx); + void PassAway(const TActorId selfId); + + bool IsQuotaRequired() const; + bool IsQuotaInflight() const; + + void RequestInitQuota(ui64 amount, const TActorContext& ctx); + void RequestDataQuota(ui64 amount, const TActorContext& ctx); + + bool MaybeRequestQuota(ui64 amount, EWakeupTag tag, const TActorContext& ctx); + void RequestQuota(ui64 amount, EWakeupTag success, EWakeupTag timeout, const TActorContext& ctx); + + void OnWakeup(EWakeupTag tag); + + const TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode>& GetMeteringMode() const; + void SetMeteringMode(NKikimrPQ::TPQTabletConfig::EMeteringMode mode); + + ui64 CalcRuConsumption(ui64 payloadSize); + + void Handle(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev); + +private: + const std::optional<TString> TopicPath; + const TRlContext Ctx; + const bool Subscribe; + const TDuration WaitDuration; + + TActorId RlActor; + TActorId SubscriberId; + + TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode> MeteringMode; +}; + +} diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index 1f3beeb1cfd..36d13fe7008 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -7,6 +7,7 @@ #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/persqueue/events/global.h> +#include <ydb/core/persqueue/pq_rl_helpers.h> #include <ydb/public/lib/base/msgbus_status.h> #include <util/generic/deque.h> @@ -16,6 +17,19 @@ namespace NKikimr::NPQ { +#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR) +#error "Already defined LOG_PREFIX or TRACE or DEBUG or INFO or ERROR" +#endif + + +#define LOG_PREFIX "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") " +#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message); +#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message); +#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message); +#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message); + +static const ui64 WRITE_BLOCK_SIZE = 4_KB; + TString TEvPartitionWriter::TEvInitResult::TSuccess::ToString() const { return TStringBuilder() << "Success {" << " OwnerCookie: " << OwnerCookie @@ -71,7 +85,10 @@ TString TEvPartitionWriter::TEvWriteResponse::ToString() const { return out; } -class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { +class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRlHelpers { + + static constexpr size_t MAX_QUOTA_INFLIGHT = 3; + static void FillHeader(NKikimrClient::TPersQueuePartitionRequest& request, ui32 partitionId, const TActorId& pipeClient) { @@ -136,6 +153,9 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { for (const auto& [cookie, _] : std::exchange(PendingReserve, {})) { SendWriteResult(ErrorCode, error, MakeResponse(cookie)); } + for (const auto& [cookie, _] : std::exchange(ReceivedReserve, {})) { + SendWriteResult(ErrorCode, error, MakeResponse(cookie)); + } for (const auto& [cookie, _] : std::exchange(Pending, {})) { SendWriteResult(ErrorCode, error, MakeResponse(cookie)); } @@ -236,14 +256,14 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { STATEFN(StateGetMaxSeqNo) { switch (ev->GetTypeRewrite()) { - hFunc(TEvPersQueue::TEvResponse, HandleMaxSeqNo); + HFunc(TEvPersQueue::TEvResponse, HandleMaxSeqNo); hFunc(TEvPartitionWriter::TEvWriteRequest, HoldPending); default: return StateBase(ev); } } - void HandleMaxSeqNo(TEvPersQueue::TEvResponse::TPtr& ev) { + void HandleMaxSeqNo(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) { auto& record = ev->Get()->Record; TString error; @@ -282,7 +302,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { Become(&TThis::StateWork); if (Pending) { - ReserveBytes(); + ReserveBytes(ctx); } } @@ -329,9 +349,11 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { /// Work STATEFN(StateWork) { + DEBUG("Received event: " << (*ev.Get()).GetTypeName()) switch (ev->GetTypeRewrite()) { - hFunc(TEvPartitionWriter::TEvWriteRequest, Handle); + HFunc(TEvPartitionWriter::TEvWriteRequest, Handle); hFunc(TEvPersQueue::TEvResponse, Handle); + HFunc(TEvents::TEvWakeup, Handle); default: return StateBase(ev); } @@ -353,12 +375,21 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { Pending.emplace(cookie, std::move(ev->Get()->Record)); } - void Handle(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) { + void Handle(TEvPartitionWriter::TEvWriteRequest::TPtr& ev, const TActorContext& ctx) { HoldPending(ev); - ReserveBytes(); + ReserveBytes(ctx); } - void ReserveBytes() { + void ReserveBytes(const TActorContext& ctx) { + if (IsQuotaInflight()) { + return; + } + + const bool checkQuota = Opts.CheckRequestUnits() && IsQuotaRequired(); + + size_t processed = 0; + PendingQuotaAmount = 0; + while (Pending) { auto it = Pending.begin(); @@ -372,11 +403,93 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { cmd.SetSize(it->second.ByteSize()); cmd.SetLastRequest(false); + if (checkQuota) { + ++processed; + PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize()); + PendingQuota.emplace_back(it->first); + } + NTabletPipe::SendData(SelfId(), PipeClient, ev.Release()); - PendingReserve.emplace(it->first, std::move(it->second)); + PendingReserve.emplace(it->first, RequestHolder{ std::move(it->second), checkQuota }); Pending.erase(it); + + if (checkQuota && processed == MAX_QUOTA_INFLIGHT) { + break; + } + } + + if (checkQuota) { + RequestDataQuota(PendingQuotaAmount, ctx); + } + } + + void EnqueueReservedAndProcess(ui64 cookie) { + Y_VERIFY(!PendingReserve.empty()); + auto it = PendingReserve.begin(); + + Y_VERIFY(it->first == cookie); + + ReceivedReserve.emplace(it->first, std::move(it->second)); + + ProcessQuota(); + } + + void ProcessQuota() { + auto rit = ReceivedReserve.begin(); + auto qit = ReceivedQuota.begin(); + + while(rit != ReceivedReserve.end() && qit != ReceivedQuota.end()) { + auto& request = rit->second; + const auto cookie = rit->first; + TRACE("processing quota for request cookie=" << cookie << ", QuotaChecked=" << request.QuotaChecked << ", QuotaAccepted=" << request.QuotaAccepted); + if (!request.QuotaChecked || request.QuotaAccepted) { + // A situation when a quota was not requested or was received while waiting for a reserve + Write(cookie, std::move(request.Request)); + ReceivedReserve.erase(rit++); + continue; + } + + if (cookie != *qit) { + ERROR("The order of reservation and quota requests should be the same. ReserveCookie=" << cookie << ", QuotaCookie=" << *qit); + Disconnected(TEvPartitionWriter::TEvWriteResponse::InternalError); + return; + } + + Write(cookie, std::move(request.Request)); + ReceivedReserve.erase(rit++); + ++qit; } + + while(rit != ReceivedReserve.end()) { + auto& request = rit->second; + const auto cookie = rit->first; + TRACE("processing quota for request cookie=" << cookie << ", QuotaChecked=" << request.QuotaChecked << ", QuotaAccepted=" << request.QuotaAccepted); + if (request.QuotaChecked && !request.QuotaAccepted) { + break; + } + + // A situation when a quota was not requested or was received while waiting for a reserve + Write(cookie, std::move(request.Request)); + ReceivedReserve.erase(rit++); + } + + while(qit != ReceivedQuota.end()) { + auto cookie = *qit; + TRACE("processing quota for request cookie=" << cookie); + auto pit = PendingReserve.find(cookie); + + if (pit == PendingReserve.end()) { + ERROR("The received quota does not apply to any request. Cookie=" << *qit); + Disconnected(TEvPartitionWriter::TEvWriteResponse::InternalError); + return; + } + + pit->second.QuotaAccepted = true; + ++qit; + } + + ReceivedQuota.clear(); } void Write(ui64 cookie) { @@ -386,8 +499,14 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { Y_VERIFY(it->first == cookie); Y_VERIFY(PendingWrite.empty() || PendingWrite.back() < cookie); + Write(cookie, std::move(it->second.Request)); + + PendingReserve.erase(it); + } + + void Write(ui64 cookie, NKikimrClient::TPersQueueRequest&& req) { auto ev = MakeHolder<TEvPersQueue::TEvRequest>(); - ev->Record = std::move(it->second); + ev->Record = req; auto& request = *ev->Record.MutablePartitionRequest(); request.SetMessageNo(MessageNo++); @@ -398,7 +517,6 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { NTabletPipe::SendData(SelfId(), PipeClient, ev.Release()); PendingWrite.emplace_back(cookie); - PendingReserve.erase(it); } void Handle(TEvPersQueue::TEvResponse::TPtr& ev) { @@ -424,7 +542,19 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { } WriteAccepted(cookie); - Write(cookie); + + Y_VERIFY(!PendingReserve.empty()); + auto it = PendingReserve.begin(); + auto& holder = it->second; + + if ((holder.QuotaChecked && !holder.QuotaAccepted)|| !ReceivedReserve.empty()) { + // There may be two situations: + // - a quota has been requested, and the quota has not been received yet + // - the quota was not requested, for example, due to a change in the metering option, but the previous quota requests have not yet been processed + EnqueueReservedAndProcess(cookie); + } else { + Write(cookie); + } } else { if (PendingWrite.empty()) { return WriteResult(TEvPartitionWriter::TEvWriteResponse::InternalError, "Unexpected Write response", std::move(record)); @@ -442,13 +572,13 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { } } - void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) { auto msg = ev->Get(); - LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TEvClientConnected Status " << msg->Status << ", TabletId: " << msg->TabletId << ", NodeId " << msg->ServerId.NodeId() << ", Generation: " << msg->Generation); + DEBUG("TEvClientConnected Status " << msg->Status << ", TabletId: " << msg->TabletId << ", NodeId " << msg->ServerId.NodeId() << ", Generation: " << msg->Generation); Y_VERIFY_DEBUG(msg->TabletId == TabletId); if (msg->Status != NKikimrProto::OK) { - LOG_ERROR_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with status " << ev->Get()->Status); + ERROR("received TEvClientConnected with status " << ev->Get()->Status); Disconnected(TEvPartitionWriter::TEvWriteResponse::InternalError); return; } @@ -459,22 +589,22 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { { if(*ExpectedGeneration != msg->Generation) { - LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with wrong generation. Expected: " << *ExpectedGeneration << ", received " << msg->Generation); + INFO("received TEvClientConnected with wrong generation. Expected: " << *ExpectedGeneration << ", received " << msg->Generation); Disconnected(TEvPartitionWriter::TEvWriteResponse::PartitionNotLocal); PassAway(); } if (NActors::TActivationContext::ActorSystem()->NodeId != msg->ServerId.NodeId()) { - LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with wrong NodeId. Expected: " << NActors::TActivationContext::ActorSystem()->NodeId << ", received " << msg->ServerId.NodeId()); + INFO("received TEvClientConnected with wrong NodeId. Expected: " << NActors::TActivationContext::ActorSystem()->NodeId << ", received " << msg->ServerId.NodeId()); Disconnected(TEvPartitionWriter::TEvWriteResponse::PartitionNotLocal); PassAway(); } } } - void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) { if (ev->Get()->TabletId == TabletId) { - LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientDestroyed"); + DEBUG("received TEvClientDestroyed"); Disconnected(TEvPartitionWriter::TEvWriteResponse::PartitionDisconnected); } } @@ -487,6 +617,29 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { TActorBootstrapped::PassAway(); } + void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { + const auto tag = static_cast<EWakeupTag>(ev->Get()->Tag); + OnWakeup(tag); + switch (tag) { + case EWakeupTag::RlAllowed: + ReceivedQuota.insert(ReceivedQuota.end(), PendingQuota.begin(), PendingQuota.end()); + PendingQuota.clear(); + + ProcessQuota(); + + break; + + case EWakeupTag::RlNoResource: + // Re-requesting the quota. We do this until we get a quota. + // We do not request a quota with a long waiting time because the writer may already be a destroyer, and the quota will still be waiting to be received. + RequestDataQuota(PendingQuotaAmount, ctx); + break; + + default: + Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast<ui64>(tag)); + } + } + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::PQ_PARTITION_WRITER_ACTOR; @@ -494,18 +647,23 @@ public: explicit TPartitionWriter( const TActorId& client, + const std::optional<TString>& topicPath, ui64 tabletId, ui32 partitionId, - TMaybe<ui32> expectedGeneration, + const std::optional<ui32> expectedGeneration, const TString& sourceId, const TPartitionWriterOpts& opts) - : Client(client) + : TRlHelpers(topicPath, opts.RlCtx, WRITE_BLOCK_SIZE, !!opts.RlCtx) + , Client(client) , TabletId(tabletId) , PartitionId(partitionId) , ExpectedGeneration(expectedGeneration) , SourceId(sourceId) , Opts(opts) { + if (Opts.MeteringMode) { + SetMeteringMode(*Opts.MeteringMode); + } } void Bootstrap() { @@ -524,8 +682,9 @@ public: STATEFN(StateBase) { switch (ev->GetTypeRewrite()) { - HFunc(TEvTabletPipe::TEvClientConnected, Handle); - HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + hFunc(TSchemeBoardEvents::TEvNotifyUpdate, TRlHelpers::Handle); + hFunc(TEvTabletPipe::TEvClientConnected, Handle); + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); sFunc(TEvents::TEvPoison, PassAway); } } @@ -541,7 +700,7 @@ private: const TActorId Client; const ui64 TabletId; const ui32 PartitionId; - const TMaybe<ui32> ExpectedGeneration; + const std::optional<ui32> ExpectedGeneration; const TString SourceId; const TPartitionWriterOpts Opts; @@ -550,15 +709,40 @@ private: bool Registered = false; ui64 MessageNo = 0; + struct RequestHolder { + NKikimrClient::TPersQueueRequest Request; + bool QuotaChecked; + bool QuotaAccepted; + + RequestHolder(NKikimrClient::TPersQueueRequest&& request, bool quotaChecked) + : Request(request) + , QuotaChecked(quotaChecked) + , QuotaAccepted(false) { + } + }; + TMap<ui64, NKikimrClient::TPersQueueRequest> Pending; - TMap<ui64, NKikimrClient::TPersQueueRequest> PendingReserve; + TMap<ui64, RequestHolder> PendingReserve; + TMap<ui64, RequestHolder> ReceivedReserve; + TDeque<ui64> PendingQuota; + ui64 PendingQuotaAmount; + TDeque<ui64> ReceivedQuota; TDeque<ui64> PendingWrite; TEvPartitionWriter::TEvWriteResponse::EErrors ErrorCode = TEvPartitionWriter::TEvWriteResponse::EErrors::InternalError; }; // TPartitionWriter -IActor* CreatePartitionWriter(const TActorId& client, ui64 tabletId, ui32 partitionId, TMaybe<ui32> expectedGeneration, const TString& sourceId, const TPartitionWriterOpts& opts) { - return new TPartitionWriter(client, tabletId, partitionId, expectedGeneration, sourceId, opts); +IActor* CreatePartitionWriter(const TActorId& client, const std::optional<TString>& topicPath, ui64 tabletId, ui32 partitionId, + const std::optional<ui32> expectedGeneration, const TString& sourceId, + const TPartitionWriterOpts& opts) { + return new TPartitionWriter(client, topicPath, tabletId, partitionId, expectedGeneration, sourceId, opts); } + +#undef LOG_PREFIX +#undef TRACE +#undef DEBUG +#undef INFO +#undef ERROR + } diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h index 6a1a735ab94..fb2f8dd2602 100644 --- a/ydb/core/persqueue/writer/writer.h +++ b/ydb/core/persqueue/writer/writer.h @@ -2,8 +2,10 @@ #include <ydb/core/base/defs.h> #include <ydb/core/base/events.h> +#include <ydb/core/grpc_services/local_rate_limiter.h> #include <ydb/core/protos/msgbus.pb.h> #include <ydb/core/protos/msgbus_pq.pb.h> +#include <ydb/core/persqueue/pq_rl_helpers.h> #include <variant> @@ -81,7 +83,8 @@ struct TEvPartitionWriter { // Partition located on other node. PartitionNotLocal, // Partitition restarted. - PartitionDisconnected + PartitionDisconnected, + Overload }; struct TSuccess { @@ -124,11 +127,17 @@ struct TPartitionWriterOpts { bool AutoRegister = false; bool UseDeduplication = true; + std::optional<NKikimrPQ::TPQTabletConfig::EMeteringMode> MeteringMode; + TRlContext RlCtx; + + bool CheckRequestUnits() const { return RlCtx; } + TPartitionWriterOpts& WithCheckState(bool value) { CheckState = value; return *this; } TPartitionWriterOpts& WithAutoRegister(bool value) { AutoRegister = value; return *this; } TPartitionWriterOpts& WithDeduplication(bool value) { UseDeduplication = value; return *this; } + TPartitionWriterOpts& WithCheckRequestUnits(const NKikimrPQ::TPQTabletConfig::EMeteringMode meteringMode , const TRlContext& rlCtx) { MeteringMode = meteringMode; RlCtx = rlCtx; return *this; } }; -IActor* CreatePartitionWriter(const TActorId& client, ui64 tabletId, ui32 partitionId, TMaybe<ui32> expectedGeneration, const TString& sourceId, +IActor* CreatePartitionWriter(const TActorId& client, const std::optional<TString>& topicPath, ui64 tabletId, ui32 partitionId, const std::optional<ui32> expectedGeneration, const TString& sourceId, const TPartitionWriterOpts& opts = {}); } diff --git a/ydb/core/persqueue/ya.make b/ydb/core/persqueue/ya.make index 1e96ec8137a..eeab4c328c8 100644 --- a/ydb/core/persqueue/ya.make +++ b/ydb/core/persqueue/ya.make @@ -20,6 +20,7 @@ SRCS( pq_database.cpp pq_impl.cpp pq_l2_cache.cpp + pq_rl_helpers.cpp quota_tracker.cpp read_balancer.cpp account_read_quoter.cpp diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 926ed635dae..89341a2fe9b 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -281,7 +281,7 @@ public: auto opts = TPartitionWriterOpts() .WithCheckState(true) .WithAutoRegister(true); - Writer = RegisterWithSameMailbox(CreatePartitionWriter(SelfId(), ShardId, PartitionId, {}, SourceId, opts)); + Writer = RegisterWithSameMailbox(CreatePartitionWriter(SelfId(), {}, ShardId, PartitionId, {}, SourceId, opts)); Become(&TThis::StateInit); } diff --git a/ydb/core/tx/scheme_board/subscriber.cpp b/ydb/core/tx/scheme_board/subscriber.cpp index b96bb08776a..15a0cf2e1c2 100644 --- a/ydb/core/tx/scheme_board/subscriber.cpp +++ b/ydb/core/tx/scheme_board/subscriber.cpp @@ -3,6 +3,7 @@ #include "monitorable_actor.h" #include "subscriber.h" +#include <ydb/core/base/appdata.h> #include <ydb/core/base/statestorage_impl.h> #include <ydb/core/base/tabletid.h> #include <ydb/core/protos/scheme_board.pb.h> @@ -1054,6 +1055,18 @@ public: IActor* CreateSchemeBoardSubscriber( const TActorId& owner, + const TString& path +) { + auto& domains = AppData()->DomainsInfo->Domains; + Y_VERIFY(!domains.empty()); + auto& domain = domains.begin()->second; + ui32 schemeBoardGroup = domain->DefaultSchemeBoardGroup; + ui64 domainOwnerId = domain->SchemeRoot; + return CreateSchemeBoardSubscriber(owner, path, schemeBoardGroup, domainOwnerId); +} + +IActor* CreateSchemeBoardSubscriber( + const TActorId& owner, const TString& path, const ui64 stateStorageGroup, const ui64 domainOwnerId diff --git a/ydb/core/tx/scheme_board/subscriber.h b/ydb/core/tx/scheme_board/subscriber.h index 8e6cd2ffa46..e531dd8d123 100644 --- a/ydb/core/tx/scheme_board/subscriber.h +++ b/ydb/core/tx/scheme_board/subscriber.h @@ -10,6 +10,11 @@ namespace NKikimr { IActor* CreateSchemeBoardSubscriber( const TActorId& owner, + const TString& path +); + +IActor* CreateSchemeBoardSubscriber( + const TActorId& owner, const TString& path, const ui64 stateStorageGroup, const ui64 domainOwnerId diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index cf0b456e8db..c200de3d450 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -9,10 +9,10 @@ #include <ydb/core/grpc_services/rpc_deferrable.h> #include <ydb/core/grpc_services/rpc_scheme_base.h> #include <ydb/core/persqueue/partition.h> +#include <ydb/core/persqueue/pq_rl_helpers.h> #include <ydb/core/persqueue/write_meta.h> #include <ydb/public/api/protos/ydb_topic.pb.h> -#include <ydb/services/lib/actors/pq_rl_helpers.h> #include <ydb/services/lib/actors/pq_schema_actor.h> #include <ydb/services/lib/sharding/sharding.h> #include <ydb/services/persqueue_v1/actors/persqueue_utils.h> @@ -1312,7 +1312,7 @@ namespace NKikimr::NDataStreams::V1 { //----------------------------------------------------------------------------------- class TGetRecordsActor : public TPQGrpcSchemaBase<TGetRecordsActor, TEvDataStreamsGetRecordsRequest> - , private TRlHelpers + , private NPQ::TRlHelpers , public TCdcStreamCompatible { using TBase = TPQGrpcSchemaBase<TGetRecordsActor, TEvDataStreamsGetRecordsRequest>; @@ -1352,7 +1352,7 @@ namespace NKikimr::NDataStreams::V1 { : TBase(request, TShardIterator(GetRequest<TProtoRequest>(request)->shard_iterator()).IsValid() ? TShardIterator(GetRequest<TProtoRequest>(request)->shard_iterator()).GetStreamName() : "undefined") - , TRlHelpers(request, 8_KB, TDuration::Seconds(1)) + , TRlHelpers({}, request, 8_KB, false, TDuration::Seconds(1)) , ShardIterator{GetRequest<TProtoRequest>(request)->shard_iterator()} , StreamName{ShardIterator.IsValid() ? ShardIterator.GetStreamName() : "undefined"} , TabletId{0} diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h index 4e650800bcc..fa0e4fb7a48 100644 --- a/ydb/services/datastreams/put_records_actor.h +++ b/ydb/services/datastreams/put_records_actor.h @@ -5,11 +5,11 @@ #include <ydb/core/persqueue/events/global.h> #include <ydb/core/persqueue/utils.h> +#include <ydb/core/persqueue/pq_rl_helpers.h> #include <ydb/core/persqueue/write_meta.h> #include <ydb/core/protos/msgbus_pq.pb.h> #include <ydb/core/protos/grpc_pq_old.pb.h> -#include <ydb/services/lib/actors/pq_rl_helpers.h> #include <ydb/services/lib/actors/pq_schema_actor.h> #include <ydb/services/lib/sharding/sharding.h> @@ -213,7 +213,7 @@ namespace NKikimr::NDataStreams::V1 { template<class TDerived, class TProto> class TPutRecordsActorBase : public NGRpcProxy::V1::TPQGrpcSchemaBase<TPutRecordsActorBase<TDerived, TProto>, TProto> - , private NGRpcProxy::V1::TRlHelpers + , private NPQ::TRlHelpers { using TBase = NGRpcProxy::V1::TPQGrpcSchemaBase<TPutRecordsActorBase<TDerived, TProto>, TProto>; @@ -261,7 +261,7 @@ namespace NKikimr::NDataStreams::V1 { template<class TDerived, class TProto> TPutRecordsActorBase<TDerived, TProto>::TPutRecordsActorBase(NGRpcService::IRequestOpCtx* request) : TBase(request, dynamic_cast<const typename TProto::TRequest*>(request->GetRequest())->stream_name()) - , TRlHelpers(request, 4_KB, TDuration::Seconds(1)) + , TRlHelpers({}, request, 4_KB, false, TDuration::Seconds(1)) , Ip(request->GetPeerName()) { Y_ENSURE(request); diff --git a/ydb/services/lib/actors/CMakeLists.darwin-x86_64.txt b/ydb/services/lib/actors/CMakeLists.darwin-x86_64.txt index 43f92f2ff06..887af75044f 100644 --- a/ydb/services/lib/actors/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/lib/actors/CMakeLists.darwin-x86_64.txt @@ -27,5 +27,4 @@ target_link_libraries(services-lib-actors PUBLIC ) target_sources(services-lib-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_schema_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_rl_helpers.cpp ) diff --git a/ydb/services/lib/actors/CMakeLists.linux-aarch64.txt b/ydb/services/lib/actors/CMakeLists.linux-aarch64.txt index 0c6cdd98c9a..c0ad2ebb2b5 100644 --- a/ydb/services/lib/actors/CMakeLists.linux-aarch64.txt +++ b/ydb/services/lib/actors/CMakeLists.linux-aarch64.txt @@ -28,5 +28,4 @@ target_link_libraries(services-lib-actors PUBLIC ) target_sources(services-lib-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_schema_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_rl_helpers.cpp ) diff --git a/ydb/services/lib/actors/CMakeLists.linux-x86_64.txt b/ydb/services/lib/actors/CMakeLists.linux-x86_64.txt index 0c6cdd98c9a..c0ad2ebb2b5 100644 --- a/ydb/services/lib/actors/CMakeLists.linux-x86_64.txt +++ b/ydb/services/lib/actors/CMakeLists.linux-x86_64.txt @@ -28,5 +28,4 @@ target_link_libraries(services-lib-actors PUBLIC ) target_sources(services-lib-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_schema_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_rl_helpers.cpp ) diff --git a/ydb/services/lib/actors/CMakeLists.windows-x86_64.txt b/ydb/services/lib/actors/CMakeLists.windows-x86_64.txt index 43f92f2ff06..887af75044f 100644 --- a/ydb/services/lib/actors/CMakeLists.windows-x86_64.txt +++ b/ydb/services/lib/actors/CMakeLists.windows-x86_64.txt @@ -27,5 +27,4 @@ target_link_libraries(services-lib-actors PUBLIC ) target_sources(services-lib-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_schema_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_rl_helpers.cpp ) diff --git a/ydb/services/lib/actors/pq_rl_helpers.cpp b/ydb/services/lib/actors/pq_rl_helpers.cpp deleted file mode 100644 index 8a7e8bf5abf..00000000000 --- a/ydb/services/lib/actors/pq_rl_helpers.cpp +++ /dev/null @@ -1,70 +0,0 @@ -#include "pq_rl_helpers.h" - -#include <ydb/core/grpc_services/base/base.h> - -namespace NKikimr::NGRpcProxy::V1 { - -TRlHelpers::TRlHelpers(NGRpcService::IRequestCtxBase* reqCtx, ui64 blockSize, const TDuration& waitDuration) - : TStreamRequestUnitsCalculator(blockSize) - , Request(reqCtx) - , WaitDuration(waitDuration) -{ - Y_VERIFY(Request); -} - -bool TRlHelpers::IsQuotaRequired() const { - Y_VERIFY(MeteringMode.Defined()); - return MeteringMode == NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS && Request->GetRlPath(); -} - -bool TRlHelpers::MaybeRequestQuota(ui64 amount, EWakeupTag tag, const TActorContext& ctx) { - if (RlActor) { - return false; - } - - const auto selfId = ctx.SelfID; - const auto as = ctx.ActorSystem(); - - auto onSendAllowed = [selfId, as, tag]() { - as->Send(selfId, new TEvents::TEvWakeup(tag)); - }; - - auto onSendTimeout = [selfId, as]() { - as->Send(selfId, new TEvents::TEvWakeup(EWakeupTag::RlNoResource)); - }; - - RlActor = NRpcService::RateLimiterAcquireUseSameMailbox( - *Request, amount, WaitDuration, - std::move(onSendAllowed), std::move(onSendTimeout), ctx); - return true; -} - -void TRlHelpers::OnWakeup(EWakeupTag tag) { - switch (tag) { - case EWakeupTag::RlInit: - case EWakeupTag::RlAllowed: - case EWakeupTag::RlNoResource: - RlActor = {}; - break; - default: - break; - } -} - -const TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode>& TRlHelpers::GetMeteringMode() const { - return MeteringMode; -} - -void TRlHelpers::SetMeteringMode(NKikimrPQ::TPQTabletConfig::EMeteringMode mode) { - MeteringMode = mode; -} - -ui64 TRlHelpers::CalcRuConsumption(ui64 payloadSize) { - if (!IsQuotaRequired()) { - return 0; - } - - return CalcConsumption(payloadSize); -} - -} diff --git a/ydb/services/lib/actors/pq_rl_helpers.h b/ydb/services/lib/actors/pq_rl_helpers.h deleted file mode 100644 index cf002033998..00000000000 --- a/ydb/services/lib/actors/pq_rl_helpers.h +++ /dev/null @@ -1,40 +0,0 @@ -#pragma once - -#include <ydb/core/grpc_services/local_rate_limiter.h> -#include <ydb/core/metering/stream_ru_calculator.h> -#include <ydb/core/protos/pqconfig.pb.h> - -#include <util/datetime/base.h> - -namespace NKikimr::NGRpcProxy::V1 { - -class TRlHelpers: public NMetering::TStreamRequestUnitsCalculator { -public: - explicit TRlHelpers(NGRpcService::IRequestCtxBase* reqCtx, ui64 blockSize, const TDuration& waitDuration); - -protected: - enum EWakeupTag: ui64 { - RlInit = 0, - RlAllowed = 1, - RlNoResource = 2, - RecheckAcl = 3, - }; - - bool IsQuotaRequired() const; - bool MaybeRequestQuota(ui64 amount, EWakeupTag tag, const TActorContext& ctx); - void OnWakeup(EWakeupTag tag); - - const TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode>& GetMeteringMode() const; - void SetMeteringMode(NKikimrPQ::TPQTabletConfig::EMeteringMode mode); - - ui64 CalcRuConsumption(ui64 payloadSize); - -private: - NGRpcService::IRequestCtxBase* const Request; - const TDuration WaitDuration; - - TActorId RlActor; - TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode> MeteringMode; -}; - -} diff --git a/ydb/services/lib/actors/ya.make b/ydb/services/lib/actors/ya.make index 19e4db930b9..af0977f0001 100644 --- a/ydb/services/lib/actors/ya.make +++ b/ydb/services/lib/actors/ya.make @@ -2,7 +2,6 @@ LIBRARY() SRCS( pq_schema_actor.cpp - pq_rl_helpers.cpp ) PEERDIR( diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.h b/ydb/services/persqueue_v1/actors/read_session_actor.h index a4c8dd4078c..d13fef3e6bb 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.h +++ b/ydb/services/persqueue_v1/actors/read_session_actor.h @@ -7,7 +7,7 @@ #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/grpc_services/grpc_request_proxy.h> #include <ydb/core/persqueue/events/global.h> -#include <ydb/services/lib/actors/pq_rl_helpers.h> +#include <ydb/core/persqueue/pq_rl_helpers.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h> @@ -121,7 +121,7 @@ struct TFormedReadResponse: public TSimpleRefCount<TFormedReadResponse<TServerMe template <bool UseMigrationProtocol> // Migration protocol is "pqv1" class TReadSessionActor : public TActorBootstrapped<TReadSessionActor<UseMigrationProtocol>> - , private TRlHelpers + , private NPQ::TRlHelpers { using TClientMessage = typename std::conditional_t<UseMigrationProtocol, PersQueue::V1::MigrationStreamingReadClientMessage, diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp index ceeaa0221cb..9f99af51a86 100644 --- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp @@ -32,7 +32,7 @@ TReadSessionActor<UseMigrationProtocol>::TReadSessionActor( TIntrusivePtr<NMonitoring::TDynamicCounters> counters, const TMaybe<TString> clientDC, const NPersQueue::TTopicsListController& topicsHandler) - : TRlHelpers(request, READ_BLOCK_SIZE, TDuration::Minutes(1)) + : TRlHelpers({}, request, READ_BLOCK_SIZE, false) , Request(request) , ClientDC(clientDC.GetOrElse("other")) , StartTimestamp(TInstant::Now()) @@ -1909,6 +1909,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& e break; case EWakeupTag::RlNoResource: + case EWakeupTag::RlInitNoResource: if (PendingQuota) { Y_VERIFY(MaybeRequestQuota(PendingQuota->RequiredQuota, EWakeupTag::RlAllowed, ctx)); } else { diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index c7ea6e863c5..bc679a850a1 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -10,10 +10,10 @@ #include <ydb/core/grpc_services/grpc_request_proxy.h> #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/persqueue/events/global.h> +#include <ydb/core/persqueue/pq_rl_helpers.h> #include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/persqueue/writer/writer.h> #include <ydb/core/protos/grpc_pq_old.pb.h> -#include <ydb/services/lib/actors/pq_rl_helpers.h> #include <ydb/services/metadata/service.h> @@ -26,7 +26,7 @@ inline TActorId GetPQWriteServiceActorID() { template<bool UseMigrationProtocol> class TWriteSessionActor : public NActors::TActorBootstrapped<TWriteSessionActor<UseMigrationProtocol>> - , private TRlHelpers + , private NPQ::TRlHelpers { using TSelf = TWriteSessionActor<UseMigrationProtocol>; using TClientMessage = std::conditional_t<UseMigrationProtocol, PersQueue::V1::StreamingWriteClientMessage, @@ -228,7 +228,7 @@ private: NPersQueue::TTopicConverterPtr FullConverter; ui32 Partition; ui32 PreferedPartition; - TMaybe<ui32> ExpectedGeneration; + std::optional<ui32> ExpectedGeneration; bool PartitionFound = false; // 'SourceId' is called 'MessageGroupId' since gRPC data plane API v1 diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 0aef6cfbf41..0b06d0be568 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -177,7 +177,7 @@ TWriteSessionActor<UseMigrationProtocol>::TWriteSessionActor( TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const TMaybe<TString> clientDC, const NPersQueue::TTopicsListController& topicsController ) - : TRlHelpers(request, WRITE_BLOCK_SIZE, TDuration::Minutes(1)) + : TRlHelpers({}, request, WRITE_BLOCK_SIZE, false) , Request(request) , State(ES_CREATED) , SchemeCache(schemeCache) @@ -1034,7 +1034,7 @@ void TWriteSessionActor<UseMigrationProtocol>::ProceedPartition(const ui32 parti } Writer = ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter( - ctx.SelfID, PartitionTabletId, Partition, ExpectedGeneration, + ctx.SelfID, {}, PartitionTabletId, Partition, ExpectedGeneration, SourceId, TPartitionWriterOpts().WithDeduplication(UseDeduplication) )); State = ES_WAIT_WRITER_INIT; @@ -1771,6 +1771,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& break; case EWakeupTag::RlNoResource: + case EWakeupTag::RlInitNoResource: if (PendingQuotaRequest) { Y_VERIFY(MaybeRequestQuota(PendingQuotaRequest->RequiredQuota, EWakeupTag::RlAllowed, ctx)); } else { |