aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsavnik <savnik@yandex-team.com>2023-08-09 13:31:44 +0300
committersavnik <savnik@yandex-team.com>2023-08-09 16:30:30 +0300
commitdb84ab468d808d350c525103bec68c4958f9114b (patch)
tree7b865bfc4462b18bb6f232a233e78718f7fa2768
parent4700a34cf159bab16151bc9ae30c259884bbf6a3 (diff)
downloadydb-db84ab468d808d350c525103bec68c4958f9114b.tar.gz
Kafka auth
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp10
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp127
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h77
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp29
-rw-r--r--ydb/core/kafka_proxy/kafka_events.h10
-rw-r--r--ydb/core/kafka_proxy/kafka_messages.cpp250
-rw-r--r--ydb/core/kafka_proxy/kafka_messages.h205
-rw-r--r--ydb/core/kafka_proxy/ya.make1
12 files changed, 711 insertions, 2 deletions
diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
index 4fe0387ca6..732c093b32 100644
--- a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
@@ -24,6 +24,7 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
index 2acbd612c9..dc2faaaf69 100644
--- a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
@@ -25,6 +25,7 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
index 2acbd612c9..dc2faaaf69 100644
--- a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
@@ -25,6 +25,7 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
diff --git a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
index 4fe0387ca6..732c093b32 100644
--- a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
@@ -24,6 +24,7 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
index 4acdd053b2..220e04aa52 100644
--- a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
@@ -8,7 +8,7 @@ NActors::IActor* CreateKafkaApiVersionsActor(const TActorId parent, const ui64 c
TApiVersionsResponseData::TPtr GetApiVersions() {
TApiVersionsResponseData::TPtr response = std::make_shared<TApiVersionsResponseData>();
- response->ApiKeys.resize(4);
+ response->ApiKeys.resize(6);
response->ApiKeys[0].ApiKey = PRODUCE;
response->ApiKeys[0].MinVersion = 3; // From version 3 record batch format is 2. Supported only 2th batch format.
@@ -26,6 +26,14 @@ TApiVersionsResponseData::TPtr GetApiVersions() {
response->ApiKeys[3].MinVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Min;
response->ApiKeys[3].MaxVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Max;
+ response->ApiKeys[4].ApiKey = SASL_HANDSHAKE;
+ response->ApiKeys[4].MinVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Min;//savnik: check
+ response->ApiKeys[4].MaxVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Max;
+
+ response->ApiKeys[5].ApiKey = SASL_AUTHENTICATE;
+ response->ApiKeys[5].MinVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Min;
+ response->ApiKeys[5].MaxVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Max;
+
return response;
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
new file mode 100644
index 0000000000..c0f39b7207
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
@@ -0,0 +1,127 @@
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
+#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h>
+#include <library/cpp/actors/core/actor.h>
+#include <ydb/core/base/ticket_parser.h>
+#include "kafka_sasl_auth_actor.h"
+
+namespace NKafka {
+
+NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const TSaslHandshakeRequestData* message) {
+ return new TKafkaSaslAuthActor(parent, correlationId, message);
+}
+
+NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message) {
+ return new TKafkaSaslAuthActor(parent, correlationId, address, message);
+}
+
+void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) {
+ if (ReqType) {
+ Send(Parent, new TEvKafka::TEvResponse(CorrelationId, Handshake()));
+ Die(ctx);
+ } else {
+ Become(&TKafkaSaslAuthActor::StateAuth);
+ PlainAuth(ctx);
+ }
+}
+
+void TKafkaSaslAuthActor::PlainAuth(const NActors::TActorContext& ctx) {
+ TKafkaBytes auth_bytes_opt = AuthenticateRequestData->AuthBytes;
+ TString username;
+ TString password;
+ TString database;
+
+ if (auth_bytes_opt.has_value()) {
+ TKafkaRawBytes auth_bytes = auth_bytes_opt.value();
+ TString auth_data(auth_bytes.data(), auth_bytes.size());
+
+ size_t first_null_pos = auth_data.find('\0');
+ if(first_null_pos != TString::npos && first_null_pos < auth_data.length() - 1) {
+ size_t second_null_pos = auth_data.find('\0', first_null_pos + 1);
+ if(second_null_pos != TString::npos) {
+ auto loginAndDatabase = auth_data.substr(first_null_pos + 1, second_null_pos - first_null_pos - 1);
+ std::tie(username, database) = ParseLoginAndDatabase(loginAndDatabase);
+ password = auth_data.substr(second_null_pos + 1);
+ }
+ }
+ }
+
+ Ydb::Auth::LoginRequest request;
+ request.set_user(username);
+ request.set_password(password);
+ TActorSystem* actorSystem = ctx.ActorSystem();
+ using TRpcEv = NKikimr::NGRpcService::TGRpcRequestWrapperNoAuth<NKikimr::NGRpcService::TRpcServices::EvLogin, Ydb::Auth::LoginRequest, Ydb::Auth::LoginResponse>;
+ auto rpcFuture = NKikimr::NRpcService::DoLocalRpc<TRpcEv>(std::move(request), database, {}, actorSystem);
+ rpcFuture.Subscribe([database, actorSystem, selfId = SelfId()](const NThreading::TFuture<Ydb::Auth::LoginResponse>& future) {
+ auto& response = future.GetValueSync();
+ if (response.operation().status() == Ydb::StatusIds::SUCCESS) {
+ auto tokenReady = std::make_unique<TEvPrivate::TEvTokenReady>();
+ response.operation().result().UnpackTo(&(tokenReady->LoginResult));
+ tokenReady->Database = database;
+ actorSystem->Send(selfId, tokenReady.release());
+ } else {
+ auto authFailed = std::make_unique<TEvPrivate::TEvAuthFailed>();
+ if (response.operation().issues_size() > 0) {
+ authFailed->ErrorMessage = response.operation().issues(0).message();
+ } else {
+ authFailed->ErrorMessage = Ydb::StatusIds_StatusCode_Name(response.operation().status());
+ }
+ actorSystem->Send(selfId, authFailed.release());
+ }
+ });
+}
+
+void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult::TPtr& ev, const NActors::TActorContext& ctx) {
+ TSaslAuthenticateResponseData::TPtr response = std::make_shared<TSaslAuthenticateResponseData>();
+ response->AuthBytes = TKafkaRawBytes("", "");
+ if (ev->Get()->Error) {
+ response->ErrorCode = 1;
+ response->ErrorMessage = ev->Get()->Error.Message;
+ //savnik: close conn
+ } else {
+ response->ErrorCode = 0;
+ response->ErrorMessage = "";
+ Send(Parent, new TEvKafka::TEvAuthSuccess(ev->Get()->Token));
+ }
+ Send(Parent, new TEvKafka::TEvResponse(CorrelationId, response));
+ Die(ctx);
+}
+
+void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NActors::TActorContext&) {
+ Send(NKikimr::MakeTicketParserID(), new NKikimr::TEvTicketParser::TEvAuthorizeTicket({
+ .Database = ev->Get()->Database,
+ .Ticket = ev->Get()->LoginResult.token(),
+ .PeerName = TStringBuilder() << Address,
+ }));
+}
+
+void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvAuthFailed::TPtr& ev, const NActors::TActorContext& ctx) {
+ TSaslAuthenticateResponseData::TPtr saslResponse = std::make_shared<TSaslAuthenticateResponseData>();
+ saslResponse->AuthBytes = TKafkaRawBytes("", "");
+ saslResponse->ErrorCode = 1;
+ saslResponse->ErrorMessage = ev->Get()->ErrorMessage;
+ Send(Parent, new TEvKafka::TEvResponse(CorrelationId, saslResponse));
+ //savnik: close conn
+ Die(ctx);
+}
+
+std::pair<TString, TString> TKafkaSaslAuthActor::ParseLoginAndDatabase(const TString& str) {
+ size_t pos = str.rfind('@');
+ if (pos == std::string::npos) {
+ return {str, ""}; //savnik
+ }
+ return {str.substr(0, pos), str.substr(pos + 1)};
+}
+
+TSaslHandshakeResponseData::TPtr TKafkaSaslAuthActor::Handshake() {
+ TSaslHandshakeResponseData::TPtr response = std::make_shared<TSaslHandshakeResponseData>();
+ if (HandshakeRequestData->Mechanism == "PLAIN") {
+ response->ErrorCode = 0;
+ response->Mechanisms.push_back("PLAIN");
+ } else {
+ response->ErrorCode = 1;
+ }
+
+ return response;
+}
+
+} \ No newline at end of file
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
new file mode 100644
index 0000000000..fab9482cae
--- /dev/null
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
@@ -0,0 +1,77 @@
+#include "ydb/library/aclib/aclib.h"
+#include <ydb/core/base/ticket_parser.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h>
+
+#include "../kafka_events.h"
+
+namespace NKafka {
+
+class TKafkaSaslAuthActor: public NActors::TActorBootstrapped<TKafkaSaslAuthActor> {
+
+struct TEvPrivate {
+ enum EEv {
+ EvTokenReady = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+ EvAuthFailed,
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
+
+ struct TEvTokenReady : TEventLocal<TEvTokenReady, EvTokenReady> {
+ Ydb::Auth::LoginResult LoginResult;
+ TString Database;
+ };
+
+ struct TEvAuthFailed : TEventLocal<TEvAuthFailed, EvAuthFailed> {
+ TString ErrorMessage;
+ };
+};
+
+public:
+ TKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const TSaslHandshakeRequestData* message)
+ : Parent(parent)
+ , CorrelationId(correlationId)
+ , HandshakeRequestData(message)
+ , ReqType(true) {
+ }
+
+ TKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message)
+ : Parent(parent)
+ , CorrelationId(correlationId)
+ , AuthenticateRequestData(message)
+ , Address(address)
+ , ReqType(false) {
+ }
+
+ STATEFN(StateAuth) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult, Handle);
+ HFunc(TEvPrivate::TEvTokenReady, Handle);
+ HFunc(TEvPrivate::TEvAuthFailed, Handle);
+ //savnik: poison pill
+ }
+ }
+
+ void Bootstrap(const NActors::TActorContext& ctx);
+
+ void Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult::TPtr& ev, const NActors::TActorContext& ctx);
+ void Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NActors::TActorContext& ctx);
+ void Handle(TEvPrivate::TEvAuthFailed::TPtr& ev, const NActors::TActorContext& ctx);
+
+ TSaslHandshakeResponseData::TPtr Handshake();
+ void PlainAuth(const NActors::TActorContext& ctx);
+
+ std::pair<TString, TString> ParseLoginAndDatabase(const TString& str);
+
+private:
+ const TActorId Parent;
+ const ui64 CorrelationId;
+ const TSaslHandshakeRequestData* HandshakeRequestData;
+ const TSaslAuthenticateRequestData* AuthenticateRequestData;
+ const NKikimr::NRawSocket::TNetworkConfig::TSocketAddressType Address;
+ const TIntrusiveConstPtr<NACLib::TUserToken> Token;
+ const bool ReqType;
+};
+
+} // NKafka
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index cf19589853..efa5bd2cb1 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -16,10 +16,14 @@ namespace NKafka {
using namespace NActors;
using namespace NKikimr;
+
NActors::IActor* CreateKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message);
NActors::IActor* CreateKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message);
NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, const ui64 correlationId, const TMetadataRequestData* message);
NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const TString& clientDC);
+NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const TSaslHandshakeRequestData* message);
+NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message);
+
char Hex(const unsigned char c) {
return c < 10 ? '0' + c : 'A' + c - 10;
@@ -73,7 +77,7 @@ public:
NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier;
TString ClientDC;
- i32 CorrelationId = 0;
+ i32 CorrelationId = 1;
std::shared_ptr<Msg> Request;
std::unordered_map<ui64, Msg::TPtr> PendingRequests;
std::deque<Msg::TPtr> PendingRequestsQueue;
@@ -87,6 +91,8 @@ public:
TActorId ProduceActorId;
+ TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
+
TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address,
const NKikimrConfig::TKafkaProxyConfig& config)
@@ -213,6 +219,14 @@ protected:
Register(CreateKafkaMetadataActor(SelfId(), header->CorrelationId, message));
}
+ void HandleMessage(const TRequestHeaderData* header, const TSaslAuthenticateRequestData* message) {
+ Register(CreateKafkaSaslAuthActor(SelfId(), header->CorrelationId, Address, message));
+ }
+
+ void HandleMessage(const TRequestHeaderData* header, const TSaslHandshakeRequestData* message) {
+ Register(CreateKafkaSaslAuthActor(SelfId(), header->CorrelationId, message));
+ }
+
void ProcessRequest() {
KAFKA_LOG_D("process message: ApiKey=" << Request->Header.RequestApiKey << ", ExpectedSize=" << Request->ExpectedSize
<< ", Size=" << Request->Size);
@@ -240,6 +254,14 @@ protected:
HandleMessage(&Request->Header, dynamic_cast<TMetadataRequestData*>(message));
return;
+ case SASL_HANDSHAKE:
+ HandleMessage(&Request->Header, dynamic_cast<TSaslHandshakeRequestData*>(message));
+ return;
+
+ case SASL_AUTHENTICATE:
+ HandleMessage(&Request->Header, dynamic_cast<TSaslAuthenticateRequestData*>(message));
+ return;
+
default:
KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request->Header.RequestApiKey);
PassAway();
@@ -251,6 +273,10 @@ protected:
Reply(r->CorrelationId, r->Response);
}
+ void Handle(TEvKafka::TEvAuthSuccess::TPtr auth) {
+ UserToken = auth->Get()->UserToken;
+ }
+
void Reply(const ui64 correlationId, TApiMessage::TPtr response) {
auto it = PendingRequests.find(correlationId);
if (it == PendingRequests.end()) {
@@ -437,6 +463,7 @@ protected:
hFunc(TEvPollerReady, HandleConnected);
hFunc(TEvPollerRegisterResult, HandleConnected);
hFunc(TEvKafka::TEvResponse, Handle);
+ hFunc(TEvKafka::TEvAuthSuccess, Handle);
default:
KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName());
}
diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h
index cf9bcfadd6..c987e72f73 100644
--- a/ydb/core/kafka_proxy/kafka_events.h
+++ b/ydb/core/kafka_proxy/kafka_events.h
@@ -4,6 +4,7 @@
#include <ydb/core/base/events.h>
#include "kafka_messages.h"
+#include "ydb/library/aclib/aclib.h"
using namespace NActors;
@@ -13,6 +14,7 @@ struct TEvKafka {
enum EEv {
EvRequest = EventSpaceBegin(NKikimr::TKikimrEvents::TKikimrEvents::ES_KAFKA),
EvProduceRequest,
+ EvAuthSuccess,
EvWakeup,
EvResponse = EvRequest + 256,
EvInternalEvents = EvResponse + 256,
@@ -34,6 +36,14 @@ struct TEvKafka {
const TProduceRequestData* Request;
};
+ struct TEvAuthSuccess : public TEventLocal<TEvAuthSuccess, EvAuthSuccess> {
+ TEvAuthSuccess(TIntrusiveConstPtr<NACLib::TUserToken> token)
+ : UserToken(token)
+ {}
+
+ TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
+ };
+
struct TEvResponse : public TEventLocal<TEvResponse, EvResponse> {
TEvResponse(const ui64 correlationId, const TApiMessage::TPtr response)
: CorrelationId(correlationId)
diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp
index 5e1e40e6f3..68d4a0777c 100644
--- a/ydb/core/kafka_proxy/kafka_messages.cpp
+++ b/ydb/core/kafka_proxy/kafka_messages.cpp
@@ -14,10 +14,14 @@ std::unique_ptr<TApiMessage> CreateRequest(i16 apiKey) {
return std::make_unique<TFetchRequestData>();
case METADATA:
return std::make_unique<TMetadataRequestData>();
+ case SASL_HANDSHAKE:
+ return std::make_unique<TSaslHandshakeRequestData>();
case API_VERSIONS:
return std::make_unique<TApiVersionsRequestData>();
case INIT_PRODUCER_ID:
return std::make_unique<TInitProducerIdRequestData>();
+ case SASL_AUTHENTICATE:
+ return std::make_unique<TSaslAuthenticateRequestData>();
default:
ythrow yexception() << "Unsupported request API key " << apiKey;
}
@@ -31,10 +35,14 @@ std::unique_ptr<TApiMessage> CreateResponse(i16 apiKey) {
return std::make_unique<TFetchResponseData>();
case METADATA:
return std::make_unique<TMetadataResponseData>();
+ case SASL_HANDSHAKE:
+ return std::make_unique<TSaslHandshakeResponseData>();
case API_VERSIONS:
return std::make_unique<TApiVersionsResponseData>();
case INIT_PRODUCER_ID:
return std::make_unique<TInitProducerIdResponseData>();
+ case SASL_AUTHENTICATE:
+ return std::make_unique<TSaslAuthenticateResponseData>();
default:
ythrow yexception() << "Unsupported response API key " << apiKey;
}
@@ -60,6 +68,8 @@ TKafkaVersion RequestHeaderVersion(i16 apiKey, TKafkaVersion _version) {
} else {
return 1;
}
+ case SASL_HANDSHAKE:
+ return 1;
case API_VERSIONS:
if (_version >= 3) {
return 2;
@@ -72,6 +82,12 @@ TKafkaVersion RequestHeaderVersion(i16 apiKey, TKafkaVersion _version) {
} else {
return 1;
}
+ case SASL_AUTHENTICATE:
+ if (_version >= 2) {
+ return 2;
+ } else {
+ return 1;
+ }
default:
ythrow yexception() << "Unsupported API key " << apiKey;
break;
@@ -98,6 +114,8 @@ TKafkaVersion ResponseHeaderVersion(i16 apiKey, TKafkaVersion _version) {
} else {
return 0;
}
+ case SASL_HANDSHAKE:
+ return 0;
case API_VERSIONS:
// ApiVersionsResponse always includes a v0 header.
// See KIP-511 for details.
@@ -108,6 +126,12 @@ TKafkaVersion ResponseHeaderVersion(i16 apiKey, TKafkaVersion _version) {
} else {
return 0;
}
+ case SASL_AUTHENTICATE:
+ if (_version >= 2) {
+ return 1;
+ } else {
+ return 0;
+ }
default:
ythrow yexception() << "Unsupported API key " << apiKey;
break;
@@ -1855,6 +1879,115 @@ i32 TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::S
//
+// TSaslHandshakeRequestData
+//
+const TSaslHandshakeRequestData::MechanismMeta::Type TSaslHandshakeRequestData::MechanismMeta::Default = {""};
+
+TSaslHandshakeRequestData::TSaslHandshakeRequestData()
+ : Mechanism(MechanismMeta::Default)
+{}
+
+void TSaslHandshakeRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TSaslHandshakeRequestData";
+ }
+ NPrivate::Read<MechanismMeta>(_readable, _version, Mechanism);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TSaslHandshakeRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TSaslHandshakeRequestData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<MechanismMeta>(_collector, _writable, _version, Mechanism);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TSaslHandshakeRequestData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<MechanismMeta>(_collector, _version, Mechanism);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TSaslHandshakeResponseData
+//
+const TSaslHandshakeResponseData::ErrorCodeMeta::Type TSaslHandshakeResponseData::ErrorCodeMeta::Default = 0;
+
+TSaslHandshakeResponseData::TSaslHandshakeResponseData()
+ : ErrorCode(ErrorCodeMeta::Default)
+{}
+
+void TSaslHandshakeResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TSaslHandshakeResponseData";
+ }
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+ NPrivate::Read<MechanismsMeta>(_readable, _version, Mechanisms);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TSaslHandshakeResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TSaslHandshakeResponseData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode);
+ NPrivate::Write<MechanismsMeta>(_collector, _writable, _version, Mechanisms);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TSaslHandshakeResponseData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode);
+ NPrivate::Size<MechanismsMeta>(_collector, _version, Mechanisms);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
// TApiVersionsRequestData
//
const TApiVersionsRequestData::ClientSoftwareNameMeta::Type TApiVersionsRequestData::ClientSoftwareNameMeta::Default = {""};
@@ -2328,4 +2461,121 @@ i32 TInitProducerIdResponseData::Size(TKafkaVersion _version) const {
}
return _collector.Size;
}
+
+
+//
+// TSaslAuthenticateRequestData
+//
+
+TSaslAuthenticateRequestData::TSaslAuthenticateRequestData()
+{}
+
+void TSaslAuthenticateRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TSaslAuthenticateRequestData";
+ }
+ NPrivate::Read<AuthBytesMeta>(_readable, _version, AuthBytes);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TSaslAuthenticateRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TSaslAuthenticateRequestData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<AuthBytesMeta>(_collector, _writable, _version, AuthBytes);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TSaslAuthenticateRequestData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<AuthBytesMeta>(_collector, _version, AuthBytes);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
+
+
+//
+// TSaslAuthenticateResponseData
+//
+const TSaslAuthenticateResponseData::ErrorCodeMeta::Type TSaslAuthenticateResponseData::ErrorCodeMeta::Default = 0;
+const TSaslAuthenticateResponseData::ErrorMessageMeta::Type TSaslAuthenticateResponseData::ErrorMessageMeta::Default = {""};
+const TSaslAuthenticateResponseData::SessionLifetimeMsMeta::Type TSaslAuthenticateResponseData::SessionLifetimeMsMeta::Default = 0;
+
+TSaslAuthenticateResponseData::TSaslAuthenticateResponseData()
+ : ErrorCode(ErrorCodeMeta::Default)
+ , ErrorMessage(ErrorMessageMeta::Default)
+ , SessionLifetimeMs(SessionLifetimeMsMeta::Default)
+{}
+
+void TSaslAuthenticateResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't read version " << _version << " of TSaslAuthenticateResponseData";
+ }
+ NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode);
+ NPrivate::Read<ErrorMessageMeta>(_readable, _version, ErrorMessage);
+ NPrivate::Read<AuthBytesMeta>(_readable, _version, AuthBytes);
+ NPrivate::Read<SessionLifetimeMsMeta>(_readable, _version, SessionLifetimeMs);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ int _numTaggedFields = _readable.readUnsignedVarint();
+ for (int _i = 0; _i < _numTaggedFields; ++_i) {
+ int _tag = _readable.readUnsignedVarint();
+ int _size = _readable.readUnsignedVarint();
+ switch (_tag) {
+ default:
+ _readable.skip(_size); // skip unknown tag
+ break;
+ }
+ }
+ }
+}
+
+void TSaslAuthenticateResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
+ if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
+ ythrow yexception() << "Can't write version " << _version << " of TSaslAuthenticateResponseData";
+ }
+ NPrivate::TWriteCollector _collector;
+ NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode);
+ NPrivate::Write<ErrorMessageMeta>(_collector, _writable, _version, ErrorMessage);
+ NPrivate::Write<AuthBytesMeta>(_collector, _writable, _version, AuthBytes);
+ NPrivate::Write<SessionLifetimeMsMeta>(_collector, _writable, _version, SessionLifetimeMs);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _writable.writeUnsignedVarint(_collector.NumTaggedFields);
+
+ }
+}
+
+i32 TSaslAuthenticateResponseData::Size(TKafkaVersion _version) const {
+ NPrivate::TSizeCollector _collector;
+ NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode);
+ NPrivate::Size<ErrorMessageMeta>(_collector, _version, ErrorMessage);
+ NPrivate::Size<AuthBytesMeta>(_collector, _version, AuthBytes);
+ NPrivate::Size<SessionLifetimeMsMeta>(_collector, _version, SessionLifetimeMs);
+
+ if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
+ _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
+ }
+ return _collector.Size;
+}
} //namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h
index 28c8d2388d..0d63f9fd9e 100644
--- a/ydb/core/kafka_proxy/kafka_messages.h
+++ b/ydb/core/kafka_proxy/kafka_messages.h
@@ -19,8 +19,10 @@ enum EApiKey {
PRODUCE = 0, // [ZK_BROKER, BROKER]
FETCH = 1, // [ZK_BROKER, BROKER, CONTROLLER]
METADATA = 3, // [ZK_BROKER, BROKER]
+ SASL_HANDSHAKE = 17, // [ZK_BROKER, BROKER, CONTROLLER]
API_VERSIONS = 18, // [ZK_BROKER, BROKER, CONTROLLER]
INIT_PRODUCER_ID = 22, // [ZK_BROKER, BROKER]
+ SASL_AUTHENTICATE = 36, // [ZK_BROKER, BROKER, CONTROLLER]
};
@@ -2080,6 +2082,94 @@ public:
};
+class TSaslHandshakeRequestData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TSaslHandshakeRequestData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 1};
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+
+ TSaslHandshakeRequestData();
+ ~TSaslHandshakeRequestData() = default;
+
+ struct MechanismMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "mechanism";
+ static constexpr const char* About = "The SASL mechanism chosen by the client.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ MechanismMeta::Type Mechanism;
+
+ i16 ApiKey() const override { return SASL_HANDSHAKE; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TSaslHandshakeRequestData& other) const = default;
+};
+
+
+class TSaslHandshakeResponseData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TSaslHandshakeResponseData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 1};
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+
+ TSaslHandshakeResponseData();
+ ~TSaslHandshakeResponseData() = default;
+
+ struct ErrorCodeMeta {
+ using Type = TKafkaInt16;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "errorCode";
+ static constexpr const char* About = "The error code, or 0 if there was no error.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ ErrorCodeMeta::Type ErrorCode;
+
+ struct MechanismsMeta {
+ using ItemType = TKafkaString;
+ using ItemTypeDesc = NPrivate::TKafkaStringDesc;
+ using Type = std::vector<TKafkaString>;
+ using TypeDesc = NPrivate::TKafkaArrayDesc;
+
+ static constexpr const char* Name = "mechanisms";
+ static constexpr const char* About = "The mechanisms enabled in the server.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = VersionsNever;
+ };
+ MechanismsMeta::Type Mechanisms;
+
+ i16 ApiKey() const override { return SASL_HANDSHAKE; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TSaslHandshakeResponseData& other) const = default;
+};
+
+
class TApiVersionsRequestData : public TApiMessage {
public:
typedef std::shared_ptr<TApiVersionsRequestData> TPtr;
@@ -2611,4 +2701,119 @@ public:
bool operator==(const TInitProducerIdResponseData& other) const = default;
};
+
+class TSaslAuthenticateRequestData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TSaslAuthenticateRequestData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 2};
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+
+ TSaslAuthenticateRequestData();
+ ~TSaslAuthenticateRequestData() = default;
+
+ struct AuthBytesMeta {
+ using Type = TKafkaBytes;
+ using TypeDesc = NPrivate::TKafkaBytesDesc;
+
+ static constexpr const char* Name = "authBytes";
+ static constexpr const char* About = "The SASL authentication bytes from the client, as defined by the SASL mechanism.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ AuthBytesMeta::Type AuthBytes;
+
+ i16 ApiKey() const override { return SASL_AUTHENTICATE; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TSaslAuthenticateRequestData& other) const = default;
+};
+
+
+class TSaslAuthenticateResponseData : public TApiMessage {
+public:
+ typedef std::shared_ptr<TSaslAuthenticateResponseData> TPtr;
+
+ struct MessageMeta {
+ static constexpr TKafkaVersions PresentVersions = {0, 2};
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+
+ TSaslAuthenticateResponseData();
+ ~TSaslAuthenticateResponseData() = default;
+
+ struct ErrorCodeMeta {
+ using Type = TKafkaInt16;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "errorCode";
+ static constexpr const char* About = "The error code, or 0 if there was no error.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ ErrorCodeMeta::Type ErrorCode;
+
+ struct ErrorMessageMeta {
+ using Type = TKafkaString;
+ using TypeDesc = NPrivate::TKafkaStringDesc;
+
+ static constexpr const char* Name = "errorMessage";
+ static constexpr const char* About = "The error message, or null if there was no error.";
+ static const Type Default; // = {""};
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsAlways;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ ErrorMessageMeta::Type ErrorMessage;
+
+ struct AuthBytesMeta {
+ using Type = TKafkaBytes;
+ using TypeDesc = NPrivate::TKafkaBytesDesc;
+
+ static constexpr const char* Name = "authBytes";
+ static constexpr const char* About = "The SASL authentication bytes from the server, as defined by the SASL mechanism.";
+
+ static constexpr TKafkaVersions PresentVersions = VersionsAlways;
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ AuthBytesMeta::Type AuthBytes;
+
+ struct SessionLifetimeMsMeta {
+ using Type = TKafkaInt64;
+ using TypeDesc = NPrivate::TKafkaIntDesc;
+
+ static constexpr const char* Name = "sessionLifetimeMs";
+ static constexpr const char* About = "The SASL authentication bytes from the server, as defined by the SASL mechanism.";
+ static const Type Default; // = 0;
+
+ static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()};
+ static constexpr TKafkaVersions TaggedVersions = VersionsNever;
+ static constexpr TKafkaVersions NullableVersions = VersionsNever;
+ static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
+ };
+ SessionLifetimeMsMeta::Type SessionLifetimeMs;
+
+ i16 ApiKey() const override { return SASL_AUTHENTICATE; };
+ i32 Size(TKafkaVersion version) const override;
+ void Read(TKafkaReadable& readable, TKafkaVersion version) override;
+ void Write(TKafkaWritable& writable, TKafkaVersion version) const override;
+
+ bool operator==(const TSaslAuthenticateResponseData& other) const = default;
+};
+
} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make
index 03fab884a5..a9aba3a277 100644
--- a/ydb/core/kafka_proxy/ya.make
+++ b/ydb/core/kafka_proxy/ya.make
@@ -5,6 +5,7 @@ SRCS(
actors/kafka_init_producer_id_actor.cpp
actors/kafka_metadata_actor.cpp
actors/kafka_produce_actor.cpp
+ actors/kafka_sasl_auth_actor.cpp
kafka_connection.cpp
kafka_connection.h
kafka_listener.h