aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-08-29 15:09:38 +0300
committertesseract <tesseract@yandex-team.com>2023-08-29 18:44:48 +0300
commitccd68bc7ecf30f86692670b1c303d02f8758f600 (patch)
tree2293f328608eb5b2d37a49b65704d2ddd79f0821
parentb906b72cdf260008a312be3b4717a4593dc26786 (diff)
downloadydb-ccd68bc7ecf30f86692670b1c303d02f8758f600.tar.gz
Count RequestUnits for Kafka protocol
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.cpp7
-rw-r--r--ydb/core/kafka_proxy/actors/actors.h3
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp23
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_produce_actor.h2
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp96
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h23
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp1
-rw-r--r--ydb/core/kafka_proxy/kafka_events.h32
-rw-r--r--ydb/core/kafka_proxy/kafka_messages_int.h35
-rw-r--r--ydb/core/kafka_proxy/ut/ut_protocol.cpp69
-rw-r--r--ydb/core/persqueue/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/persqueue/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/persqueue/pq_rl_helpers.cpp117
-rw-r--r--ydb/core/persqueue/pq_rl_helpers.h88
-rw-r--r--ydb/core/persqueue/writer/writer.cpp238
-rw-r--r--ydb/core/persqueue/writer/writer.h13
-rw-r--r--ydb/core/persqueue/ya.make1
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp2
-rw-r--r--ydb/core/tx/scheme_board/subscriber.cpp13
-rw-r--r--ydb/core/tx/scheme_board/subscriber.h5
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp6
-rw-r--r--ydb/services/datastreams/put_records_actor.h6
-rw-r--r--ydb/services/lib/actors/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/services/lib/actors/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/lib/actors/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/services/lib/actors/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/services/lib/actors/pq_rl_helpers.cpp70
-rw-r--r--ydb/services/lib/actors/pq_rl_helpers.h40
-rw-r--r--ydb/services/lib/actors/ya.make1
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.h4
-rw-r--r--ydb/services/persqueue_v1/actors/read_session_actor.ipp3
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.h6
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp5
35 files changed, 714 insertions, 203 deletions
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index e10d52aaf60..25a270d5666 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -480,12 +480,7 @@ void TGRpcRequestProxyImpl::ForgetDatabase(const TString& database) {
void TGRpcRequestProxyImpl::SubscribeToDatabase(const TString& database) {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::GRPC_SERVER, "Subscribe to " << database);
- Y_VERIFY(!AppData()->DomainsInfo->Domains.empty());
- auto& domain = AppData()->DomainsInfo->Domains.begin()->second;
- ui64 domainOwnerId = domain->SchemeRoot;
- ui32 schemeBoardGroup = domain->DefaultSchemeBoardGroup;
- THolder<IActor> subscriber{CreateSchemeBoardSubscriber(SelfId(), database, schemeBoardGroup, domainOwnerId)};
- TActorId subscriberId = Register(subscriber.Release());
+ TActorId subscriberId = Register(CreateSchemeBoardSubscriber(SelfId(), database));
auto itSubscriber = Subscribers.emplace(database, subscriberId);
if (!itSubscriber.second) {
Send(itSubscriber.first->second, new TEvents::TEvPoisonPill());
diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h
index 1b98abe4d9f..526328bddb9 100644
--- a/ydb/core/kafka_proxy/actors/actors.h
+++ b/ydb/core/kafka_proxy/actors/actors.h
@@ -1,5 +1,6 @@
#pragma once
+#include <ydb/core/persqueue/pq_rl_helpers.h>
#include <ydb/core/protos/config.pb.h>
#include <ydb/library/aclib/aclib.h>
@@ -33,6 +34,8 @@ struct TContext {
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TString ClientDC;
+ NKikimr::NPQ::TRlContext RlContext;
+
bool Authenticated() { return AuthenticationStep == SUCCESS; }
};
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
index 908446b0326..11b3cd582f6 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
@@ -19,6 +19,13 @@ NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context) {
return new TKafkaProduceActor(context);
}
+TString NormalizePath(const TString& database, const TString& topic) {
+ if (topic.Size() > database.Size() && topic.at(database.Size()) == '/' && topic.StartsWith(database)) {
+ return topic;
+ }
+ return CanonizePath(database + "/" + topic);
+}
+
TString TKafkaProduceActor::LogPrefix() {
TStringBuilder sb;
sb << "TKafkaProduceActor " << SelfId() << " State: ";
@@ -115,12 +122,14 @@ void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResu
auto* navigate = ev.Get()->Get()->Request.Get();
for (auto& info : navigate->ResultSet) {
if (NSchemeCache::TSchemeCacheNavigate::EStatus::Ok == info.Status) {
- auto topicPath = "/" + NKikimr::JoinPath(info.Path);
+ auto topicPath = CanonizePath(NKikimr::JoinPath(info.Path));
KAFKA_LOG_D("Received topic '" << topicPath << "' description");
TopicsForInitialization.erase(topicPath);
auto& topic = Topics[topicPath];
+ topic.MeteringMode = info.PQGroupInfo->Description.GetPQTabletConfig().GetMeteringMode();
+
if (info.SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, *Context->UserToken)) {
topic.Status = OK;
topic.ExpirationTime = now + TOPIC_OK_EXPIRATION_INTERVAL;
@@ -222,7 +231,7 @@ size_t TKafkaProduceActor::EnqueueInitialization() {
for(const auto& e : Requests) {
auto* r = e->Get()->Request;
for(const auto& topicData : r->TopicData) {
- const auto& topicPath = *topicData.Name;
+ const auto& topicPath = NormalizePath(Context->Database, *topicData.Name);
if (!Topics.contains(topicPath)) {
requireInitialization = true;
TopicsForInitialization.insert(topicPath);
@@ -315,7 +324,7 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest& pendingRequest, const T
size_t position = 0;
for(const auto& topicData : r->TopicData) {
- const TString& topicPath = *topicData.Name;
+ const TString& topicPath = NormalizePath(Context->Database, *topicData.Name);
for(const auto& partitionData : topicData.PartitionData) {
const auto partitionId = partitionData.Index;
@@ -470,11 +479,13 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
partitionResponse.Index = partitionData.Index;
if (EKafkaErrors::NONE_ERROR != result.ErrorCode) {
+ KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(result.ErrorCode) << ", ErrorMessage=" << result.ErrorMessage);
partitionResponse.ErrorCode = result.ErrorCode;
partitionResponse.ErrorMessage = result.ErrorMessage;
} else {
auto* msg = result.Value->Get();
if (msg->IsSuccess()) {
+ KAFKA_LOG_T("Partition result success.");
partitionResponse.ErrorCode = EKafkaErrors::NONE_ERROR;
auto& writeResults = msg->Record.GetPartitionResponse().GetCmdWriteResult();
@@ -484,6 +495,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
partitionResponse.BaseOffset = lastResult.GetSeqNo();
}
} else {
+ KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(Convert(msg->GetError().Code)) << ", ErrorMessage=" << msg->GetError().Reason);
partitionResponse.ErrorCode = Convert(msg->GetError().Code);
partitionResponse.ErrorMessage = msg->GetError().Reason;
}
@@ -560,7 +572,10 @@ std::pair<TKafkaProduceActor::ETopicStatus, TActorId> TKafkaProduceActor::Partit
}
auto tabletId = pit->second;
- auto* writerActor = CreatePartitionWriter(SelfId(), tabletId, partitionId, {}, SourceId, TPartitionWriterOpts().WithDeduplication(false));
+ TPartitionWriterOpts opts;
+ opts.WithDeduplication(false)
+ .WithCheckRequestUnits(topicInfo.MeteringMode, Context->RlContext);
+ auto* writerActor = CreatePartitionWriter(SelfId(), topicPath, tabletId, partitionId, {/*expectedGeneration*/}, SourceId, opts);
auto& writerInfo = partitionWriters[partitionId];
writerInfo.ActorId = ctx.RegisterWithSameMailbox(writerActor);
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
index e6ee6990009..b37c6890963 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
@@ -177,6 +177,8 @@ private:
ETopicStatus Status = OK;
TInstant ExpirationTime;
+ NKikimrPQ::TPQTabletConfig::EMeteringMode MeteringMode;
+
// partitioId -> tabletId
std::unordered_map<ui32, ui64> partitions;
};
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
index 0ef26fed5f9..e0c3bab0ce8 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
@@ -1,7 +1,9 @@
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h>
+#include <ydb/core/base/path.h>
#include <ydb/core/base/ticket_parser.h>
#include <ydb/core/kafka_proxy/kafka_events.h>
+#include <ydb/core/tx/scheme_board/subscriber.h>
#include <library/cpp/actors/core/actor.h>
#include "kafka_sasl_auth_actor.h"
@@ -33,13 +35,21 @@ void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) {
StartPlainAuth(ctx);
}
+void TKafkaSaslAuthActor::PassAway() {
+ if (SubscriberId) {
+ Send(SubscriberId, new TEvents::TEvPoison());
+ }
+ TActorBootstrapped::PassAway();
+}
+
void TKafkaSaslAuthActor::StartPlainAuth(const NActors::TActorContext& ctx) {
TAuthData authData;
if (!TryParseAuthDataTo(authData, ctx)) {
return;
}
- Database = authData.Database;
+ Database = CanonizePath(authData.Database);
SendLoginRequest(authData, ctx);
+ SendDescribeRequest(ctx);
}
void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult::TPtr& ev, const NActors::TActorContext& ctx) {
@@ -48,17 +58,11 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketRes
return;
}
- auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>();
- responseToClient->ErrorCode = EKafkaErrors::NONE_ERROR;
- responseToClient->ErrorMessage = "";
- responseToClient->AuthBytes = TKafkaRawBytes(ERROR_AUTH_BYTES, sizeof(ERROR_AUTH_BYTES));
+ Authentificated = true;
- auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient);
+ Token = ev->Get()->Token;
- auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, ev->Get()->Token, Database);
- Send(Context->ConnectionId, authResult);
-
- Die(ctx);
+ ReplyIfReady(ctx);
}
void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NActors::TActorContext& /*ctx*/) {
@@ -69,6 +73,31 @@ void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NAct
}));
}
+void TKafkaSaslAuthActor::ReplyIfReady(const NActors::TActorContext& ctx) {
+ if (!Authentificated || !Described) {
+ return;
+ }
+
+ KAFKA_LOG_D("Authentificated success. Database='" << Database << "', "
+ << "FolderId='" << FolderId << "', "
+ << "ServiceAccountId='" << ServiceAccountId << "', "
+ << "DatabaseId='" << DatabaseId << "', "
+ << "Coordinator='" << Coordinator << "', "
+ << "ResourcePath='" << ResourcePath << "'");
+
+ auto responseToClient = std::make_shared<TSaslAuthenticateResponseData>();
+ responseToClient->ErrorCode = EKafkaErrors::NONE_ERROR;
+ responseToClient->ErrorMessage = "";
+ responseToClient->AuthBytes = TKafkaRawBytes(ERROR_AUTH_BYTES, sizeof(ERROR_AUTH_BYTES));
+
+ auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient);
+
+ auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, Token, Database, FolderId, ServiceAccountId, DatabaseId, Coordinator, ResourcePath);
+ Send(Context->ConnectionId, authResult);
+
+ Die(ctx);
+}
+
void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvAuthFailed::TPtr& ev, const NActors::TActorContext& ctx) {
SendAuthFailedAndDie(EKafkaErrors::SASL_AUTHENTICATION_FAILED, "", ev->Get()->ErrorMessage, ctx);
}
@@ -111,7 +140,7 @@ void TKafkaSaslAuthActor::SendAuthFailedAndDie(EKafkaErrors errorCode, const TSt
responseToClient->AuthBytes = TKafkaRawBytes(ERROR_AUTH_BYTES, sizeof(ERROR_AUTH_BYTES));
auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient);
- auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::FAILED, evResponse, nullptr, "", errorMessage);
+ auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::FAILED, evResponse, errorMessage);
Send(Context->ConnectionId, authResult);
Die(ctx);
@@ -125,7 +154,6 @@ void TKafkaSaslAuthActor::SendLoginRequest(TKafkaSaslAuthActor::TAuthData authDa
using TRpcEv = NKikimr::NGRpcService::TGRpcRequestWrapperNoAuth<NKikimr::NGRpcService::TRpcServices::EvLogin, Ydb::Auth::LoginRequest, Ydb::Auth::LoginResponse>;
auto rpcFuture = NKikimr::NRpcService::DoLocalRpc<TRpcEv>(std::move(request), authData.Database, {}, actorSystem);
-
rpcFuture.Subscribe([authData, actorSystem, selfId = SelfId()](const NThreading::TFuture<Ydb::Auth::LoginResponse>& future) {
auto& response = future.GetValueSync();
@@ -146,4 +174,48 @@ void TKafkaSaslAuthActor::SendLoginRequest(TKafkaSaslAuthActor::TAuthData authDa
});
}
+void TKafkaSaslAuthActor::SendDescribeRequest(const NActors::TActorContext& /*ctx*/) {
+ if (Database.Empty()) {
+ Described = true;
+ return;
+ }
+
+ KAFKA_LOG_D("Describe database '" << Database << "'");
+ SubscriberId = Register(CreateSchemeBoardSubscriber(SelfId(), Database));
+}
+
+void TKafkaSaslAuthActor::Handle(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev, const TActorContext& ctx) {
+ Described = true;
+
+ auto* result = ev->Get();
+ auto status = result->DescribeSchemeResult.GetStatus();
+ if (status != NKikimrScheme::EStatus::StatusSuccess) {
+ KAFKA_LOG_ERROR("Describe database '" << Database << "' error: " << status);
+ ReplyIfReady(ctx);
+ return;
+ }
+
+ for(const auto& attr : result->DescribeSchemeResult.GetPathDescription().GetUserAttributes()) {
+ const auto& key = attr.GetKey();
+ const auto& value = attr.GetValue();
+
+ KAFKA_LOG_D("Database attribute key=" << key << ", value=" << value);
+
+ if (key == "folder_id") {
+ FolderId = value;
+ } else if (key == "service_account_id") {
+ ServiceAccountId = value;
+ } else if (key == "database_id") {
+ DatabaseId = value;
+ } else if (key == "serverless_rt_coordination_node_path") {
+ Coordinator = value;
+ } else if (key == "serverless_rt_base_resource_ru") {
+ ResourcePath = value;
+ }
+ }
+
+ ReplyIfReady(ctx);
+}
+
+
} // NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
index 85d516b3759..8c3007096f5 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
@@ -2,6 +2,8 @@
#include "ydb/library/aclib/aclib.h"
#include <ydb/core/base/ticket_parser.h>
+#include <ydb/core/tx/scheme_board/events.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -9,6 +11,8 @@
namespace NKafka {
+using namespace NKikimr;
+
class TKafkaSaslAuthActor: public NActors::TActorBootstrapped<TKafkaSaslAuthActor> {
struct TEvPrivate {
@@ -48,22 +52,31 @@ public:
private:
STATEFN(StateWork) {
+ KAFKA_LOG_T("Received event: " << (*ev.Get()).GetTypeName());
switch (ev->GetTypeRewrite()) {
HFunc(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult, Handle);
HFunc(TEvPrivate::TEvTokenReady, Handle);
HFunc(TEvPrivate::TEvAuthFailed, Handle);
+ HFunc(TSchemeBoardEvents::TEvNotifyUpdate, Handle);
}
}
void Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult::TPtr& ev, const NActors::TActorContext& ctx);
void Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NActors::TActorContext& ctx);
void Handle(TEvPrivate::TEvAuthFailed::TPtr& ev, const NActors::TActorContext& ctx);
+ void Handle(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev, const TActorContext& ctx);
void StartPlainAuth(const NActors::TActorContext& ctx);
void SendLoginRequest(TKafkaSaslAuthActor::TAuthData authData, const NActors::TActorContext& ctx);
+ void SendDescribeRequest(const NActors::TActorContext& ctx);
void SendAuthFailedAndDie(EKafkaErrors errorCode, const TString& errorMessage, const TString& details, const NActors::TActorContext& ctx);
bool TryParseAuthDataTo(TKafkaSaslAuthActor::TAuthData& authData, const NActors::TActorContext& ctx);
+ void ReplyIfReady(const NActors::TActorContext& ctx);
+
+protected:
+ void PassAway() override;
+
private:
const TContext::TPtr Context;
const ui64 CorrelationId;
@@ -72,6 +85,16 @@ private:
const NKikimr::NRawSocket::TNetworkConfig::TSocketAddressType Address;
TString Database;
+ TIntrusiveConstPtr<NACLib::TUserToken> Token;
+ TString FolderId;
+ TString ServiceAccountId;
+ TString DatabaseId;
+ TString Coordinator;
+ TString ResourcePath;
+
+ bool Authentificated = false;
+ bool Described = false;
+ TActorId SubscriberId;
};
} // NKafka
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index bc951d1b95f..696d0799814 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -290,6 +290,7 @@ protected:
Context->UserToken = event->UserToken;
Context->Database = event->Database;
Context->AuthenticationStep = authStep;
+ Context->RlContext = {event->Coordinator, event->ResourcePath, event->Database, event->UserToken->GetSerializedToken()};
KAFKA_LOG_D("Authentificated successful. SID=" << Context->UserToken->GetUserSID());
}
diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h
index 8b70b059308..f9577438dfb 100644
--- a/ydb/core/kafka_proxy/kafka_events.h
+++ b/ydb/core/kafka_proxy/kafka_events.h
@@ -49,17 +49,35 @@ struct TEvKafka {
};
struct TEvAuthResult : public TEventLocal<TEvAuthResult, EvAuthResult> {
- TEvAuthResult(EAuthSteps authStep, std::shared_ptr<TEvKafka::TEvResponse> clientResponse, TIntrusiveConstPtr<NACLib::TUserToken> token, TString database, TString error = "")
- : AuthStep(authStep),
- UserToken(token),
- Database(database),
- Error(error),
- ClientResponse(std::move(clientResponse))
- {}
+ TEvAuthResult(EAuthSteps authStep, std::shared_ptr<TEvKafka::TEvResponse> clientResponse, TString error = "")
+ : AuthStep(authStep)
+ , Error(error)
+ , ClientResponse(clientResponse) {
+ }
+
+ TEvAuthResult(EAuthSteps authStep, std::shared_ptr<TEvKafka::TEvResponse> clientResponse, TIntrusiveConstPtr<NACLib::TUserToken> token, TString database,
+ TString folderId, TString serviceAccountId, TString databaseId, TString coordinator, TString resourcePath, TString error = "")
+ : AuthStep(authStep)
+ , UserToken(token)
+ , Database(database)
+ , FolderId(folderId)
+ , ServiceAccountId(serviceAccountId)
+ , DatabaseId(databaseId)
+ , Coordinator(coordinator)
+ , ResourcePath(resourcePath)
+ , Error(error)
+ , ClientResponse(std::move(clientResponse)) {
+ }
EAuthSteps AuthStep;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TString Database;
+ TString FolderId;
+ TString ServiceAccountId;
+ TString DatabaseId;
+ TString Coordinator;
+ TString ResourcePath;
+
TString Error;
TString SaslMechanism;
std::shared_ptr<TEvKafka::TEvResponse> ClientResponse;
diff --git a/ydb/core/kafka_proxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h
index 890b19ede5f..7461d0c78d1 100644
--- a/ydb/core/kafka_proxy/kafka_messages_int.h
+++ b/ydb/core/kafka_proxy/kafka_messages_int.h
@@ -356,7 +356,7 @@ public:
ythrow yexception() << "string field " << Meta::Name << " is too long to be serialized " << v.length();
}
if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
- return v.length() + SizeOfUnsignedVarint(v.length() + sizeof(TKafkaInt8));
+ return v.length() + SizeOfUnsignedVarint(v.length() + 1);
} else {
return v.length() + sizeof(TKafkaInt16);
}
@@ -450,7 +450,7 @@ class TypeStrategy<Meta, TKafkaRecords, TKafkaRecordsDesc> {
public:
inline static void DoWrite(TKafkaWritable& writable, TKafkaVersion version, const TKafkaRecords& value) {
if (value) {
- WriteArraySize<Meta>(writable, version, DoSize(CURRENT_RECORD_VERSION, value));
+ WriteArraySize<Meta>(writable, version, value->Size(CURRENT_RECORD_VERSION));
(*value).Write(writable, CURRENT_RECORD_VERSION);
} else {
WriteArraySize<Meta>(writable, version, 0);
@@ -490,11 +490,21 @@ public:
}
}
- inline static i64 DoSize(TKafkaVersion /*version*/, const TKafkaRecords& value) {
+ inline static i64 DoSize(TKafkaVersion version, const TKafkaRecords& value) {
if (value) {
- return (*value).Size(CURRENT_RECORD_VERSION);
+ const auto& v = *value;
+ const auto size = v.Size(CURRENT_RECORD_VERSION);
+ if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
+ return size + SizeOfUnsignedVarint(size + 1);
+ } else {
+ return size + sizeof(TKafkaInt32);
+ }
} else {
- return 0;
+ if (VersionCheck<Meta::FlexibleVersions.Min, Meta::FlexibleVersions.Max>(version)) {
+ return 1;
+ } else {
+ return sizeof(TKafkaInt32);
+ }
}
}
@@ -614,13 +624,24 @@ inline void Size(TSizeCollector& collector, TKafkaInt16 version, const typename
i64 size = TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value);
collector.Size += size + SizeOfUnsignedVarint(Meta::Tag) + SizeOfUnsignedVarint(size);
+ if constexpr (DEBUG_ENABLED) {
+ Cerr << "Size of field '" << Meta::Name << "' " << size << " + " << SizeOfUnsignedVarint(Meta::Tag) << " + " << SizeOfUnsignedVarint(size) << Endl;
+ }
}
} else if (VersionCheck<Meta::PresentVersions.Min, Meta::PresentVersions.Max>(version)) {
- collector.Size += TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value);
+ i64 size = TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value);
+ collector.Size += size;
+ if constexpr (DEBUG_ENABLED) {
+ Cerr << "Size of field '" << Meta::Name << "' " << size << Endl;
+ }
}
} else {
if (VersionCheck<Meta::PresentVersions.Min, Meta::PresentVersions.Max>(version)) {
- collector.Size += TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value);
+ i64 size = TypeStrategy<Meta, typename Meta::Type>::DoSize(version, value);
+ collector.Size += size;
+ if constexpr (DEBUG_ENABLED) {
+ Cerr << "Size of field '" << Meta::Name << "' " << size << Endl;
+ }
}
}
}
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index 4d4c5724e9e..ac633aa3cc5 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -125,7 +125,7 @@ public:
nullptr,
0)
);
- KikimrServer->GetRuntime()->SetLogPriority(NKikimrServices::KAFKA_PROXY, NActors::NLog::PRI_DEBUG);
+ KikimrServer->GetRuntime()->SetLogPriority(NKikimrServices::KAFKA_PROXY, NActors::NLog::PRI_TRACE);
ui16 grpc = KikimrServer->GetPort();
TString location = TStringBuilder() << "localhost:" << grpc;
@@ -169,7 +169,7 @@ using TInsecureTestServer = TTestServer<TKikimrWithGrpcAndRootSchema, false>;
using TSecureTestServer = TTestServer<TKikimrWithGrpcAndRootSchemaSecure, true>;
void Write(TSocketOutput& so, TApiMessage* request, TKafkaVersion version) {
- TWritableBuf sb(nullptr, request->Size(version));
+ TWritableBuf sb(nullptr, request->Size(version) + 1000);
TKafkaWritable writable(sb);
request->Write(writable, version);
so.Write(sb.Data(), sb.Size());
@@ -221,7 +221,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
Y_UNIT_TEST(ProduceScenario) {
TInsecureTestServer testServer;
- TString topicName = "topic-0-test";
+ TString topicName = "/Root/topic-0-test";
{
NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
@@ -235,13 +235,15 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
TSocketOutput so(s);
TSocketInput si(s);
+ size_t correlationId = 0;
+
{
Cerr << ">>>>> ApiVersionsRequest\n";
TRequestHeaderData header;
header.RequestApiKey = NKafka::EApiKey::API_VERSIONS;
header.RequestApiVersion = 2;
- header.CorrelationId = 0;
+ header.CorrelationId = correlationId++;
header.ClientId = "test";
TApiVersionsRequestData request;
@@ -263,7 +265,7 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
TRequestHeaderData header;
header.RequestApiKey = NKafka::EApiKey::SASL_HANDSHAKE;
header.RequestApiVersion = 1;
- header.CorrelationId = 1;
+ header.CorrelationId = correlationId++;
header.ClientId = "test";
TSaslHandshakeRequestData request;
@@ -280,13 +282,13 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
}
{
- Cerr << ">>>>> SaslAuthenticateRequestData";
+ Cerr << ">>>>> SaslAuthenticateRequestData\n";
char authBytes[] = "ignored\0ouruser@/Root\0ourUserPassword";
TRequestHeaderData header;
header.RequestApiKey = NKafka::EApiKey::SASL_AUTHENTICATE;
header.RequestApiVersion = 2;
- header.CorrelationId = 2;
+ header.CorrelationId = correlationId++;
header.ClientId = "test";
TSaslAuthenticateRequestData request;
@@ -295,10 +297,61 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
Write(so, &header, &request);
auto response = Read(si, &header);
- Y_UNUSED(response);
auto* msg = dynamic_cast<TSaslAuthenticateResponseData*>(response.get());
UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
}
+
+ {
+ Cerr << ">>>>> TInitProducerIdRequestData\n";
+
+ TRequestHeaderData header;
+ header.RequestApiKey = NKafka::EApiKey::INIT_PRODUCER_ID;
+ header.RequestApiVersion = 4;
+ header.CorrelationId = correlationId++;
+ header.ClientId = "test";
+
+ TInitProducerIdRequestData request;
+ request.TransactionTimeoutMs = 5000;
+
+ Write(so, &header, &request);
+
+ auto response = Read(si, &header);
+ auto* msg = dynamic_cast<TInitProducerIdResponseData*>(response.get());
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ }
+
+ {
+ Cerr << ">>>>> TProduceRequestData\n";
+
+ TRequestHeaderData header;
+ header.RequestApiKey = NKafka::EApiKey::PRODUCE;
+ header.RequestApiVersion = 9;
+ header.CorrelationId = correlationId++;
+ header.ClientId = "test-client-random-string";
+
+ TProduceRequestData request;
+ request.TopicData.resize(1);
+ request.TopicData[0].Name = topicName;
+ request.TopicData[0].PartitionData.resize(1);
+ request.TopicData[0].PartitionData[0].Index = 0; // Partition id
+ request.TopicData[0].PartitionData[0].Records.emplace();
+ request.TopicData[0].PartitionData[0].Records->BaseOffset = 3;
+ request.TopicData[0].PartitionData[0].Records->BaseSequence = 5;
+ request.TopicData[0].PartitionData[0].Records->Magic = 2; // Current supported
+ request.TopicData[0].PartitionData[0].Records->Records.resize(1);
+ request.TopicData[0].PartitionData[0].Records->Records[0].Key = "record-key";
+ request.TopicData[0].PartitionData[0].Records->Records[0].Value = "record-value";
+
+ Write(so, &header, &request);
+
+ auto response = Read(si, &header);
+ auto* msg = dynamic_cast<TProduceResponseData*>(response.get());
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, topicName);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0);
+ // UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ }
}
} \ No newline at end of file
diff --git a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt
index 0009a60a21d..95b410b0854 100644
--- a/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/persqueue/CMakeLists.darwin-x86_64.txt
@@ -62,6 +62,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_database.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_rl_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp
diff --git a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
index 61db845d639..c81de2e4f04 100644
--- a/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/persqueue/CMakeLists.linux-aarch64.txt
@@ -63,6 +63,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_database.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_rl_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp
diff --git a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt
index 61db845d639..c81de2e4f04 100644
--- a/ydb/core/persqueue/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/persqueue/CMakeLists.linux-x86_64.txt
@@ -63,6 +63,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_database.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_rl_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp
diff --git a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt
index 0009a60a21d..95b410b0854 100644
--- a/ydb/core/persqueue/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/persqueue/CMakeLists.windows-x86_64.txt
@@ -62,6 +62,7 @@ target_sources(ydb-core-persqueue PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_database.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_l2_cache.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/pq_rl_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/quota_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/read_balancer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/account_read_quoter.cpp
diff --git a/ydb/core/persqueue/pq_rl_helpers.cpp b/ydb/core/persqueue/pq_rl_helpers.cpp
new file mode 100644
index 00000000000..f3eb3348299
--- /dev/null
+++ b/ydb/core/persqueue/pq_rl_helpers.cpp
@@ -0,0 +1,117 @@
+#include "pq_rl_helpers.h"
+
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/tx/scheme_board/subscriber.h>
+
+namespace NKikimr::NPQ {
+
+TRlHelpers::TRlHelpers(const std::optional<TString>& topicPath, const TRlContext ctx, ui64 blockSize, bool subscribe, const TDuration& waitDuration)
+ : TStreamRequestUnitsCalculator(blockSize)
+ , TopicPath(topicPath)
+ , Ctx(ctx)
+ , Subscribe(subscribe)
+ , WaitDuration(waitDuration)
+{
+ Y_VERIFY_DEBUG(!subscribe || (topicPath && subscribe));
+}
+
+void TRlHelpers::Bootstrap(const TActorId selfId, const NActors::TActorContext& ctx) {
+ if (Subscribe) {
+ SubscriberId = ctx.Register(CreateSchemeBoardSubscriber(selfId, *TopicPath));
+ }
+}
+
+void TRlHelpers::PassAway(const TActorId selfId) {
+ TActorIdentity id(selfId);
+ if (RlActor) {
+ id.Send(RlActor, new TEvents::TEvPoison());
+ }
+ if (SubscriberId) {
+ id.Send(SubscriberId, new TEvents::TEvPoison());
+ }
+}
+
+bool TRlHelpers::IsQuotaInflight() const {
+ return !!RlActor;
+}
+
+bool TRlHelpers::IsQuotaRequired() const {
+ Y_VERIFY(MeteringMode.Defined());
+ return MeteringMode == NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS && Ctx;
+}
+
+void TRlHelpers::RequestInitQuota(ui64 amount, const TActorContext& ctx) {
+ RequestQuota(amount, EWakeupTag::RlInit, EWakeupTag::RlInitNoResource, ctx);
+}
+
+void TRlHelpers::RequestDataQuota(ui64 amount, const TActorContext& ctx) {
+ RequestQuota(amount, EWakeupTag::RlAllowed, EWakeupTag::RlNoResource, ctx);
+}
+
+bool TRlHelpers::MaybeRequestQuota(ui64 amount, EWakeupTag tag, const TActorContext& ctx) {
+ if (IsQuotaInflight()) {
+ return false;
+ }
+
+ RequestQuota(amount, tag, EWakeupTag::RlNoResource, ctx);
+ return true;
+}
+
+void TRlHelpers::RequestQuota(ui64 amount, EWakeupTag success, EWakeupTag timeout, const TActorContext& ctx) {
+ const auto selfId = ctx.SelfID;
+ const auto as = ctx.ActorSystem();
+
+ auto onSendAllowed = [selfId, as, success]() {
+ as->Send(selfId, new TEvents::TEvWakeup(success));
+ };
+
+ auto onSendTimeout = [selfId, as, timeout]() {
+ as->Send(selfId, new TEvents::TEvWakeup(timeout));
+ };
+
+ RlActor = NRpcService::RateLimiterAcquireUseSameMailbox(
+ Ctx.GetPath(), amount, WaitDuration,
+ std::move(onSendAllowed), std::move(onSendTimeout), ctx);
+}
+
+void TRlHelpers::OnWakeup(EWakeupTag tag) {
+ switch (tag) {
+ case EWakeupTag::RlInit:
+ case EWakeupTag::RlInitNoResource:
+ case EWakeupTag::RlAllowed:
+ case EWakeupTag::RlNoResource:
+ RlActor = {};
+ break;
+ default:
+ break;
+ }
+}
+
+const TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode>& TRlHelpers::GetMeteringMode() const {
+ return MeteringMode;
+}
+
+void TRlHelpers::SetMeteringMode(NKikimrPQ::TPQTabletConfig::EMeteringMode mode) {
+ MeteringMode = mode;
+}
+
+ui64 TRlHelpers::CalcRuConsumption(ui64 payloadSize) {
+ if (!IsQuotaRequired()) {
+ return 0;
+ }
+
+ return CalcConsumption(payloadSize);
+}
+
+void TRlHelpers::Handle(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev) {
+ auto* result = ev->Get();
+ auto status = result->DescribeSchemeResult.GetStatus();
+ if (status != NKikimrScheme::EStatus::StatusSuccess) {
+ //LOG_ERROR("Describe database '" << Database << "' error: " << status);
+ return;
+ }
+
+ MeteringMode = result->DescribeSchemeResult.GetPathDescription().GetPersQueueGroup().GetPQTabletConfig().GetMeteringMode();
+}
+
+}
diff --git a/ydb/core/persqueue/pq_rl_helpers.h b/ydb/core/persqueue/pq_rl_helpers.h
new file mode 100644
index 00000000000..836808c012a
--- /dev/null
+++ b/ydb/core/persqueue/pq_rl_helpers.h
@@ -0,0 +1,88 @@
+#pragma once
+
+#include <ydb/core/grpc_services/local_rate_limiter.h>
+#include <ydb/core/metering/stream_ru_calculator.h>
+#include <ydb/core/protos/pqconfig.pb.h>
+
+#include <util/datetime/base.h>
+
+namespace NKikimr::NPQ {
+
+class TRlContext {
+public:
+ TRlContext() {
+ }
+
+ TRlContext(const NGRpcService::IRequestCtxBase* reqCtx) {
+ Path.DatabaseName = reqCtx->GetDatabaseName().GetOrElse("");
+ Path.Token = reqCtx->GetSerializedToken();
+
+ if (auto rlPath = reqCtx->GetRlPath()) {
+ Path.ResourcePath = rlPath->ResourcePath;
+ Path.CoordinationNode = rlPath->CoordinationNode;
+ }
+ }
+
+ TRlContext(const TString& coordinationNode,
+ const TString& resourcePath,
+ const TString& databaseName,
+ const TString& token)
+ : Path({coordinationNode, resourcePath, databaseName, token}) {
+ }
+
+ operator bool() const { return !Path.ResourcePath.Empty() && !Path.CoordinationNode.Empty(); };
+ const NRpcService::TRlFullPath GetPath() const { return Path; }
+
+private:
+ NRpcService::TRlFullPath Path;
+};
+
+class TRlHelpers: public NMetering::TStreamRequestUnitsCalculator {
+public:
+ TRlHelpers(const std::optional<TString>& topicPath, const TRlContext ctx, ui64 blockSize, bool subscribe, const TDuration& waitDuration = TDuration::Minutes(1));
+
+protected:
+ enum EWakeupTag: ui64 {
+ RlInit = 0,
+ RlInitNoResource = 1,
+
+ RlAllowed = 2,
+ RlNoResource = 3,
+
+ RecheckAcl = 4,
+ };
+
+ void Bootstrap(const TActorId selfId, const NActors::TActorContext& ctx);
+ void PassAway(const TActorId selfId);
+
+ bool IsQuotaRequired() const;
+ bool IsQuotaInflight() const;
+
+ void RequestInitQuota(ui64 amount, const TActorContext& ctx);
+ void RequestDataQuota(ui64 amount, const TActorContext& ctx);
+
+ bool MaybeRequestQuota(ui64 amount, EWakeupTag tag, const TActorContext& ctx);
+ void RequestQuota(ui64 amount, EWakeupTag success, EWakeupTag timeout, const TActorContext& ctx);
+
+ void OnWakeup(EWakeupTag tag);
+
+ const TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode>& GetMeteringMode() const;
+ void SetMeteringMode(NKikimrPQ::TPQTabletConfig::EMeteringMode mode);
+
+ ui64 CalcRuConsumption(ui64 payloadSize);
+
+ void Handle(TSchemeBoardEvents::TEvNotifyUpdate::TPtr& ev);
+
+private:
+ const std::optional<TString> TopicPath;
+ const TRlContext Ctx;
+ const bool Subscribe;
+ const TDuration WaitDuration;
+
+ TActorId RlActor;
+ TActorId SubscriberId;
+
+ TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode> MeteringMode;
+};
+
+}
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp
index 1f3beeb1cfd..36d13fe7008 100644
--- a/ydb/core/persqueue/writer/writer.cpp
+++ b/ydb/core/persqueue/writer/writer.cpp
@@ -7,6 +7,7 @@
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/persqueue/events/global.h>
+#include <ydb/core/persqueue/pq_rl_helpers.h>
#include <ydb/public/lib/base/msgbus_status.h>
#include <util/generic/deque.h>
@@ -16,6 +17,19 @@
namespace NKikimr::NPQ {
+#if defined(LOG_PREFIX) || defined(TRACE) || defined(DEBUG) || defined(INFO) || defined(ERROR)
+#error "Already defined LOG_PREFIX or TRACE or DEBUG or INFO or ERROR"
+#endif
+
+
+#define LOG_PREFIX "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") "
+#define TRACE(message) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);
+#define DEBUG(message) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);
+#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);
+#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);
+
+static const ui64 WRITE_BLOCK_SIZE = 4_KB;
+
TString TEvPartitionWriter::TEvInitResult::TSuccess::ToString() const {
return TStringBuilder() << "Success {"
<< " OwnerCookie: " << OwnerCookie
@@ -71,7 +85,10 @@ TString TEvPartitionWriter::TEvWriteResponse::ToString() const {
return out;
}
-class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
+class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRlHelpers {
+
+ static constexpr size_t MAX_QUOTA_INFLIGHT = 3;
+
static void FillHeader(NKikimrClient::TPersQueuePartitionRequest& request,
ui32 partitionId, const TActorId& pipeClient)
{
@@ -136,6 +153,9 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
for (const auto& [cookie, _] : std::exchange(PendingReserve, {})) {
SendWriteResult(ErrorCode, error, MakeResponse(cookie));
}
+ for (const auto& [cookie, _] : std::exchange(ReceivedReserve, {})) {
+ SendWriteResult(ErrorCode, error, MakeResponse(cookie));
+ }
for (const auto& [cookie, _] : std::exchange(Pending, {})) {
SendWriteResult(ErrorCode, error, MakeResponse(cookie));
}
@@ -236,14 +256,14 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
STATEFN(StateGetMaxSeqNo) {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvPersQueue::TEvResponse, HandleMaxSeqNo);
+ HFunc(TEvPersQueue::TEvResponse, HandleMaxSeqNo);
hFunc(TEvPartitionWriter::TEvWriteRequest, HoldPending);
default:
return StateBase(ev);
}
}
- void HandleMaxSeqNo(TEvPersQueue::TEvResponse::TPtr& ev) {
+ void HandleMaxSeqNo(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;
TString error;
@@ -282,7 +302,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
Become(&TThis::StateWork);
if (Pending) {
- ReserveBytes();
+ ReserveBytes(ctx);
}
}
@@ -329,9 +349,11 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
/// Work
STATEFN(StateWork) {
+ DEBUG("Received event: " << (*ev.Get()).GetTypeName())
switch (ev->GetTypeRewrite()) {
- hFunc(TEvPartitionWriter::TEvWriteRequest, Handle);
+ HFunc(TEvPartitionWriter::TEvWriteRequest, Handle);
hFunc(TEvPersQueue::TEvResponse, Handle);
+ HFunc(TEvents::TEvWakeup, Handle);
default:
return StateBase(ev);
}
@@ -353,12 +375,21 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
Pending.emplace(cookie, std::move(ev->Get()->Record));
}
- void Handle(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) {
+ void Handle(TEvPartitionWriter::TEvWriteRequest::TPtr& ev, const TActorContext& ctx) {
HoldPending(ev);
- ReserveBytes();
+ ReserveBytes(ctx);
}
- void ReserveBytes() {
+ void ReserveBytes(const TActorContext& ctx) {
+ if (IsQuotaInflight()) {
+ return;
+ }
+
+ const bool checkQuota = Opts.CheckRequestUnits() && IsQuotaRequired();
+
+ size_t processed = 0;
+ PendingQuotaAmount = 0;
+
while (Pending) {
auto it = Pending.begin();
@@ -372,11 +403,93 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
cmd.SetSize(it->second.ByteSize());
cmd.SetLastRequest(false);
+ if (checkQuota) {
+ ++processed;
+ PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize());
+ PendingQuota.emplace_back(it->first);
+ }
+
NTabletPipe::SendData(SelfId(), PipeClient, ev.Release());
- PendingReserve.emplace(it->first, std::move(it->second));
+ PendingReserve.emplace(it->first, RequestHolder{ std::move(it->second), checkQuota });
Pending.erase(it);
+
+ if (checkQuota && processed == MAX_QUOTA_INFLIGHT) {
+ break;
+ }
+ }
+
+ if (checkQuota) {
+ RequestDataQuota(PendingQuotaAmount, ctx);
+ }
+ }
+
+ void EnqueueReservedAndProcess(ui64 cookie) {
+ Y_VERIFY(!PendingReserve.empty());
+ auto it = PendingReserve.begin();
+
+ Y_VERIFY(it->first == cookie);
+
+ ReceivedReserve.emplace(it->first, std::move(it->second));
+
+ ProcessQuota();
+ }
+
+ void ProcessQuota() {
+ auto rit = ReceivedReserve.begin();
+ auto qit = ReceivedQuota.begin();
+
+ while(rit != ReceivedReserve.end() && qit != ReceivedQuota.end()) {
+ auto& request = rit->second;
+ const auto cookie = rit->first;
+ TRACE("processing quota for request cookie=" << cookie << ", QuotaChecked=" << request.QuotaChecked << ", QuotaAccepted=" << request.QuotaAccepted);
+ if (!request.QuotaChecked || request.QuotaAccepted) {
+ // A situation when a quota was not requested or was received while waiting for a reserve
+ Write(cookie, std::move(request.Request));
+ ReceivedReserve.erase(rit++);
+ continue;
+ }
+
+ if (cookie != *qit) {
+ ERROR("The order of reservation and quota requests should be the same. ReserveCookie=" << cookie << ", QuotaCookie=" << *qit);
+ Disconnected(TEvPartitionWriter::TEvWriteResponse::InternalError);
+ return;
+ }
+
+ Write(cookie, std::move(request.Request));
+ ReceivedReserve.erase(rit++);
+ ++qit;
}
+
+ while(rit != ReceivedReserve.end()) {
+ auto& request = rit->second;
+ const auto cookie = rit->first;
+ TRACE("processing quota for request cookie=" << cookie << ", QuotaChecked=" << request.QuotaChecked << ", QuotaAccepted=" << request.QuotaAccepted);
+ if (request.QuotaChecked && !request.QuotaAccepted) {
+ break;
+ }
+
+ // A situation when a quota was not requested or was received while waiting for a reserve
+ Write(cookie, std::move(request.Request));
+ ReceivedReserve.erase(rit++);
+ }
+
+ while(qit != ReceivedQuota.end()) {
+ auto cookie = *qit;
+ TRACE("processing quota for request cookie=" << cookie);
+ auto pit = PendingReserve.find(cookie);
+
+ if (pit == PendingReserve.end()) {
+ ERROR("The received quota does not apply to any request. Cookie=" << *qit);
+ Disconnected(TEvPartitionWriter::TEvWriteResponse::InternalError);
+ return;
+ }
+
+ pit->second.QuotaAccepted = true;
+ ++qit;
+ }
+
+ ReceivedQuota.clear();
}
void Write(ui64 cookie) {
@@ -386,8 +499,14 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
Y_VERIFY(it->first == cookie);
Y_VERIFY(PendingWrite.empty() || PendingWrite.back() < cookie);
+ Write(cookie, std::move(it->second.Request));
+
+ PendingReserve.erase(it);
+ }
+
+ void Write(ui64 cookie, NKikimrClient::TPersQueueRequest&& req) {
auto ev = MakeHolder<TEvPersQueue::TEvRequest>();
- ev->Record = std::move(it->second);
+ ev->Record = req;
auto& request = *ev->Record.MutablePartitionRequest();
request.SetMessageNo(MessageNo++);
@@ -398,7 +517,6 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
NTabletPipe::SendData(SelfId(), PipeClient, ev.Release());
PendingWrite.emplace_back(cookie);
- PendingReserve.erase(it);
}
void Handle(TEvPersQueue::TEvResponse::TPtr& ev) {
@@ -424,7 +542,19 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
}
WriteAccepted(cookie);
- Write(cookie);
+
+ Y_VERIFY(!PendingReserve.empty());
+ auto it = PendingReserve.begin();
+ auto& holder = it->second;
+
+ if ((holder.QuotaChecked && !holder.QuotaAccepted)|| !ReceivedReserve.empty()) {
+ // There may be two situations:
+ // - a quota has been requested, and the quota has not been received yet
+ // - the quota was not requested, for example, due to a change in the metering option, but the previous quota requests have not yet been processed
+ EnqueueReservedAndProcess(cookie);
+ } else {
+ Write(cookie);
+ }
} else {
if (PendingWrite.empty()) {
return WriteResult(TEvPartitionWriter::TEvWriteResponse::InternalError, "Unexpected Write response", std::move(record));
@@ -442,13 +572,13 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
}
}
- void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
auto msg = ev->Get();
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TEvClientConnected Status " << msg->Status << ", TabletId: " << msg->TabletId << ", NodeId " << msg->ServerId.NodeId() << ", Generation: " << msg->Generation);
+ DEBUG("TEvClientConnected Status " << msg->Status << ", TabletId: " << msg->TabletId << ", NodeId " << msg->ServerId.NodeId() << ", Generation: " << msg->Generation);
Y_VERIFY_DEBUG(msg->TabletId == TabletId);
if (msg->Status != NKikimrProto::OK) {
- LOG_ERROR_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with status " << ev->Get()->Status);
+ ERROR("received TEvClientConnected with status " << ev->Get()->Status);
Disconnected(TEvPartitionWriter::TEvWriteResponse::InternalError);
return;
}
@@ -459,22 +589,22 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
{
if(*ExpectedGeneration != msg->Generation)
{
- LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with wrong generation. Expected: " << *ExpectedGeneration << ", received " << msg->Generation);
+ INFO("received TEvClientConnected with wrong generation. Expected: " << *ExpectedGeneration << ", received " << msg->Generation);
Disconnected(TEvPartitionWriter::TEvWriteResponse::PartitionNotLocal);
PassAway();
}
if (NActors::TActivationContext::ActorSystem()->NodeId != msg->ServerId.NodeId())
{
- LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientConnected with wrong NodeId. Expected: " << NActors::TActivationContext::ActorSystem()->NodeId << ", received " << msg->ServerId.NodeId());
+ INFO("received TEvClientConnected with wrong NodeId. Expected: " << NActors::TActivationContext::ActorSystem()->NodeId << ", received " << msg->ServerId.NodeId());
Disconnected(TEvPartitionWriter::TEvWriteResponse::PartitionNotLocal);
PassAway();
}
}
}
- void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
if (ev->Get()->TabletId == TabletId) {
- LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "TPartitionWriter " << TabletId << " (partition=" << PartitionId << ") received TEvClientDestroyed");
+ DEBUG("received TEvClientDestroyed");
Disconnected(TEvPartitionWriter::TEvWriteResponse::PartitionDisconnected);
}
}
@@ -487,6 +617,29 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
TActorBootstrapped::PassAway();
}
+ void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) {
+ const auto tag = static_cast<EWakeupTag>(ev->Get()->Tag);
+ OnWakeup(tag);
+ switch (tag) {
+ case EWakeupTag::RlAllowed:
+ ReceivedQuota.insert(ReceivedQuota.end(), PendingQuota.begin(), PendingQuota.end());
+ PendingQuota.clear();
+
+ ProcessQuota();
+
+ break;
+
+ case EWakeupTag::RlNoResource:
+ // Re-requesting the quota. We do this until we get a quota.
+ // We do not request a quota with a long waiting time because the writer may already be a destroyer, and the quota will still be waiting to be received.
+ RequestDataQuota(PendingQuotaAmount, ctx);
+ break;
+
+ default:
+ Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast<ui64>(tag));
+ }
+ }
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::PQ_PARTITION_WRITER_ACTOR;
@@ -494,18 +647,23 @@ public:
explicit TPartitionWriter(
const TActorId& client,
+ const std::optional<TString>& topicPath,
ui64 tabletId,
ui32 partitionId,
- TMaybe<ui32> expectedGeneration,
+ const std::optional<ui32> expectedGeneration,
const TString& sourceId,
const TPartitionWriterOpts& opts)
- : Client(client)
+ : TRlHelpers(topicPath, opts.RlCtx, WRITE_BLOCK_SIZE, !!opts.RlCtx)
+ , Client(client)
, TabletId(tabletId)
, PartitionId(partitionId)
, ExpectedGeneration(expectedGeneration)
, SourceId(sourceId)
, Opts(opts)
{
+ if (Opts.MeteringMode) {
+ SetMeteringMode(*Opts.MeteringMode);
+ }
}
void Bootstrap() {
@@ -524,8 +682,9 @@ public:
STATEFN(StateBase) {
switch (ev->GetTypeRewrite()) {
- HFunc(TEvTabletPipe::TEvClientConnected, Handle);
- HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ hFunc(TSchemeBoardEvents::TEvNotifyUpdate, TRlHelpers::Handle);
+ hFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}
@@ -541,7 +700,7 @@ private:
const TActorId Client;
const ui64 TabletId;
const ui32 PartitionId;
- const TMaybe<ui32> ExpectedGeneration;
+ const std::optional<ui32> ExpectedGeneration;
const TString SourceId;
const TPartitionWriterOpts Opts;
@@ -550,15 +709,40 @@ private:
bool Registered = false;
ui64 MessageNo = 0;
+ struct RequestHolder {
+ NKikimrClient::TPersQueueRequest Request;
+ bool QuotaChecked;
+ bool QuotaAccepted;
+
+ RequestHolder(NKikimrClient::TPersQueueRequest&& request, bool quotaChecked)
+ : Request(request)
+ , QuotaChecked(quotaChecked)
+ , QuotaAccepted(false) {
+ }
+ };
+
TMap<ui64, NKikimrClient::TPersQueueRequest> Pending;
- TMap<ui64, NKikimrClient::TPersQueueRequest> PendingReserve;
+ TMap<ui64, RequestHolder> PendingReserve;
+ TMap<ui64, RequestHolder> ReceivedReserve;
+ TDeque<ui64> PendingQuota;
+ ui64 PendingQuotaAmount;
+ TDeque<ui64> ReceivedQuota;
TDeque<ui64> PendingWrite;
TEvPartitionWriter::TEvWriteResponse::EErrors ErrorCode = TEvPartitionWriter::TEvWriteResponse::EErrors::InternalError;
}; // TPartitionWriter
-IActor* CreatePartitionWriter(const TActorId& client, ui64 tabletId, ui32 partitionId, TMaybe<ui32> expectedGeneration, const TString& sourceId, const TPartitionWriterOpts& opts) {
- return new TPartitionWriter(client, tabletId, partitionId, expectedGeneration, sourceId, opts);
+IActor* CreatePartitionWriter(const TActorId& client, const std::optional<TString>& topicPath, ui64 tabletId, ui32 partitionId,
+ const std::optional<ui32> expectedGeneration, const TString& sourceId,
+ const TPartitionWriterOpts& opts) {
+ return new TPartitionWriter(client, topicPath, tabletId, partitionId, expectedGeneration, sourceId, opts);
}
+
+#undef LOG_PREFIX
+#undef TRACE
+#undef DEBUG
+#undef INFO
+#undef ERROR
+
}
diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h
index 6a1a735ab94..fb2f8dd2602 100644
--- a/ydb/core/persqueue/writer/writer.h
+++ b/ydb/core/persqueue/writer/writer.h
@@ -2,8 +2,10 @@
#include <ydb/core/base/defs.h>
#include <ydb/core/base/events.h>
+#include <ydb/core/grpc_services/local_rate_limiter.h>
#include <ydb/core/protos/msgbus.pb.h>
#include <ydb/core/protos/msgbus_pq.pb.h>
+#include <ydb/core/persqueue/pq_rl_helpers.h>
#include <variant>
@@ -81,7 +83,8 @@ struct TEvPartitionWriter {
// Partition located on other node.
PartitionNotLocal,
// Partitition restarted.
- PartitionDisconnected
+ PartitionDisconnected,
+ Overload
};
struct TSuccess {
@@ -124,11 +127,17 @@ struct TPartitionWriterOpts {
bool AutoRegister = false;
bool UseDeduplication = true;
+ std::optional<NKikimrPQ::TPQTabletConfig::EMeteringMode> MeteringMode;
+ TRlContext RlCtx;
+
+ bool CheckRequestUnits() const { return RlCtx; }
+
TPartitionWriterOpts& WithCheckState(bool value) { CheckState = value; return *this; }
TPartitionWriterOpts& WithAutoRegister(bool value) { AutoRegister = value; return *this; }
TPartitionWriterOpts& WithDeduplication(bool value) { UseDeduplication = value; return *this; }
+ TPartitionWriterOpts& WithCheckRequestUnits(const NKikimrPQ::TPQTabletConfig::EMeteringMode meteringMode , const TRlContext& rlCtx) { MeteringMode = meteringMode; RlCtx = rlCtx; return *this; }
};
-IActor* CreatePartitionWriter(const TActorId& client, ui64 tabletId, ui32 partitionId, TMaybe<ui32> expectedGeneration, const TString& sourceId,
+IActor* CreatePartitionWriter(const TActorId& client, const std::optional<TString>& topicPath, ui64 tabletId, ui32 partitionId, const std::optional<ui32> expectedGeneration, const TString& sourceId,
const TPartitionWriterOpts& opts = {});
}
diff --git a/ydb/core/persqueue/ya.make b/ydb/core/persqueue/ya.make
index 1e96ec8137a..eeab4c328c8 100644
--- a/ydb/core/persqueue/ya.make
+++ b/ydb/core/persqueue/ya.make
@@ -20,6 +20,7 @@ SRCS(
pq_database.cpp
pq_impl.cpp
pq_l2_cache.cpp
+ pq_rl_helpers.cpp
quota_tracker.cpp
read_balancer.cpp
account_read_quoter.cpp
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 926ed635dae..89341a2fe9b 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -281,7 +281,7 @@ public:
auto opts = TPartitionWriterOpts()
.WithCheckState(true)
.WithAutoRegister(true);
- Writer = RegisterWithSameMailbox(CreatePartitionWriter(SelfId(), ShardId, PartitionId, {}, SourceId, opts));
+ Writer = RegisterWithSameMailbox(CreatePartitionWriter(SelfId(), {}, ShardId, PartitionId, {}, SourceId, opts));
Become(&TThis::StateInit);
}
diff --git a/ydb/core/tx/scheme_board/subscriber.cpp b/ydb/core/tx/scheme_board/subscriber.cpp
index b96bb08776a..15a0cf2e1c2 100644
--- a/ydb/core/tx/scheme_board/subscriber.cpp
+++ b/ydb/core/tx/scheme_board/subscriber.cpp
@@ -3,6 +3,7 @@
#include "monitorable_actor.h"
#include "subscriber.h"
+#include <ydb/core/base/appdata.h>
#include <ydb/core/base/statestorage_impl.h>
#include <ydb/core/base/tabletid.h>
#include <ydb/core/protos/scheme_board.pb.h>
@@ -1054,6 +1055,18 @@ public:
IActor* CreateSchemeBoardSubscriber(
const TActorId& owner,
+ const TString& path
+) {
+ auto& domains = AppData()->DomainsInfo->Domains;
+ Y_VERIFY(!domains.empty());
+ auto& domain = domains.begin()->second;
+ ui32 schemeBoardGroup = domain->DefaultSchemeBoardGroup;
+ ui64 domainOwnerId = domain->SchemeRoot;
+ return CreateSchemeBoardSubscriber(owner, path, schemeBoardGroup, domainOwnerId);
+}
+
+IActor* CreateSchemeBoardSubscriber(
+ const TActorId& owner,
const TString& path,
const ui64 stateStorageGroup,
const ui64 domainOwnerId
diff --git a/ydb/core/tx/scheme_board/subscriber.h b/ydb/core/tx/scheme_board/subscriber.h
index 8e6cd2ffa46..e531dd8d123 100644
--- a/ydb/core/tx/scheme_board/subscriber.h
+++ b/ydb/core/tx/scheme_board/subscriber.h
@@ -10,6 +10,11 @@ namespace NKikimr {
IActor* CreateSchemeBoardSubscriber(
const TActorId& owner,
+ const TString& path
+);
+
+IActor* CreateSchemeBoardSubscriber(
+ const TActorId& owner,
const TString& path,
const ui64 stateStorageGroup,
const ui64 domainOwnerId
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index cf0b456e8db..c200de3d450 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -9,10 +9,10 @@
#include <ydb/core/grpc_services/rpc_deferrable.h>
#include <ydb/core/grpc_services/rpc_scheme_base.h>
#include <ydb/core/persqueue/partition.h>
+#include <ydb/core/persqueue/pq_rl_helpers.h>
#include <ydb/core/persqueue/write_meta.h>
#include <ydb/public/api/protos/ydb_topic.pb.h>
-#include <ydb/services/lib/actors/pq_rl_helpers.h>
#include <ydb/services/lib/actors/pq_schema_actor.h>
#include <ydb/services/lib/sharding/sharding.h>
#include <ydb/services/persqueue_v1/actors/persqueue_utils.h>
@@ -1312,7 +1312,7 @@ namespace NKikimr::NDataStreams::V1 {
//-----------------------------------------------------------------------------------
class TGetRecordsActor : public TPQGrpcSchemaBase<TGetRecordsActor, TEvDataStreamsGetRecordsRequest>
- , private TRlHelpers
+ , private NPQ::TRlHelpers
, public TCdcStreamCompatible
{
using TBase = TPQGrpcSchemaBase<TGetRecordsActor, TEvDataStreamsGetRecordsRequest>;
@@ -1352,7 +1352,7 @@ namespace NKikimr::NDataStreams::V1 {
: TBase(request, TShardIterator(GetRequest<TProtoRequest>(request)->shard_iterator()).IsValid()
? TShardIterator(GetRequest<TProtoRequest>(request)->shard_iterator()).GetStreamName()
: "undefined")
- , TRlHelpers(request, 8_KB, TDuration::Seconds(1))
+ , TRlHelpers({}, request, 8_KB, false, TDuration::Seconds(1))
, ShardIterator{GetRequest<TProtoRequest>(request)->shard_iterator()}
, StreamName{ShardIterator.IsValid() ? ShardIterator.GetStreamName() : "undefined"}
, TabletId{0}
diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h
index 4e650800bcc..fa0e4fb7a48 100644
--- a/ydb/services/datastreams/put_records_actor.h
+++ b/ydb/services/datastreams/put_records_actor.h
@@ -5,11 +5,11 @@
#include <ydb/core/persqueue/events/global.h>
#include <ydb/core/persqueue/utils.h>
+#include <ydb/core/persqueue/pq_rl_helpers.h>
#include <ydb/core/persqueue/write_meta.h>
#include <ydb/core/protos/msgbus_pq.pb.h>
#include <ydb/core/protos/grpc_pq_old.pb.h>
-#include <ydb/services/lib/actors/pq_rl_helpers.h>
#include <ydb/services/lib/actors/pq_schema_actor.h>
#include <ydb/services/lib/sharding/sharding.h>
@@ -213,7 +213,7 @@ namespace NKikimr::NDataStreams::V1 {
template<class TDerived, class TProto>
class TPutRecordsActorBase
: public NGRpcProxy::V1::TPQGrpcSchemaBase<TPutRecordsActorBase<TDerived, TProto>, TProto>
- , private NGRpcProxy::V1::TRlHelpers
+ , private NPQ::TRlHelpers
{
using TBase = NGRpcProxy::V1::TPQGrpcSchemaBase<TPutRecordsActorBase<TDerived, TProto>, TProto>;
@@ -261,7 +261,7 @@ namespace NKikimr::NDataStreams::V1 {
template<class TDerived, class TProto>
TPutRecordsActorBase<TDerived, TProto>::TPutRecordsActorBase(NGRpcService::IRequestOpCtx* request)
: TBase(request, dynamic_cast<const typename TProto::TRequest*>(request->GetRequest())->stream_name())
- , TRlHelpers(request, 4_KB, TDuration::Seconds(1))
+ , TRlHelpers({}, request, 4_KB, false, TDuration::Seconds(1))
, Ip(request->GetPeerName())
{
Y_ENSURE(request);
diff --git a/ydb/services/lib/actors/CMakeLists.darwin-x86_64.txt b/ydb/services/lib/actors/CMakeLists.darwin-x86_64.txt
index 43f92f2ff06..887af75044f 100644
--- a/ydb/services/lib/actors/CMakeLists.darwin-x86_64.txt
+++ b/ydb/services/lib/actors/CMakeLists.darwin-x86_64.txt
@@ -27,5 +27,4 @@ target_link_libraries(services-lib-actors PUBLIC
)
target_sources(services-lib-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_schema_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_rl_helpers.cpp
)
diff --git a/ydb/services/lib/actors/CMakeLists.linux-aarch64.txt b/ydb/services/lib/actors/CMakeLists.linux-aarch64.txt
index 0c6cdd98c9a..c0ad2ebb2b5 100644
--- a/ydb/services/lib/actors/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/lib/actors/CMakeLists.linux-aarch64.txt
@@ -28,5 +28,4 @@ target_link_libraries(services-lib-actors PUBLIC
)
target_sources(services-lib-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_schema_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_rl_helpers.cpp
)
diff --git a/ydb/services/lib/actors/CMakeLists.linux-x86_64.txt b/ydb/services/lib/actors/CMakeLists.linux-x86_64.txt
index 0c6cdd98c9a..c0ad2ebb2b5 100644
--- a/ydb/services/lib/actors/CMakeLists.linux-x86_64.txt
+++ b/ydb/services/lib/actors/CMakeLists.linux-x86_64.txt
@@ -28,5 +28,4 @@ target_link_libraries(services-lib-actors PUBLIC
)
target_sources(services-lib-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_schema_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_rl_helpers.cpp
)
diff --git a/ydb/services/lib/actors/CMakeLists.windows-x86_64.txt b/ydb/services/lib/actors/CMakeLists.windows-x86_64.txt
index 43f92f2ff06..887af75044f 100644
--- a/ydb/services/lib/actors/CMakeLists.windows-x86_64.txt
+++ b/ydb/services/lib/actors/CMakeLists.windows-x86_64.txt
@@ -27,5 +27,4 @@ target_link_libraries(services-lib-actors PUBLIC
)
target_sources(services-lib-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_schema_actor.cpp
- ${CMAKE_SOURCE_DIR}/ydb/services/lib/actors/pq_rl_helpers.cpp
)
diff --git a/ydb/services/lib/actors/pq_rl_helpers.cpp b/ydb/services/lib/actors/pq_rl_helpers.cpp
deleted file mode 100644
index 8a7e8bf5abf..00000000000
--- a/ydb/services/lib/actors/pq_rl_helpers.cpp
+++ /dev/null
@@ -1,70 +0,0 @@
-#include "pq_rl_helpers.h"
-
-#include <ydb/core/grpc_services/base/base.h>
-
-namespace NKikimr::NGRpcProxy::V1 {
-
-TRlHelpers::TRlHelpers(NGRpcService::IRequestCtxBase* reqCtx, ui64 blockSize, const TDuration& waitDuration)
- : TStreamRequestUnitsCalculator(blockSize)
- , Request(reqCtx)
- , WaitDuration(waitDuration)
-{
- Y_VERIFY(Request);
-}
-
-bool TRlHelpers::IsQuotaRequired() const {
- Y_VERIFY(MeteringMode.Defined());
- return MeteringMode == NKikimrPQ::TPQTabletConfig::METERING_MODE_REQUEST_UNITS && Request->GetRlPath();
-}
-
-bool TRlHelpers::MaybeRequestQuota(ui64 amount, EWakeupTag tag, const TActorContext& ctx) {
- if (RlActor) {
- return false;
- }
-
- const auto selfId = ctx.SelfID;
- const auto as = ctx.ActorSystem();
-
- auto onSendAllowed = [selfId, as, tag]() {
- as->Send(selfId, new TEvents::TEvWakeup(tag));
- };
-
- auto onSendTimeout = [selfId, as]() {
- as->Send(selfId, new TEvents::TEvWakeup(EWakeupTag::RlNoResource));
- };
-
- RlActor = NRpcService::RateLimiterAcquireUseSameMailbox(
- *Request, amount, WaitDuration,
- std::move(onSendAllowed), std::move(onSendTimeout), ctx);
- return true;
-}
-
-void TRlHelpers::OnWakeup(EWakeupTag tag) {
- switch (tag) {
- case EWakeupTag::RlInit:
- case EWakeupTag::RlAllowed:
- case EWakeupTag::RlNoResource:
- RlActor = {};
- break;
- default:
- break;
- }
-}
-
-const TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode>& TRlHelpers::GetMeteringMode() const {
- return MeteringMode;
-}
-
-void TRlHelpers::SetMeteringMode(NKikimrPQ::TPQTabletConfig::EMeteringMode mode) {
- MeteringMode = mode;
-}
-
-ui64 TRlHelpers::CalcRuConsumption(ui64 payloadSize) {
- if (!IsQuotaRequired()) {
- return 0;
- }
-
- return CalcConsumption(payloadSize);
-}
-
-}
diff --git a/ydb/services/lib/actors/pq_rl_helpers.h b/ydb/services/lib/actors/pq_rl_helpers.h
deleted file mode 100644
index cf002033998..00000000000
--- a/ydb/services/lib/actors/pq_rl_helpers.h
+++ /dev/null
@@ -1,40 +0,0 @@
-#pragma once
-
-#include <ydb/core/grpc_services/local_rate_limiter.h>
-#include <ydb/core/metering/stream_ru_calculator.h>
-#include <ydb/core/protos/pqconfig.pb.h>
-
-#include <util/datetime/base.h>
-
-namespace NKikimr::NGRpcProxy::V1 {
-
-class TRlHelpers: public NMetering::TStreamRequestUnitsCalculator {
-public:
- explicit TRlHelpers(NGRpcService::IRequestCtxBase* reqCtx, ui64 blockSize, const TDuration& waitDuration);
-
-protected:
- enum EWakeupTag: ui64 {
- RlInit = 0,
- RlAllowed = 1,
- RlNoResource = 2,
- RecheckAcl = 3,
- };
-
- bool IsQuotaRequired() const;
- bool MaybeRequestQuota(ui64 amount, EWakeupTag tag, const TActorContext& ctx);
- void OnWakeup(EWakeupTag tag);
-
- const TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode>& GetMeteringMode() const;
- void SetMeteringMode(NKikimrPQ::TPQTabletConfig::EMeteringMode mode);
-
- ui64 CalcRuConsumption(ui64 payloadSize);
-
-private:
- NGRpcService::IRequestCtxBase* const Request;
- const TDuration WaitDuration;
-
- TActorId RlActor;
- TMaybe<NKikimrPQ::TPQTabletConfig::EMeteringMode> MeteringMode;
-};
-
-}
diff --git a/ydb/services/lib/actors/ya.make b/ydb/services/lib/actors/ya.make
index 19e4db930b9..af0977f0001 100644
--- a/ydb/services/lib/actors/ya.make
+++ b/ydb/services/lib/actors/ya.make
@@ -2,7 +2,6 @@ LIBRARY()
SRCS(
pq_schema_actor.cpp
- pq_rl_helpers.cpp
)
PEERDIR(
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.h b/ydb/services/persqueue_v1/actors/read_session_actor.h
index a4c8dd4078c..d13fef3e6bb 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.h
@@ -7,7 +7,7 @@
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/grpc_services/grpc_request_proxy.h>
#include <ydb/core/persqueue/events/global.h>
-#include <ydb/services/lib/actors/pq_rl_helpers.h>
+#include <ydb/core/persqueue/pq_rl_helpers.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h>
@@ -121,7 +121,7 @@ struct TFormedReadResponse: public TSimpleRefCount<TFormedReadResponse<TServerMe
template <bool UseMigrationProtocol> // Migration protocol is "pqv1"
class TReadSessionActor
: public TActorBootstrapped<TReadSessionActor<UseMigrationProtocol>>
- , private TRlHelpers
+ , private NPQ::TRlHelpers
{
using TClientMessage = typename std::conditional_t<UseMigrationProtocol,
PersQueue::V1::MigrationStreamingReadClientMessage,
diff --git a/ydb/services/persqueue_v1/actors/read_session_actor.ipp b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
index ceeaa0221cb..9f99af51a86 100644
--- a/ydb/services/persqueue_v1/actors/read_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/read_session_actor.ipp
@@ -32,7 +32,7 @@ TReadSessionActor<UseMigrationProtocol>::TReadSessionActor(
TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
const TMaybe<TString> clientDC,
const NPersQueue::TTopicsListController& topicsHandler)
- : TRlHelpers(request, READ_BLOCK_SIZE, TDuration::Minutes(1))
+ : TRlHelpers({}, request, READ_BLOCK_SIZE, false)
, Request(request)
, ClientDC(clientDC.GetOrElse("other"))
, StartTimestamp(TInstant::Now())
@@ -1909,6 +1909,7 @@ void TReadSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& e
break;
case EWakeupTag::RlNoResource:
+ case EWakeupTag::RlInitNoResource:
if (PendingQuota) {
Y_VERIFY(MaybeRequestQuota(PendingQuota->RequiredQuota, EWakeupTag::RlAllowed, ctx));
} else {
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h
index c7ea6e863c5..bc679a850a1 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.h
@@ -10,10 +10,10 @@
#include <ydb/core/grpc_services/grpc_request_proxy.h>
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/persqueue/events/global.h>
+#include <ydb/core/persqueue/pq_rl_helpers.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/protos/grpc_pq_old.pb.h>
-#include <ydb/services/lib/actors/pq_rl_helpers.h>
#include <ydb/services/metadata/service.h>
@@ -26,7 +26,7 @@ inline TActorId GetPQWriteServiceActorID() {
template<bool UseMigrationProtocol>
class TWriteSessionActor
: public NActors::TActorBootstrapped<TWriteSessionActor<UseMigrationProtocol>>
- , private TRlHelpers
+ , private NPQ::TRlHelpers
{
using TSelf = TWriteSessionActor<UseMigrationProtocol>;
using TClientMessage = std::conditional_t<UseMigrationProtocol, PersQueue::V1::StreamingWriteClientMessage,
@@ -228,7 +228,7 @@ private:
NPersQueue::TTopicConverterPtr FullConverter;
ui32 Partition;
ui32 PreferedPartition;
- TMaybe<ui32> ExpectedGeneration;
+ std::optional<ui32> ExpectedGeneration;
bool PartitionFound = false;
// 'SourceId' is called 'MessageGroupId' since gRPC data plane API v1
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 0aef6cfbf41..0b06d0be568 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -177,7 +177,7 @@ TWriteSessionActor<UseMigrationProtocol>::TWriteSessionActor(
TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, const TMaybe<TString> clientDC,
const NPersQueue::TTopicsListController& topicsController
)
- : TRlHelpers(request, WRITE_BLOCK_SIZE, TDuration::Minutes(1))
+ : TRlHelpers({}, request, WRITE_BLOCK_SIZE, false)
, Request(request)
, State(ES_CREATED)
, SchemeCache(schemeCache)
@@ -1034,7 +1034,7 @@ void TWriteSessionActor<UseMigrationProtocol>::ProceedPartition(const ui32 parti
}
Writer = ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter(
- ctx.SelfID, PartitionTabletId, Partition, ExpectedGeneration,
+ ctx.SelfID, {}, PartitionTabletId, Partition, ExpectedGeneration,
SourceId, TPartitionWriterOpts().WithDeduplication(UseDeduplication)
));
State = ES_WAIT_WRITER_INIT;
@@ -1771,6 +1771,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr&
break;
case EWakeupTag::RlNoResource:
+ case EWakeupTag::RlInitNoResource:
if (PendingQuotaRequest) {
Y_VERIFY(MaybeRequestQuota(PendingQuotaRequest->RequiredQuota, EWakeupTag::RlAllowed, ctx));
} else {