diff options
author | savnik <savnik@yandex-team.com> | 2023-08-09 13:31:44 +0300 |
---|---|---|
committer | savnik <savnik@yandex-team.com> | 2023-08-09 16:30:30 +0300 |
commit | db84ab468d808d350c525103bec68c4958f9114b (patch) | |
tree | 7b865bfc4462b18bb6f232a233e78718f7fa2768 | |
parent | 4700a34cf159bab16151bc9ae30c259884bbf6a3 (diff) | |
download | ydb-db84ab468d808d350c525103bec68c4958f9114b.tar.gz |
Kafka auth
-rw-r--r-- | ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp | 127 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h | 77 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.cpp | 29 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_events.h | 10 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages.cpp | 250 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages.h | 205 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ya.make | 1 |
12 files changed, 711 insertions, 2 deletions
diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt index 4fe0387ca6..732c093b32 100644 --- a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt @@ -24,6 +24,7 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt index 2acbd612c9..dc2faaaf69 100644 --- a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt @@ -25,6 +25,7 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt index 2acbd612c9..dc2faaaf69 100644 --- a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt @@ -25,6 +25,7 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp diff --git a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt index 4fe0387ca6..732c093b32 100644 --- a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt @@ -24,6 +24,7 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp diff --git a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp index 4acdd053b2..220e04aa52 100644 --- a/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp @@ -8,7 +8,7 @@ NActors::IActor* CreateKafkaApiVersionsActor(const TActorId parent, const ui64 c TApiVersionsResponseData::TPtr GetApiVersions() { TApiVersionsResponseData::TPtr response = std::make_shared<TApiVersionsResponseData>(); - response->ApiKeys.resize(4); + response->ApiKeys.resize(6); response->ApiKeys[0].ApiKey = PRODUCE; response->ApiKeys[0].MinVersion = 3; // From version 3 record batch format is 2. Supported only 2th batch format. @@ -26,6 +26,14 @@ TApiVersionsResponseData::TPtr GetApiVersions() { response->ApiKeys[3].MinVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Min; response->ApiKeys[3].MaxVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Max; + response->ApiKeys[4].ApiKey = SASL_HANDSHAKE; + response->ApiKeys[4].MinVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Min;//savnik: check + response->ApiKeys[4].MaxVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Max; + + response->ApiKeys[5].ApiKey = SASL_AUTHENTICATE; + response->ApiKeys[5].MinVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Min; + response->ApiKeys[5].MaxVersion = TInitProducerIdRequestData::MessageMeta::PresentVersions.Max; + return response; } diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp new file mode 100644 index 0000000000..c0f39b7207 --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp @@ -0,0 +1,127 @@ +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> +#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h> +#include <library/cpp/actors/core/actor.h> +#include <ydb/core/base/ticket_parser.h> +#include "kafka_sasl_auth_actor.h" + +namespace NKafka { + +NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const TSaslHandshakeRequestData* message) { + return new TKafkaSaslAuthActor(parent, correlationId, message); +} + +NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message) { + return new TKafkaSaslAuthActor(parent, correlationId, address, message); +} + +void TKafkaSaslAuthActor::Bootstrap(const NActors::TActorContext& ctx) { + if (ReqType) { + Send(Parent, new TEvKafka::TEvResponse(CorrelationId, Handshake())); + Die(ctx); + } else { + Become(&TKafkaSaslAuthActor::StateAuth); + PlainAuth(ctx); + } +} + +void TKafkaSaslAuthActor::PlainAuth(const NActors::TActorContext& ctx) { + TKafkaBytes auth_bytes_opt = AuthenticateRequestData->AuthBytes; + TString username; + TString password; + TString database; + + if (auth_bytes_opt.has_value()) { + TKafkaRawBytes auth_bytes = auth_bytes_opt.value(); + TString auth_data(auth_bytes.data(), auth_bytes.size()); + + size_t first_null_pos = auth_data.find('\0'); + if(first_null_pos != TString::npos && first_null_pos < auth_data.length() - 1) { + size_t second_null_pos = auth_data.find('\0', first_null_pos + 1); + if(second_null_pos != TString::npos) { + auto loginAndDatabase = auth_data.substr(first_null_pos + 1, second_null_pos - first_null_pos - 1); + std::tie(username, database) = ParseLoginAndDatabase(loginAndDatabase); + password = auth_data.substr(second_null_pos + 1); + } + } + } + + Ydb::Auth::LoginRequest request; + request.set_user(username); + request.set_password(password); + TActorSystem* actorSystem = ctx.ActorSystem(); + using TRpcEv = NKikimr::NGRpcService::TGRpcRequestWrapperNoAuth<NKikimr::NGRpcService::TRpcServices::EvLogin, Ydb::Auth::LoginRequest, Ydb::Auth::LoginResponse>; + auto rpcFuture = NKikimr::NRpcService::DoLocalRpc<TRpcEv>(std::move(request), database, {}, actorSystem); + rpcFuture.Subscribe([database, actorSystem, selfId = SelfId()](const NThreading::TFuture<Ydb::Auth::LoginResponse>& future) { + auto& response = future.GetValueSync(); + if (response.operation().status() == Ydb::StatusIds::SUCCESS) { + auto tokenReady = std::make_unique<TEvPrivate::TEvTokenReady>(); + response.operation().result().UnpackTo(&(tokenReady->LoginResult)); + tokenReady->Database = database; + actorSystem->Send(selfId, tokenReady.release()); + } else { + auto authFailed = std::make_unique<TEvPrivate::TEvAuthFailed>(); + if (response.operation().issues_size() > 0) { + authFailed->ErrorMessage = response.operation().issues(0).message(); + } else { + authFailed->ErrorMessage = Ydb::StatusIds_StatusCode_Name(response.operation().status()); + } + actorSystem->Send(selfId, authFailed.release()); + } + }); +} + +void TKafkaSaslAuthActor::Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult::TPtr& ev, const NActors::TActorContext& ctx) { + TSaslAuthenticateResponseData::TPtr response = std::make_shared<TSaslAuthenticateResponseData>(); + response->AuthBytes = TKafkaRawBytes("", ""); + if (ev->Get()->Error) { + response->ErrorCode = 1; + response->ErrorMessage = ev->Get()->Error.Message; + //savnik: close conn + } else { + response->ErrorCode = 0; + response->ErrorMessage = ""; + Send(Parent, new TEvKafka::TEvAuthSuccess(ev->Get()->Token)); + } + Send(Parent, new TEvKafka::TEvResponse(CorrelationId, response)); + Die(ctx); +} + +void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NActors::TActorContext&) { + Send(NKikimr::MakeTicketParserID(), new NKikimr::TEvTicketParser::TEvAuthorizeTicket({ + .Database = ev->Get()->Database, + .Ticket = ev->Get()->LoginResult.token(), + .PeerName = TStringBuilder() << Address, + })); +} + +void TKafkaSaslAuthActor::Handle(TEvPrivate::TEvAuthFailed::TPtr& ev, const NActors::TActorContext& ctx) { + TSaslAuthenticateResponseData::TPtr saslResponse = std::make_shared<TSaslAuthenticateResponseData>(); + saslResponse->AuthBytes = TKafkaRawBytes("", ""); + saslResponse->ErrorCode = 1; + saslResponse->ErrorMessage = ev->Get()->ErrorMessage; + Send(Parent, new TEvKafka::TEvResponse(CorrelationId, saslResponse)); + //savnik: close conn + Die(ctx); +} + +std::pair<TString, TString> TKafkaSaslAuthActor::ParseLoginAndDatabase(const TString& str) { + size_t pos = str.rfind('@'); + if (pos == std::string::npos) { + return {str, ""}; //savnik + } + return {str.substr(0, pos), str.substr(pos + 1)}; +} + +TSaslHandshakeResponseData::TPtr TKafkaSaslAuthActor::Handshake() { + TSaslHandshakeResponseData::TPtr response = std::make_shared<TSaslHandshakeResponseData>(); + if (HandshakeRequestData->Mechanism == "PLAIN") { + response->ErrorCode = 0; + response->Mechanisms.push_back("PLAIN"); + } else { + response->ErrorCode = 1; + } + + return response; +} + +}
\ No newline at end of file diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h new file mode 100644 index 0000000000..fab9482cae --- /dev/null +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h @@ -0,0 +1,77 @@ +#include "ydb/library/aclib/aclib.h" +#include <ydb/core/base/ticket_parser.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h> + +#include "../kafka_events.h" + +namespace NKafka { + +class TKafkaSaslAuthActor: public NActors::TActorBootstrapped<TKafkaSaslAuthActor> { + +struct TEvPrivate { + enum EEv { + EvTokenReady = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvAuthFailed, + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + + struct TEvTokenReady : TEventLocal<TEvTokenReady, EvTokenReady> { + Ydb::Auth::LoginResult LoginResult; + TString Database; + }; + + struct TEvAuthFailed : TEventLocal<TEvAuthFailed, EvAuthFailed> { + TString ErrorMessage; + }; +}; + +public: + TKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const TSaslHandshakeRequestData* message) + : Parent(parent) + , CorrelationId(correlationId) + , HandshakeRequestData(message) + , ReqType(true) { + } + + TKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message) + : Parent(parent) + , CorrelationId(correlationId) + , AuthenticateRequestData(message) + , Address(address) + , ReqType(false) { + } + + STATEFN(StateAuth) { + switch (ev->GetTypeRewrite()) { + HFunc(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult, Handle); + HFunc(TEvPrivate::TEvTokenReady, Handle); + HFunc(TEvPrivate::TEvAuthFailed, Handle); + //savnik: poison pill + } + } + + void Bootstrap(const NActors::TActorContext& ctx); + + void Handle(NKikimr::TEvTicketParser::TEvAuthorizeTicketResult::TPtr& ev, const NActors::TActorContext& ctx); + void Handle(TEvPrivate::TEvTokenReady::TPtr& ev, const NActors::TActorContext& ctx); + void Handle(TEvPrivate::TEvAuthFailed::TPtr& ev, const NActors::TActorContext& ctx); + + TSaslHandshakeResponseData::TPtr Handshake(); + void PlainAuth(const NActors::TActorContext& ctx); + + std::pair<TString, TString> ParseLoginAndDatabase(const TString& str); + +private: + const TActorId Parent; + const ui64 CorrelationId; + const TSaslHandshakeRequestData* HandshakeRequestData; + const TSaslAuthenticateRequestData* AuthenticateRequestData; + const NKikimr::NRawSocket::TNetworkConfig::TSocketAddressType Address; + const TIntrusiveConstPtr<NACLib::TUserToken> Token; + const bool ReqType; +}; + +} // NKafka diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index cf19589853..efa5bd2cb1 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -16,10 +16,14 @@ namespace NKafka { using namespace NActors; using namespace NKikimr; + NActors::IActor* CreateKafkaApiVersionsActor(const TActorId parent, const ui64 correlationId, const TApiVersionsRequestData* message); NActors::IActor* CreateKafkaInitProducerIdActor(const TActorId parent, const ui64 correlationId, const TInitProducerIdRequestData* message); NActors::IActor* CreateKafkaMetadataActor(const TActorId parent, const ui64 correlationId, const TMetadataRequestData* message); NActors::IActor* CreateKafkaProduceActor(const TActorId parent, const TString& clientDC); +NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const TSaslHandshakeRequestData* message); +NActors::IActor* CreateKafkaSaslAuthActor(const TActorId parent, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TSaslAuthenticateRequestData* message); + char Hex(const unsigned char c) { return c < 10 ? '0' + c : 'A' + c - 10; @@ -73,7 +77,7 @@ public: NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier; TString ClientDC; - i32 CorrelationId = 0; + i32 CorrelationId = 1; std::shared_ptr<Msg> Request; std::unordered_map<ui64, Msg::TPtr> PendingRequests; std::deque<Msg::TPtr> PendingRequestsQueue; @@ -87,6 +91,8 @@ public: TActorId ProduceActorId; + TIntrusiveConstPtr<NACLib::TUserToken> UserToken; + TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const NKikimrConfig::TKafkaProxyConfig& config) @@ -213,6 +219,14 @@ protected: Register(CreateKafkaMetadataActor(SelfId(), header->CorrelationId, message)); } + void HandleMessage(const TRequestHeaderData* header, const TSaslAuthenticateRequestData* message) { + Register(CreateKafkaSaslAuthActor(SelfId(), header->CorrelationId, Address, message)); + } + + void HandleMessage(const TRequestHeaderData* header, const TSaslHandshakeRequestData* message) { + Register(CreateKafkaSaslAuthActor(SelfId(), header->CorrelationId, message)); + } + void ProcessRequest() { KAFKA_LOG_D("process message: ApiKey=" << Request->Header.RequestApiKey << ", ExpectedSize=" << Request->ExpectedSize << ", Size=" << Request->Size); @@ -240,6 +254,14 @@ protected: HandleMessage(&Request->Header, dynamic_cast<TMetadataRequestData*>(message)); return; + case SASL_HANDSHAKE: + HandleMessage(&Request->Header, dynamic_cast<TSaslHandshakeRequestData*>(message)); + return; + + case SASL_AUTHENTICATE: + HandleMessage(&Request->Header, dynamic_cast<TSaslAuthenticateRequestData*>(message)); + return; + default: KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request->Header.RequestApiKey); PassAway(); @@ -251,6 +273,10 @@ protected: Reply(r->CorrelationId, r->Response); } + void Handle(TEvKafka::TEvAuthSuccess::TPtr auth) { + UserToken = auth->Get()->UserToken; + } + void Reply(const ui64 correlationId, TApiMessage::TPtr response) { auto it = PendingRequests.find(correlationId); if (it == PendingRequests.end()) { @@ -437,6 +463,7 @@ protected: hFunc(TEvPollerReady, HandleConnected); hFunc(TEvPollerRegisterResult, HandleConnected); hFunc(TEvKafka::TEvResponse, Handle); + hFunc(TEvKafka::TEvAuthSuccess, Handle); default: KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName()); } diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h index cf9bcfadd6..c987e72f73 100644 --- a/ydb/core/kafka_proxy/kafka_events.h +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -4,6 +4,7 @@ #include <ydb/core/base/events.h> #include "kafka_messages.h" +#include "ydb/library/aclib/aclib.h" using namespace NActors; @@ -13,6 +14,7 @@ struct TEvKafka { enum EEv { EvRequest = EventSpaceBegin(NKikimr::TKikimrEvents::TKikimrEvents::ES_KAFKA), EvProduceRequest, + EvAuthSuccess, EvWakeup, EvResponse = EvRequest + 256, EvInternalEvents = EvResponse + 256, @@ -34,6 +36,14 @@ struct TEvKafka { const TProduceRequestData* Request; }; + struct TEvAuthSuccess : public TEventLocal<TEvAuthSuccess, EvAuthSuccess> { + TEvAuthSuccess(TIntrusiveConstPtr<NACLib::TUserToken> token) + : UserToken(token) + {} + + TIntrusiveConstPtr<NACLib::TUserToken> UserToken; + }; + struct TEvResponse : public TEventLocal<TEvResponse, EvResponse> { TEvResponse(const ui64 correlationId, const TApiMessage::TPtr response) : CorrelationId(correlationId) diff --git a/ydb/core/kafka_proxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp index 5e1e40e6f3..68d4a0777c 100644 --- a/ydb/core/kafka_proxy/kafka_messages.cpp +++ b/ydb/core/kafka_proxy/kafka_messages.cpp @@ -14,10 +14,14 @@ std::unique_ptr<TApiMessage> CreateRequest(i16 apiKey) { return std::make_unique<TFetchRequestData>(); case METADATA: return std::make_unique<TMetadataRequestData>(); + case SASL_HANDSHAKE: + return std::make_unique<TSaslHandshakeRequestData>(); case API_VERSIONS: return std::make_unique<TApiVersionsRequestData>(); case INIT_PRODUCER_ID: return std::make_unique<TInitProducerIdRequestData>(); + case SASL_AUTHENTICATE: + return std::make_unique<TSaslAuthenticateRequestData>(); default: ythrow yexception() << "Unsupported request API key " << apiKey; } @@ -31,10 +35,14 @@ std::unique_ptr<TApiMessage> CreateResponse(i16 apiKey) { return std::make_unique<TFetchResponseData>(); case METADATA: return std::make_unique<TMetadataResponseData>(); + case SASL_HANDSHAKE: + return std::make_unique<TSaslHandshakeResponseData>(); case API_VERSIONS: return std::make_unique<TApiVersionsResponseData>(); case INIT_PRODUCER_ID: return std::make_unique<TInitProducerIdResponseData>(); + case SASL_AUTHENTICATE: + return std::make_unique<TSaslAuthenticateResponseData>(); default: ythrow yexception() << "Unsupported response API key " << apiKey; } @@ -60,6 +68,8 @@ TKafkaVersion RequestHeaderVersion(i16 apiKey, TKafkaVersion _version) { } else { return 1; } + case SASL_HANDSHAKE: + return 1; case API_VERSIONS: if (_version >= 3) { return 2; @@ -72,6 +82,12 @@ TKafkaVersion RequestHeaderVersion(i16 apiKey, TKafkaVersion _version) { } else { return 1; } + case SASL_AUTHENTICATE: + if (_version >= 2) { + return 2; + } else { + return 1; + } default: ythrow yexception() << "Unsupported API key " << apiKey; break; @@ -98,6 +114,8 @@ TKafkaVersion ResponseHeaderVersion(i16 apiKey, TKafkaVersion _version) { } else { return 0; } + case SASL_HANDSHAKE: + return 0; case API_VERSIONS: // ApiVersionsResponse always includes a v0 header. // See KIP-511 for details. @@ -108,6 +126,12 @@ TKafkaVersion ResponseHeaderVersion(i16 apiKey, TKafkaVersion _version) { } else { return 0; } + case SASL_AUTHENTICATE: + if (_version >= 2) { + return 1; + } else { + return 0; + } default: ythrow yexception() << "Unsupported API key " << apiKey; break; @@ -1855,6 +1879,115 @@ i32 TMetadataResponseData::TMetadataResponseTopic::TMetadataResponsePartition::S // +// TSaslHandshakeRequestData +// +const TSaslHandshakeRequestData::MechanismMeta::Type TSaslHandshakeRequestData::MechanismMeta::Default = {""}; + +TSaslHandshakeRequestData::TSaslHandshakeRequestData() + : Mechanism(MechanismMeta::Default) +{} + +void TSaslHandshakeRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TSaslHandshakeRequestData"; + } + NPrivate::Read<MechanismMeta>(_readable, _version, Mechanism); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TSaslHandshakeRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TSaslHandshakeRequestData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<MechanismMeta>(_collector, _writable, _version, Mechanism); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TSaslHandshakeRequestData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<MechanismMeta>(_collector, _version, Mechanism); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TSaslHandshakeResponseData +// +const TSaslHandshakeResponseData::ErrorCodeMeta::Type TSaslHandshakeResponseData::ErrorCodeMeta::Default = 0; + +TSaslHandshakeResponseData::TSaslHandshakeResponseData() + : ErrorCode(ErrorCodeMeta::Default) +{} + +void TSaslHandshakeResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TSaslHandshakeResponseData"; + } + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<MechanismsMeta>(_readable, _version, Mechanisms); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TSaslHandshakeResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TSaslHandshakeResponseData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode); + NPrivate::Write<MechanismsMeta>(_collector, _writable, _version, Mechanisms); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TSaslHandshakeResponseData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode); + NPrivate::Size<MechanismsMeta>(_collector, _version, Mechanisms); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// // TApiVersionsRequestData // const TApiVersionsRequestData::ClientSoftwareNameMeta::Type TApiVersionsRequestData::ClientSoftwareNameMeta::Default = {""}; @@ -2328,4 +2461,121 @@ i32 TInitProducerIdResponseData::Size(TKafkaVersion _version) const { } return _collector.Size; } + + +// +// TSaslAuthenticateRequestData +// + +TSaslAuthenticateRequestData::TSaslAuthenticateRequestData() +{} + +void TSaslAuthenticateRequestData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TSaslAuthenticateRequestData"; + } + NPrivate::Read<AuthBytesMeta>(_readable, _version, AuthBytes); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TSaslAuthenticateRequestData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TSaslAuthenticateRequestData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<AuthBytesMeta>(_collector, _writable, _version, AuthBytes); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TSaslAuthenticateRequestData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<AuthBytesMeta>(_collector, _version, AuthBytes); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} + + +// +// TSaslAuthenticateResponseData +// +const TSaslAuthenticateResponseData::ErrorCodeMeta::Type TSaslAuthenticateResponseData::ErrorCodeMeta::Default = 0; +const TSaslAuthenticateResponseData::ErrorMessageMeta::Type TSaslAuthenticateResponseData::ErrorMessageMeta::Default = {""}; +const TSaslAuthenticateResponseData::SessionLifetimeMsMeta::Type TSaslAuthenticateResponseData::SessionLifetimeMsMeta::Default = 0; + +TSaslAuthenticateResponseData::TSaslAuthenticateResponseData() + : ErrorCode(ErrorCodeMeta::Default) + , ErrorMessage(ErrorMessageMeta::Default) + , SessionLifetimeMs(SessionLifetimeMsMeta::Default) +{} + +void TSaslAuthenticateResponseData::Read(TKafkaReadable& _readable, TKafkaVersion _version) { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't read version " << _version << " of TSaslAuthenticateResponseData"; + } + NPrivate::Read<ErrorCodeMeta>(_readable, _version, ErrorCode); + NPrivate::Read<ErrorMessageMeta>(_readable, _version, ErrorMessage); + NPrivate::Read<AuthBytesMeta>(_readable, _version, AuthBytes); + NPrivate::Read<SessionLifetimeMsMeta>(_readable, _version, SessionLifetimeMs); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + int _numTaggedFields = _readable.readUnsignedVarint(); + for (int _i = 0; _i < _numTaggedFields; ++_i) { + int _tag = _readable.readUnsignedVarint(); + int _size = _readable.readUnsignedVarint(); + switch (_tag) { + default: + _readable.skip(_size); // skip unknown tag + break; + } + } + } +} + +void TSaslAuthenticateResponseData::Write(TKafkaWritable& _writable, TKafkaVersion _version) const { + if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) { + ythrow yexception() << "Can't write version " << _version << " of TSaslAuthenticateResponseData"; + } + NPrivate::TWriteCollector _collector; + NPrivate::Write<ErrorCodeMeta>(_collector, _writable, _version, ErrorCode); + NPrivate::Write<ErrorMessageMeta>(_collector, _writable, _version, ErrorMessage); + NPrivate::Write<AuthBytesMeta>(_collector, _writable, _version, AuthBytes); + NPrivate::Write<SessionLifetimeMsMeta>(_collector, _writable, _version, SessionLifetimeMs); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _writable.writeUnsignedVarint(_collector.NumTaggedFields); + + } +} + +i32 TSaslAuthenticateResponseData::Size(TKafkaVersion _version) const { + NPrivate::TSizeCollector _collector; + NPrivate::Size<ErrorCodeMeta>(_collector, _version, ErrorCode); + NPrivate::Size<ErrorMessageMeta>(_collector, _version, ErrorMessage); + NPrivate::Size<AuthBytesMeta>(_collector, _version, AuthBytes); + NPrivate::Size<SessionLifetimeMsMeta>(_collector, _version, SessionLifetimeMs); + + if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) { + _collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields); + } + return _collector.Size; +} } //namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h index 28c8d2388d..0d63f9fd9e 100644 --- a/ydb/core/kafka_proxy/kafka_messages.h +++ b/ydb/core/kafka_proxy/kafka_messages.h @@ -19,8 +19,10 @@ enum EApiKey { PRODUCE = 0, // [ZK_BROKER, BROKER] FETCH = 1, // [ZK_BROKER, BROKER, CONTROLLER] METADATA = 3, // [ZK_BROKER, BROKER] + SASL_HANDSHAKE = 17, // [ZK_BROKER, BROKER, CONTROLLER] API_VERSIONS = 18, // [ZK_BROKER, BROKER, CONTROLLER] INIT_PRODUCER_ID = 22, // [ZK_BROKER, BROKER] + SASL_AUTHENTICATE = 36, // [ZK_BROKER, BROKER, CONTROLLER] }; @@ -2080,6 +2082,94 @@ public: }; +class TSaslHandshakeRequestData : public TApiMessage { +public: + typedef std::shared_ptr<TSaslHandshakeRequestData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 1}; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + + TSaslHandshakeRequestData(); + ~TSaslHandshakeRequestData() = default; + + struct MechanismMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "mechanism"; + static constexpr const char* About = "The SASL mechanism chosen by the client."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + MechanismMeta::Type Mechanism; + + i16 ApiKey() const override { return SASL_HANDSHAKE; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TSaslHandshakeRequestData& other) const = default; +}; + + +class TSaslHandshakeResponseData : public TApiMessage { +public: + typedef std::shared_ptr<TSaslHandshakeResponseData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 1}; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + + TSaslHandshakeResponseData(); + ~TSaslHandshakeResponseData() = default; + + struct ErrorCodeMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "errorCode"; + static constexpr const char* About = "The error code, or 0 if there was no error."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + ErrorCodeMeta::Type ErrorCode; + + struct MechanismsMeta { + using ItemType = TKafkaString; + using ItemTypeDesc = NPrivate::TKafkaStringDesc; + using Type = std::vector<TKafkaString>; + using TypeDesc = NPrivate::TKafkaArrayDesc; + + static constexpr const char* Name = "mechanisms"; + static constexpr const char* About = "The mechanisms enabled in the server."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = VersionsNever; + }; + MechanismsMeta::Type Mechanisms; + + i16 ApiKey() const override { return SASL_HANDSHAKE; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TSaslHandshakeResponseData& other) const = default; +}; + + class TApiVersionsRequestData : public TApiMessage { public: typedef std::shared_ptr<TApiVersionsRequestData> TPtr; @@ -2611,4 +2701,119 @@ public: bool operator==(const TInitProducerIdResponseData& other) const = default; }; + +class TSaslAuthenticateRequestData : public TApiMessage { +public: + typedef std::shared_ptr<TSaslAuthenticateRequestData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 2}; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + + TSaslAuthenticateRequestData(); + ~TSaslAuthenticateRequestData() = default; + + struct AuthBytesMeta { + using Type = TKafkaBytes; + using TypeDesc = NPrivate::TKafkaBytesDesc; + + static constexpr const char* Name = "authBytes"; + static constexpr const char* About = "The SASL authentication bytes from the client, as defined by the SASL mechanism."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + AuthBytesMeta::Type AuthBytes; + + i16 ApiKey() const override { return SASL_AUTHENTICATE; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TSaslAuthenticateRequestData& other) const = default; +}; + + +class TSaslAuthenticateResponseData : public TApiMessage { +public: + typedef std::shared_ptr<TSaslAuthenticateResponseData> TPtr; + + struct MessageMeta { + static constexpr TKafkaVersions PresentVersions = {0, 2}; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + + TSaslAuthenticateResponseData(); + ~TSaslAuthenticateResponseData() = default; + + struct ErrorCodeMeta { + using Type = TKafkaInt16; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "errorCode"; + static constexpr const char* About = "The error code, or 0 if there was no error."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + ErrorCodeMeta::Type ErrorCode; + + struct ErrorMessageMeta { + using Type = TKafkaString; + using TypeDesc = NPrivate::TKafkaStringDesc; + + static constexpr const char* Name = "errorMessage"; + static constexpr const char* About = "The error message, or null if there was no error."; + static const Type Default; // = {""}; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsAlways; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + ErrorMessageMeta::Type ErrorMessage; + + struct AuthBytesMeta { + using Type = TKafkaBytes; + using TypeDesc = NPrivate::TKafkaBytesDesc; + + static constexpr const char* Name = "authBytes"; + static constexpr const char* About = "The SASL authentication bytes from the server, as defined by the SASL mechanism."; + + static constexpr TKafkaVersions PresentVersions = VersionsAlways; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + AuthBytesMeta::Type AuthBytes; + + struct SessionLifetimeMsMeta { + using Type = TKafkaInt64; + using TypeDesc = NPrivate::TKafkaIntDesc; + + static constexpr const char* Name = "sessionLifetimeMs"; + static constexpr const char* About = "The SASL authentication bytes from the server, as defined by the SASL mechanism."; + static const Type Default; // = 0; + + static constexpr TKafkaVersions PresentVersions = {1, Max<TKafkaVersion>()}; + static constexpr TKafkaVersions TaggedVersions = VersionsNever; + static constexpr TKafkaVersions NullableVersions = VersionsNever; + static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; + }; + SessionLifetimeMsMeta::Type SessionLifetimeMs; + + i16 ApiKey() const override { return SASL_AUTHENTICATE; }; + i32 Size(TKafkaVersion version) const override; + void Read(TKafkaReadable& readable, TKafkaVersion version) override; + void Write(TKafkaWritable& writable, TKafkaVersion version) const override; + + bool operator==(const TSaslAuthenticateResponseData& other) const = default; +}; + } // namespace NKafka diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make index 03fab884a5..a9aba3a277 100644 --- a/ydb/core/kafka_proxy/ya.make +++ b/ydb/core/kafka_proxy/ya.make @@ -5,6 +5,7 @@ SRCS( actors/kafka_init_producer_id_actor.cpp actors/kafka_metadata_actor.cpp actors/kafka_produce_actor.cpp + actors/kafka_sasl_auth_actor.cpp kafka_connection.cpp kafka_connection.h kafka_listener.h |