diff options
author | tesseract <tesseract@yandex-team.com> | 2023-06-21 16:50:52 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-06-21 16:50:52 +0300 |
commit | 8039da344cd945e51a620e1c7b1c9810ab771c0b (patch) | |
tree | eb63c46d69d03a2b4c15f3facfe3fea9fc5acf2e | |
parent | 7a2f532d8834a9ca4331bed840a0a8c09624d33b (diff) | |
download | ydb-8039da344cd945e51a620e1c7b1c9810ab771c0b.tar.gz |
Listen to port for kafka protocol
61 files changed, 881 insertions, 150 deletions
diff --git a/ydb/core/CMakeLists.txt b/ydb/core/CMakeLists.txt index 7e953aaa5d..474cc8c050 100644 --- a/ydb/core/CMakeLists.txt +++ b/ydb/core/CMakeLists.txt @@ -32,7 +32,7 @@ add_subdirectory(grpc_streaming) add_subdirectory(health_check) add_subdirectory(http_proxy) add_subdirectory(io_formats) -add_subdirectory(kafkaproxy) +add_subdirectory(kafka_proxy) add_subdirectory(kesus) add_subdirectory(keyvalue) add_subdirectory(kqp) @@ -49,6 +49,7 @@ add_subdirectory(pgproxy) add_subdirectory(protos) add_subdirectory(public_http) add_subdirectory(quoter) +add_subdirectory(raw_socket) add_subdirectory(scheme) add_subdirectory(scheme_types) add_subdirectory(security) diff --git a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt index a08bd34a7b..fd8b8d879d 100644 --- a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt @@ -65,6 +65,7 @@ target_link_libraries(run PUBLIC core-kesus-proxy core-kesus-tablet ydb-core-keyvalue + ydb-core-kafka_proxy ydb-core-kqp core-kqp-rm_service ydb-core-load_test diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt index 444d21918a..0f090796cf 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt @@ -66,6 +66,7 @@ target_link_libraries(run PUBLIC core-kesus-proxy core-kesus-tablet ydb-core-keyvalue + ydb-core-kafka_proxy ydb-core-kqp core-kqp-rm_service ydb-core-load_test diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt index 444d21918a..0f090796cf 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt @@ -66,6 +66,7 @@ target_link_libraries(run PUBLIC core-kesus-proxy core-kesus-tablet ydb-core-keyvalue + ydb-core-kafka_proxy ydb-core-kqp core-kqp-rm_service ydb-core-load_test diff --git a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt index a08bd34a7b..fd8b8d879d 100644 --- a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt @@ -65,6 +65,7 @@ target_link_libraries(run PUBLIC core-kesus-proxy core-kesus-tablet ydb-core-keyvalue + ydb-core-kafka_proxy ydb-core-kqp core-kqp-rm_service ydb-core-load_test diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h index c486031d78..10467bd824 100644 --- a/ydb/core/driver_lib/run/config.h +++ b/ydb/core/driver_lib/run/config.h @@ -72,6 +72,7 @@ union TBasicKikimrServicesMask { bool EnableExternalIndex: 1; bool EnableConveyor: 1; bool EnableLocalPgWire:1; + bool EnableKafkaProxy:1; }; ui64 Raw; diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index f0ce5d1870..b5c2944af0 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -63,6 +63,8 @@ #include <ydb/core/health_check/health_check.h> +#include <ydb/core/kafka_proxy/kafka_proxy.h> + #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/proxy_service/kqp_proxy_service.h> #include <ydb/core/kqp/rm_service/kqp_rm_service.h> @@ -2840,9 +2842,7 @@ void TLocalPgWireServiceInitializer::InitializeServices(NActors::TActorSystemSet ); NPG::TListenerSettings settings; - if (Config.GetLocalPgWireConfig().HasListeningPort()) { - settings.Port = Config.GetLocalPgWireConfig().GetListeningPort(); - } + settings.Port = Config.GetLocalPgWireConfig().GetListeningPort(); if (Config.GetLocalPgWireConfig().HasSslCertificate()) { settings.SslCertificatePem = Config.GetLocalPgWireConfig().GetSslCertificate(); } @@ -2854,5 +2854,26 @@ void TLocalPgWireServiceInitializer::InitializeServices(NActors::TActorSystemSet ); } +TKafkaProxyServiceInitializer::TKafkaProxyServiceInitializer(const TKikimrRunConfig& runConfig) + : IKikimrServicesInitializer(runConfig) +{ +} + +void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { + if (Config.GetKafkaProxyConfig().GetEnableKafkaProxy()) { + NKafka::TListenerSettings settings; + settings.Port = Config.GetKafkaProxyConfig().GetListeningPort(); + if (Config.GetKafkaProxyConfig().HasSslCertificate()) { + settings.SslCertificatePem = Config.GetKafkaProxyConfig().GetSslCertificate(); + } + + setup->LocalServices.emplace_back( + TActorId(), + TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings), + TMailboxType::HTSwap, appData->UserPoolId) + ); + } +} + } // namespace NKikimrServicesInitializers } // namespace NKikimr diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index 20b109ae4c..cc6ed3a9cf 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -559,5 +559,12 @@ public: void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; }; +class TKafkaProxyServiceInitializer : public IKikimrServicesInitializer { +public: + TKafkaProxyServiceInitializer(const TKikimrRunConfig& runConfig); + + void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; +}; + } // namespace NKikimrServicesInitializers } // namespace NKikimr diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index d8c5c4a25b..f150aeb4aa 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1592,6 +1592,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TLocalPgWireServiceInitializer(runConfig)); } + if (serviceMask.EnableKafkaProxy) { + sil->AddServiceInitializer(new TKafkaProxyServiceInitializer(runConfig)); + } + return sil; } diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make index 48d0b30951..3be8ed95c3 100644 --- a/ydb/core/driver_lib/run/ya.make +++ b/ydb/core/driver_lib/run/ya.make @@ -80,6 +80,7 @@ PEERDIR( ydb/core/kesus/proxy ydb/core/kesus/tablet ydb/core/keyvalue + ydb/core/kafka_proxy ydb/core/kqp ydb/core/kqp/rm_service ydb/core/load_test diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..d7e93e12b0 --- /dev/null +++ b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(ydb-core-kafka_proxy) +target_link_libraries(ydb-core-kafka_proxy PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-protos + ydb-core-base + ydb-core-protos + ydb-core-raw_socket +) +target_sources(ydb-core-kafka_proxy PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp +) diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..e2dc3a0969 --- /dev/null +++ b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(ydb-core-kafka_proxy) +target_link_libraries(ydb-core-kafka_proxy PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-protos + ydb-core-base + ydb-core-protos + ydb-core-raw_socket +) +target_sources(ydb-core-kafka_proxy PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp +) diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..e2dc3a0969 --- /dev/null +++ b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(ydb-core-kafka_proxy) +target_link_libraries(ydb-core-kafka_proxy PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-protos + ydb-core-base + ydb-core-protos + ydb-core-raw_socket +) +target_sources(ydb-core-kafka_proxy PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp +) diff --git a/ydb/core/kafkaproxy/CMakeLists.txt b/ydb/core/kafka_proxy/CMakeLists.txt index f8b31df0c1..f8b31df0c1 100644 --- a/ydb/core/kafkaproxy/CMakeLists.txt +++ b/ydb/core/kafka_proxy/CMakeLists.txt diff --git a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..d7e93e12b0 --- /dev/null +++ b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(ut) + +add_library(ydb-core-kafka_proxy) +target_link_libraries(ydb-core-kafka_proxy PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-protos + ydb-core-base + ydb-core-protos + ydb-core-raw_socket +) +target_sources(ydb-core-kafka_proxy PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp +) diff --git a/ydb/core/kafkaproxy/kafka.h b/ydb/core/kafka_proxy/kafka.h index 8e4f5651b2..8e4f5651b2 100644 --- a/ydb/core/kafkaproxy/kafka.h +++ b/ydb/core/kafka_proxy/kafka.h diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp new file mode 100644 index 0000000000..f9934a1a8d --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -0,0 +1,427 @@ +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <ydb/core/raw_socket/sock_config.h> + +#include "kafka_connection.h" +#include "kafka_messages.h" +#include "kafka_log_impl.h" + +#include <strstream> +#include <sstream> +#include <iosfwd> + +namespace NKafka { + +using namespace NActors; + +char Hex(const unsigned char c) { + return c < 10 ? '0' + c : 'A' + c - 10; +} + +void Print(const TString& marker, TBuffer& buffer, ssize_t length) { + TStringBuilder sb; + for (ssize_t i = 0; i < length; ++i) { + char c = buffer.Data()[i]; + if (i > 0) { + sb << ", "; + } + sb << "0x" << Hex(c >> 4) << Hex(c & 0x0F); + } + KAFKA_LOG_T("Packet " << marker << ": " << sb); +} + +class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNetworkConfig { +public: + using TBase = TActorBootstrapped<TKafkaConnection>; + + // TODO check standard ip packet MTU. On my desktop it is 1500 on eth and wlp interfaces. It is 1300 on the tun interface. It + // is 1500 and 8950 on the dev server interfaces. + static constexpr size_t BufferSize = 8950; + static constexpr size_t MinDirectSize = 256; + + struct Msg { + size_t Size = 0; + TKafkaInt32 ExpectedSize = 0; + TRequestHeaderData Header; + std::unique_ptr<TMessage> Message; + }; + + static constexpr TDuration InactivityTimeout = TDuration::Minutes(10); + TEvPollerReady* InactivityEvent = nullptr; + TPollerToken::TPtr PollerToken; + + TIntrusivePtr<TSocketDescriptor> Socket; + TSocketAddressType Address; + THPTimer InactivityTimer; + + bool IsAuthRequired = true; + bool IsSslSupported = true; + + bool ConnectionEstablished = false; + bool CloseConnection = false; + + TBuffer Buffer; + size_t Length; + size_t Position; + + Msg Request; + bool HeaderSizeWasRead; + bool HeaderWasRead; + bool MessageSizeWasRead; + bool MessageWasRead; + std::unique_ptr<TReadContext> Ctx; + + TReadDemand Demand; + + TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) + : Socket(std::move(socket)) + , Address(address) + , Length(0) + , Position(0) + , HeaderSizeWasRead(false) + , HeaderWasRead(false) + , MessageSizeWasRead(false) + , MessageWasRead(false) + , Demand(NoDemand) { + SetNonBlock(); + IsSslSupported = IsSslSupported && Socket->IsSslSupported(); + Buffer.Resize(BufferSize); + } + + void Bootstrap() { + Become(&TKafkaConnection::StateAccepting); + Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false)); + KAFKA_LOG_D("incoming connection opened"); + OnAccept(); + } + + void PassAway() override { + if (ConnectionEstablished) { + ConnectionEstablished = false; + } + Shutdown(); + TBase::PassAway(); + } + +protected: + void SetNonBlock() noexcept { + Socket->SetNonBlock(); + } + + void Shutdown() { + if (Socket) { + Socket->Shutdown(); + } + } + + ssize_t SocketSend(const void* data, size_t size) { + return Socket->Send(data, size); + } + + ssize_t SocketReceive(void* data, size_t size) { + return Socket->Receive(data, size); + } + + void RequestPoller() { + Socket->RequestPoller(PollerToken); + } + + SOCKET GetRawSocket() const { + return Socket->GetRawSocket(); + } + + TString LogPrefix() const { + return TStringBuilder() << "(#" << GetRawSocket() << "," << Address->ToString() << ") "; + } + + void OnAccept() { + InactivityTimer.Reset(); + TBase::Become(&TKafkaConnection::StateConnected); + Send(SelfId(), new TEvPollerReady(nullptr, true, true)); + } + + void HandleAccepting(TEvPollerRegisterResult::TPtr ev) { + PollerToken = std::move(ev->Get()->PollerToken); + OnAccept(); + } + + void HandleAccepting(NActors::TEvPollerReady::TPtr) { + OnAccept(); + } + + STATEFN(StateAccepting) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvPollerReady, HandleAccepting); + hFunc(TEvPollerRegisterResult, HandleAccepting); + } + } + + void HandleMessage(TRequestHeaderData* header, TApiVersionsRequestData* /*message*/) { + TApiVersionsResponseData response; + response.ApiKeys.resize(4); + + response.ApiKeys[0].ApiKey = PRODUCE; + response.ApiKeys[0].MinVersion = TProduceRequestData::MessageMeta::PresentVersionMin; + response.ApiKeys[0].MaxVersion = TProduceRequestData::MessageMeta::PresentVersionMax; + + response.ApiKeys[1].ApiKey = API_VERSIONS; + response.ApiKeys[1].MinVersion = TApiVersionsRequestData::MessageMeta::PresentVersionMin; + response.ApiKeys[1].MaxVersion = TApiVersionsRequestData::MessageMeta::PresentVersionMax; + + response.ApiKeys[2].ApiKey = METADATA; + response.ApiKeys[2].MinVersion = TMetadataRequestData::MessageMeta::PresentVersionMin; + response.ApiKeys[2].MaxVersion = TMetadataRequestData::MessageMeta::PresentVersionMax; + + response.ApiKeys[3].ApiKey = INIT_PRODUCER_ID; + response.ApiKeys[3].MinVersion = TInitProducerIdRequestData::MessageMeta::PresentVersionMin; + response.ApiKeys[3].MaxVersion = TInitProducerIdRequestData::MessageMeta::PresentVersionMax; + + Reply(header, &response); + } + + void HandleMessage(TRequestHeaderData* header, TProduceRequestData* message) { + TProduceResponseData response; + response.Responses.resize(message->TopicData.size()); + int i = 0; + for (auto& data : message->TopicData) { + response.Responses[i].Name = data.Name; + response.Responses[i].PartitionResponses.resize(data.PartitionData.size()); + int j = 0; + for (auto& p : data.PartitionData) { + response.Responses[i].PartitionResponses[j].Index = p.Index; + response.Responses[i].PartitionResponses[j].BaseOffset = 40; + + ++j; + } + ++i; + } + + Reply(header, &response); + } + + void HandleMessage(TRequestHeaderData* header, TInitProducerIdRequestData* /*message*/) { + TInitProducerIdResponseData response; + response.ProducerEpoch = 1; + response.ProducerId = 1; + response.ErrorCode = 0; + response.ThrottleTimeMs = 0; + + Reply(header, &response); + } + + void HandleMessage(TRequestHeaderData* header, TMetadataRequestData* /*message*/) { + TMetadataResponseData response; + response.ThrottleTimeMs = 0; + response.ClusterId = "cluster-ahjgk"; + response.ControllerId = 1; + + response.Brokers.resize(1); + response.Brokers[0].NodeId = 1; + response.Brokers[0].Host = "lbk-dev-02.search.yandex.net"; + response.Brokers[0].Port = 9092; + + response.Topics.resize(1); + response.Topics[0].TopicId = TKafkaUuid(0, 1); + response.Topics[0].Name = "topic-1"; + response.Topics[0].Partitions.resize(1); + response.Topics[0].Partitions[0].LeaderId = 1; // response.Brokers[0].NodeId + response.Topics[0].Partitions[0].ReplicaNodes.resize(1); + response.Topics[0].Partitions[0].ReplicaNodes[0] = 1; + response.Topics[0].Partitions[0].IsrNodes.resize(1); + response.Topics[0].Partitions[0].IsrNodes[0] = 1; + + Reply(header, &response); + } + + void ProcessRequest() { + KAFKA_LOG_D("process message: ApiKey=" << Request.Header.RequestApiKey << ", ExpectedSize=" << Request.ExpectedSize + << ", Size=" << Request.Size); + switch (Request.Header.RequestApiKey) { + case PRODUCE: + HandleMessage(&Request.Header, dynamic_cast<TProduceRequestData*>(Request.Message.get())); + return; + + case API_VERSIONS: + HandleMessage(&Request.Header, dynamic_cast<TApiVersionsRequestData*>(Request.Message.get())); + return; + + case INIT_PRODUCER_ID: + HandleMessage(&Request.Header, dynamic_cast<TInitProducerIdRequestData*>(Request.Message.get())); + return; + + case METADATA: + HandleMessage(&Request.Header, dynamic_cast<TMetadataRequestData*>(Request.Message.get())); + return; + + default: + KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request.Header.RequestApiKey); + } + } + + void Reply(TRequestHeaderData* header, TApiMessage* reply) { + // TODO improve allocation + TKafkaVersion headerVersion = ResponseHeaderVersion(header->RequestApiKey, header->RequestApiVersion); + TKafkaVersion version = header->RequestApiVersion; + + TResponseHeaderData responseHeader; + responseHeader.CorrelationId = header->CorrelationId; + + TKafkaInt32 size = responseHeader.Size(headerVersion) + reply->Size(version); + + std::stringstream sb; + TKafkaWritable writable(sb); + writable << size; + responseHeader.Write(writable, headerVersion); + reply->Write(writable, version); + + TBuffer b; + b.Reserve(size + sizeof(size)); + sb.read(b.data(), size + sizeof(size)); + + Print("sent", b, size + sizeof(size)); + + SocketSend(b.Data(), size + sizeof(size)); + } + + void DoRead() { + for (;;) { + while (Demand) { + ssize_t received = 0; + if (Position < Length) { + KAFKA_LOG_T("Read from buffer: Position=" << Position << ", Length=" << Length + << ", Demand=" << Demand.GetLength()); + received = std::min(Demand.Length, Length - Position); + if (!Demand.Skip()) { + memcpy(Demand.Buffer, Buffer.Data() + Position, received); + } + Position += received; + } else if (!Demand.Skip() && Demand.Length >= MinDirectSize) { + ssize_t res = SocketReceive(Demand.Buffer, Demand.GetLength()); + if (-res == EAGAIN || -res == EWOULDBLOCK) { + return; + } else if (-res == EINTR) { + continue; + } else if (!res) { + KAFKA_LOG_ERROR("connection closed"); + return PassAway(); + } else if (res < 0) { + KAFKA_LOG_ERROR("connection closed - error in recv: " << strerror(-res)); + return PassAway(); + } + received = res; + if (!received) { + return; + } + } else { + Position = 0; + Length = 0; + ssize_t res = SocketReceive(Buffer.Data(), BufferSize); + if (-res == EAGAIN || -res == EWOULDBLOCK) { + return; + } else if (-res == EINTR) { + continue; + } else if (!res) { + KAFKA_LOG_ERROR("connection closed"); + return PassAway(); + } else if (res < 0) { + KAFKA_LOG_ERROR("connection closed - error in recv: " << strerror(-res)); + return PassAway(); + } + Length = res; + Print("received", Buffer, Length); + if (!Length) { + return; + } + + continue; + } + + Request.Size += received; + Demand.Buffer += received; + Demand.Length -= received; + } + if (Ctx) { + Demand = Ctx->Next(); + } + if (!Demand) { + if (MessageWasRead) { + HeaderSizeWasRead = false; + MessageWasRead = false; + HeaderWasRead = false; + + ProcessRequest(); + + Request = Msg(); + Ctx = nullptr; + } + if (!HeaderSizeWasRead) { + Demand = TReadDemand((char*)&(Request.ExpectedSize), sizeof(Request.ExpectedSize)); + HeaderSizeWasRead = true; + Ctx = nullptr; + } else if (!HeaderWasRead) { + NPrivate::NormalizeNumber(Request.ExpectedSize); + + KAFKA_LOG_T("start read new message. ExpectedSize=" << Request.ExpectedSize); + + Ctx = Request.Header.CreateReadContext(2); + + HeaderWasRead = true; + } else { + KAFKA_LOG_T("received header. ApiKey=" << Request.Header.RequestApiKey + << ", Version=" << Request.Header.RequestApiVersion); + + i16 apiKey = Request.Header.RequestApiKey; + TKafkaVersion version = Request.Header.RequestApiVersion; + + Request.Message = CreateRequest(apiKey); + Ctx = Request.Message->CreateReadContext(version); + + MessageWasRead = true; + } + } + } + } + + void HandleConnected(TEvPollerReady::TPtr event) { + if (event->Get()->Read) { + DoRead(); + + if (event->Get() == InactivityEvent) { + const TDuration passed = TDuration::Seconds(std::abs(InactivityTimer.Passed())); + if (passed >= InactivityTimeout) { + KAFKA_LOG_D("connection closed by inactivity timeout"); + return PassAway(); // timeout + } else { + Schedule(InactivityTimeout - passed, InactivityEvent = new TEvPollerReady(nullptr, false, false)); + } + } + } + if (event->Get()->Write) { + if (!FlushOutput()) { + return; + } + } + RequestPoller(); + } + + bool FlushOutput() { + return true; // TODO + } + + void HandleConnected(TEvPollerRegisterResult::TPtr ev) { + PollerToken = std::move(ev->Get()->PollerToken); + PollerToken->Request(true, true); + } + + STATEFN(StateConnected) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvPollerReady, HandleConnected); + hFunc(TEvPollerRegisterResult, HandleConnected); + } + } +}; + +NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) { + return new TKafkaConnection(std::move(socket), std::move(address)); +} + +} // namespace NKafka
\ No newline at end of file diff --git a/ydb/core/kafka_proxy/kafka_connection.h b/ydb/core/kafka_proxy/kafka_connection.h new file mode 100644 index 0000000000..e23cf9995f --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_connection.h @@ -0,0 +1,13 @@ +#pragma once + +#include <library/cpp/actors/core/actor.h> +#include <ydb/core/raw_socket/sock_config.h> +#include <ydb/core/raw_socket/sock_impl.h> + +namespace NKafka { + +using namespace NKikimr::NRawSocket; + +NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address); + +} diff --git a/ydb/core/kafka_proxy/kafka_listener.h b/ydb/core/kafka_proxy/kafka_listener.h new file mode 100644 index 0000000000..a5a36cfc3c --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_listener.h @@ -0,0 +1,19 @@ +#pragma once + +#include <ydb/core/raw_socket/sock_listener.h> +#include "kafka_connection.h" + +namespace NKafka { + +using namespace NKikimr::NRawSocket; + +inline NActors::IActor* CreateKafkaListener(const NActors::TActorId& poller, const TListenerSettings& settings = {.Port = 9092}) { + return CreateSocketListener( + poller, settings, + [](TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) { + return CreateKafkaConnection(socket, address); + }, + NKikimrServices::EServiceKikimr::KAFKA_PROXY); +} + +} // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_log.h b/ydb/core/kafka_proxy/kafka_log.h new file mode 100644 index 0000000000..8547259b36 --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_log.h @@ -0,0 +1,11 @@ +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/log.h> +#include <ydb/core/protos/services.pb.h> + +namespace NKafka { + +inline TString LogPrefix() { return {}; } + +} diff --git a/ydb/core/kafka_proxy/kafka_log_impl.h b/ydb/core/kafka_proxy/kafka_log_impl.h new file mode 100644 index 0000000000..2aa54cd111 --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_log_impl.h @@ -0,0 +1,11 @@ +#pragma once + +#include "kafka_log.h" + +#define KAFKA_LOG_T(stream) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KAFKA_PROXY, LogPrefix() << stream) +#define KAFKA_LOG_D(stream) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KAFKA_PROXY, LogPrefix() << stream) +#define KAFKA_LOG_I(stream) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KAFKA_PROXY, LogPrefix() << stream) +#define KAFKA_LOG_W(stream) LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KAFKA_PROXY, LogPrefix() << stream) +#define KAFKA_LOG_NOTICE(stream) LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KAFKA_PROXY, LogPrefix() << stream) +#define KAFKA_LOG_ERROR(stream) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KAFKA_PROXY, LogPrefix() << stream) + diff --git a/ydb/core/kafkaproxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp index 7c45f6719c..7c45f6719c 100644 --- a/ydb/core/kafkaproxy/kafka_messages.cpp +++ b/ydb/core/kafka_proxy/kafka_messages.cpp diff --git a/ydb/core/kafkaproxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h index c070177e7f..c070177e7f 100644 --- a/ydb/core/kafkaproxy/kafka_messages.h +++ b/ydb/core/kafka_proxy/kafka_messages.h diff --git a/ydb/core/kafkaproxy/kafka_messages_int.cpp b/ydb/core/kafka_proxy/kafka_messages_int.cpp index 9604fdaa76..9604fdaa76 100644 --- a/ydb/core/kafkaproxy/kafka_messages_int.cpp +++ b/ydb/core/kafka_proxy/kafka_messages_int.cpp diff --git a/ydb/core/kafkaproxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h index 077d8b46b0..3f02644272 100644 --- a/ydb/core/kafkaproxy/kafka_messages_int.h +++ b/ydb/core/kafka_proxy/kafka_messages_int.h @@ -14,6 +14,8 @@ namespace NKafka { namespace NPrivate { +static constexpr TKafkaInt32 MAX_RECORDS_SIZE = 1 << 28; // 256Mb + struct TKafkaBoolDesc { static constexpr bool Default = true; static constexpr bool Nullable = false; @@ -484,12 +486,14 @@ public: } else { ythrow yexception() << "non-nullable field " << Meta::Name << " was serialized as null"; } + } else if (length > MAX_RECORDS_SIZE) { + ythrow yexception() << "records fields " << Meta::Name << " has invalid length " << length; } value = TKafkaRawBytes(); value->Resize(length); return TReadDemand(value->data(), length); - } + } private: bool WasRead; diff --git a/ydb/core/kafka_proxy/kafka_proxy.h b/ydb/core/kafka_proxy/kafka_proxy.h new file mode 100644 index 0000000000..d1a5213373 --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_proxy.h @@ -0,0 +1,3 @@ +#pragma once + +#include "kafka_listener.h" diff --git a/ydb/core/kafkaproxy/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt index 5a13fbe24f..b20b2c0a03 100644 --- a/ydb/core/kafkaproxy/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt @@ -7,47 +7,47 @@ -add_executable(ydb-core-kafkaproxy-ut) -target_link_libraries(ydb-core-kafkaproxy-ut PUBLIC +add_executable(ydb-core-kafka_proxy-ut) +target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC contrib-libs-cxxsupp yutil library-cpp-cpuid_check cpp-testing-gtest cpp-testing-gtest_main - ydb-core-kafkaproxy + ydb-core-kafka_proxy ) -target_link_options(ydb-core-kafkaproxy-ut PRIVATE +target_link_options(ydb-core-kafka_proxy-ut PRIVATE -Wl,-platform_version,macos,11.0,11.0 -fPIC -fPIC -framework CoreFoundation ) -target_sources(ydb-core-kafkaproxy-ut PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_kafka_functions.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_serialization.cpp +target_sources(ydb-core-kafka_proxy-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp ) add_test( NAME - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut COMMAND - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut ) set_property( TEST - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut PROPERTY LABELS SMALL ) set_property( TEST - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut PROPERTY PROCESSORS 1 ) -target_allocator(ydb-core-kafkaproxy-ut +target_allocator(ydb-core-kafka_proxy-ut system_allocator ) -vcs_info(ydb-core-kafkaproxy-ut) +vcs_info(ydb-core-kafka_proxy-ut) diff --git a/ydb/core/kafkaproxy/ut/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt index c03a1c10ed..38872fb558 100644 --- a/ydb/core/kafkaproxy/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt @@ -7,16 +7,16 @@ -add_executable(ydb-core-kafkaproxy-ut) -target_link_libraries(ydb-core-kafkaproxy-ut PUBLIC +add_executable(ydb-core-kafka_proxy-ut) +target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil cpp-testing-gtest cpp-testing-gtest_main - ydb-core-kafkaproxy + ydb-core-kafka_proxy ) -target_link_options(ydb-core-kafkaproxy-ut PRIVATE +target_link_options(ydb-core-kafka_proxy-ut PRIVATE -ldl -lrt -Wl,--no-as-needed @@ -26,31 +26,31 @@ target_link_options(ydb-core-kafkaproxy-ut PRIVATE -lrt -ldl ) -target_sources(ydb-core-kafkaproxy-ut PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_kafka_functions.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_serialization.cpp +target_sources(ydb-core-kafka_proxy-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp ) add_test( NAME - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut COMMAND - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut ) set_property( TEST - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut PROPERTY LABELS SMALL ) set_property( TEST - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut PROPERTY PROCESSORS 1 ) -target_allocator(ydb-core-kafkaproxy-ut +target_allocator(ydb-core-kafka_proxy-ut cpp-malloc-jemalloc ) -vcs_info(ydb-core-kafkaproxy-ut) +vcs_info(ydb-core-kafka_proxy-ut) diff --git a/ydb/core/kafkaproxy/ut/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt index 9089cb7458..a75ef634f8 100644 --- a/ydb/core/kafkaproxy/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt @@ -7,17 +7,17 @@ -add_executable(ydb-core-kafkaproxy-ut) -target_link_libraries(ydb-core-kafkaproxy-ut PUBLIC +add_executable(ydb-core-kafka_proxy-ut) +target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil library-cpp-cpuid_check cpp-testing-gtest cpp-testing-gtest_main - ydb-core-kafkaproxy + ydb-core-kafka_proxy ) -target_link_options(ydb-core-kafkaproxy-ut PRIVATE +target_link_options(ydb-core-kafka_proxy-ut PRIVATE -ldl -lrt -Wl,--no-as-needed @@ -27,32 +27,32 @@ target_link_options(ydb-core-kafkaproxy-ut PRIVATE -lrt -ldl ) -target_sources(ydb-core-kafkaproxy-ut PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_kafka_functions.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_serialization.cpp +target_sources(ydb-core-kafka_proxy-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp ) add_test( NAME - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut COMMAND - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut ) set_property( TEST - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut PROPERTY LABELS SMALL ) set_property( TEST - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut PROPERTY PROCESSORS 1 ) -target_allocator(ydb-core-kafkaproxy-ut +target_allocator(ydb-core-kafka_proxy-ut cpp-malloc-tcmalloc libs-tcmalloc-no_percpu_cache ) -vcs_info(ydb-core-kafkaproxy-ut) +vcs_info(ydb-core-kafka_proxy-ut) diff --git a/ydb/core/kafkaproxy/ut/CMakeLists.txt b/ydb/core/kafka_proxy/ut/CMakeLists.txt index f8b31df0c1..f8b31df0c1 100644 --- a/ydb/core/kafkaproxy/ut/CMakeLists.txt +++ b/ydb/core/kafka_proxy/ut/CMakeLists.txt diff --git a/ydb/core/kafkaproxy/ut/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt index 1b948214ae..5bb08d1134 100644 --- a/ydb/core/kafkaproxy/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt @@ -7,40 +7,40 @@ -add_executable(ydb-core-kafkaproxy-ut) -target_link_libraries(ydb-core-kafkaproxy-ut PUBLIC +add_executable(ydb-core-kafka_proxy-ut) +target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC contrib-libs-cxxsupp yutil library-cpp-cpuid_check cpp-testing-gtest cpp-testing-gtest_main - ydb-core-kafkaproxy + ydb-core-kafka_proxy ) -target_sources(ydb-core-kafkaproxy-ut PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_kafka_functions.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_serialization.cpp +target_sources(ydb-core-kafka_proxy-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp ) add_test( NAME - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut COMMAND - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut ) set_property( TEST - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut PROPERTY LABELS SMALL ) set_property( TEST - ydb-core-kafkaproxy-ut + ydb-core-kafka_proxy-ut PROPERTY PROCESSORS 1 ) -target_allocator(ydb-core-kafkaproxy-ut +target_allocator(ydb-core-kafka_proxy-ut system_allocator ) -vcs_info(ydb-core-kafkaproxy-ut) +vcs_info(ydb-core-kafka_proxy-ut) diff --git a/ydb/core/kafkaproxy/ut/ut_kafka_functions.cpp b/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp index c66ba20a03..c66ba20a03 100644 --- a/ydb/core/kafkaproxy/ut/ut_kafka_functions.cpp +++ b/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp diff --git a/ydb/core/kafkaproxy/ut/ut_serialization.cpp b/ydb/core/kafka_proxy/ut/ut_serialization.cpp index 3bca84033b..ed8185637a 100644 --- a/ydb/core/kafkaproxy/ut/ut_serialization.cpp +++ b/ydb/core/kafka_proxy/ut/ut_serialization.cpp @@ -585,7 +585,21 @@ TEST(Serialization, TKafkaFloat64_PresentVersion_NotTaggedVersion) { } } +TEST(Serialization, RequestHeader_reference) { + ui8 reference[] = {0x00, 0x12, 0x00, 0x00, 0x7F, 0x6F, 0x6F, 0x68, 0x00, 0x0A, 0x70, 0x72, 0x6F, 0x64, 0x75, 0x63, + 0x65, 0x72, 0x2D, 0x31}; + std::stringstream sb; + sb.write((char*)reference, sizeof(reference)); + + TReadProcessor processor(sb); + TRequestHeaderData result; + processor.Read(&result, 1); + + EXPECT_EQ(result.RequestApiKey, 0x12); + EXPECT_EQ(result.RequestApiVersion, 0x00); + EXPECT_EQ(result.ClientId, "producer-1"); +} TEST(Serialization, ProduceRequestData_reference) { // original kafka serialized value (java implementation) diff --git a/ydb/core/kafkaproxy/ut/ya.make b/ydb/core/kafka_proxy/ut/ya.make index 1a384fdd1d..4626bbd0dc 100644 --- a/ydb/core/kafkaproxy/ut/ya.make +++ b/ydb/core/kafka_proxy/ut/ya.make @@ -6,7 +6,7 @@ SRCS( ) PEERDIR( - ydb/core/kafkaproxy + ydb/core/kafka_proxy ) END() diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make new file mode 100644 index 0000000000..b99a60dd3d --- /dev/null +++ b/ydb/core/kafka_proxy/ya.make @@ -0,0 +1,29 @@ +LIBRARY() + +SRCS( + kafka_connection.cpp + kafka_connection.h + kafka_listener.h + kafka.h + kafka_log.h + kafka_log_impl.h + kafka_messages.cpp + kafka_messages.h + kafka_messages_int.cpp + kafka_messages_int.h + kafka_proxy.h +) + +PEERDIR( + library/cpp/actors/core + library/cpp/actors/protos + ydb/core/base + ydb/core/protos + ydb/core/raw_socket +) + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt b/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt index 7e189a4cbf..15fa663dc7 100644 --- a/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt @@ -16,10 +16,10 @@ target_link_libraries(ydb-core-pgproxy PUBLIC cpp-actors-protos ydb-core-base ydb-core-protos + ydb-core-raw_socket ) target_sources(ydb-core-pgproxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_connection.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_listener.cpp ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy_types.cpp ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy.cpp ) diff --git a/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt b/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt index 8d5de35255..21e428babe 100644 --- a/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt +++ b/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt @@ -17,10 +17,10 @@ target_link_libraries(ydb-core-pgproxy PUBLIC cpp-actors-protos ydb-core-base ydb-core-protos + ydb-core-raw_socket ) target_sources(ydb-core-pgproxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_connection.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_listener.cpp ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy_types.cpp ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy.cpp ) diff --git a/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt b/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt index 8d5de35255..21e428babe 100644 --- a/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt +++ b/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt @@ -17,10 +17,10 @@ target_link_libraries(ydb-core-pgproxy PUBLIC cpp-actors-protos ydb-core-base ydb-core-protos + ydb-core-raw_socket ) target_sources(ydb-core-pgproxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_connection.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_listener.cpp ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy_types.cpp ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy.cpp ) diff --git a/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt b/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt index 7e189a4cbf..15fa663dc7 100644 --- a/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt +++ b/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt @@ -16,10 +16,10 @@ target_link_libraries(ydb-core-pgproxy PUBLIC cpp-actors-protos ydb-core-base ydb-core-protos + ydb-core-raw_socket ) target_sources(ydb-core-pgproxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_connection.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_listener.cpp ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy_types.cpp ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy.cpp ) diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp index 323858c628..e9529d2389 100644 --- a/ydb/core/pgproxy/pg_connection.cpp +++ b/ydb/core/pgproxy/pg_connection.cpp @@ -1,10 +1,10 @@ #include "pg_connection.h" -#include "pg_proxy_config.h" #include "pg_proxy_types.h" #include "pg_proxy_events.h" #include "pg_stream.h" #include "pg_log_impl.h" #include <library/cpp/actors/core/actor_bootstrapped.h> +#include <ydb/core/raw_socket/sock_config.h> namespace NPG { diff --git a/ydb/core/pgproxy/pg_connection.h b/ydb/core/pgproxy/pg_connection.h index ee41c18ea8..a9448b4a88 100644 --- a/ydb/core/pgproxy/pg_connection.h +++ b/ydb/core/pgproxy/pg_connection.h @@ -1,11 +1,14 @@ #pragma once #include <library/cpp/actors/core/actor.h> -#include "pg_proxy_config.h" -#include "pg_proxy_impl.h" +#include <ydb/core/raw_socket/sock_config.h> +#include <ydb/core/raw_socket/sock_impl.h> namespace NPG { -NActors::IActor* CreatePGConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const NActors::TActorId& databaseProxy); +using namespace NKikimr::NRawSocket; -} +NActors::IActor* CreatePGConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, + const NActors::TActorId& databaseProxy); + +} // namespace NPG diff --git a/ydb/core/pgproxy/pg_listener.h b/ydb/core/pgproxy/pg_listener.h index e5c578414f..31488c0dd1 100644 --- a/ydb/core/pgproxy/pg_listener.h +++ b/ydb/core/pgproxy/pg_listener.h @@ -1,19 +1,20 @@ #pragma once -#include <library/cpp/actors/core/actor.h> -#include "pg_proxy_config.h" -#include "pg_proxy_impl.h" +#include <ydb/core/raw_socket/sock_listener.h> +#include "pg_connection.h" namespace NPG { -struct TListenerSettings { - uint16_t Port = 5432; - TString CertificateFile; - TString PrivateKeyFile; - TString SslCertificatePem; -}; - -NActors::IActor* CreatePGListener(const NActors::TActorId& poller, const NActors::TActorId& databaseProxy, const TListenerSettings& settings = {}); +using namespace NKikimr::NRawSocket; +inline NActors::IActor* CreatePGListener(const NActors::TActorId& poller, const NActors::TActorId databaseProxy, + const TListenerSettings& settings = {.Port = 5432}) { + return CreateSocketListener( + poller, settings, + [=](TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) { + return CreatePGConnection(socket, address, databaseProxy); + }, + NKikimrServices::EServiceKikimr::PGWIRE); } +} // namespace NPG diff --git a/ydb/core/pgproxy/pg_proxy.cpp b/ydb/core/pgproxy/pg_proxy.cpp index 242dab9269..9376f65af8 100644 --- a/ydb/core/pgproxy/pg_proxy.cpp +++ b/ydb/core/pgproxy/pg_proxy.cpp @@ -1,5 +1,6 @@ #include "pg_proxy.h" +#include "pg_connection.h" #include "pg_listener.h" #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/interconnect/poller_actor.h> @@ -7,6 +8,7 @@ namespace NPG { using namespace NActors; +using namespace NKikimr::NRawSocket; class TPGProxy : public TActorBootstrapped<TPGProxy> { public: @@ -29,9 +31,8 @@ public: TActorId Listener; }; - NActors::IActor* CreatePGProxy() { return new TPGProxy(); } -}
\ No newline at end of file +} // namespace NPG diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h index 32ad9662db..acb31d0d0d 100644 --- a/ydb/core/pgproxy/pg_proxy_events.h +++ b/ydb/core/pgproxy/pg_proxy_events.h @@ -2,11 +2,13 @@ #include <library/cpp/actors/core/events.h> #include <library/cpp/actors/core/event_local.h> -#include "pg_proxy_config.h" +#include <ydb/core/raw_socket/sock_config.h> #include "pg_proxy_types.h" namespace NPG { +using namespace NKikimr::NRawSocket; + struct TEvPGEvents { enum EEv { EvConnectionOpened = EventSpaceBegin(NActors::TEvents::ES_PGWIRE), diff --git a/ydb/core/pgproxy/pg_proxy_ut.cpp b/ydb/core/pgproxy/pg_proxy_ut.cpp index 0387a1719a..5d92b948d2 100644 --- a/ydb/core/pgproxy/pg_proxy_ut.cpp +++ b/ydb/core/pgproxy/pg_proxy_ut.cpp @@ -5,7 +5,6 @@ #include <library/cpp/actors/testlib/test_runtime.h> #include <ydb/core/pgproxy/pg_proxy.h> -#include <ydb/core/pgproxy/pg_listener.h> #include <ydb/core/pgproxy/pg_log.h> #include <ydb/core/pgproxy/pg_proxy_events.h> #include <ydb/core/protos/services.pb.h> @@ -13,12 +12,16 @@ #include <util/network/socket.h> #include <util/string/hex.h> +#include "pg_listener.h" + #ifdef NDEBUG #define Ctest Cnull #else #define Ctest Cerr #endif +using namespace NKikimr::NRawSocket; + class TTestActorRuntime : public NActors::TTestActorRuntimeBase { public: void InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) override { diff --git a/ydb/core/pgproxy/ya.make b/ydb/core/pgproxy/ya.make index e84082c662..50492de467 100644 --- a/ydb/core/pgproxy/ya.make +++ b/ydb/core/pgproxy/ya.make @@ -3,19 +3,14 @@ LIBRARY() SRCS( pg_connection.cpp pg_connection.h - pg_listener.cpp pg_listener.h pg_log_impl.h pg_log.h - pg_proxy_config.h pg_proxy_events.h - pg_proxy_impl.h - pg_proxy_ssl.h pg_proxy_types.cpp pg_proxy_types.h pg_proxy.cpp pg_proxy.h - pg_sock64.h pg_stream.h ) @@ -24,6 +19,7 @@ PEERDIR( library/cpp/actors/protos ydb/core/base ydb/core/protos + ydb/core/raw_socket ) END() diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index fcd260195f..e09a4bd41b 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1788,6 +1788,13 @@ message TLocalPgWireConfig { optional string SslCertificate = 5; } +message TKafkaProxyConfig { + optional bool EnableKafkaProxy = 1 [default = true]; + + optional int32 ListeningPort = 2 [default = 9092]; + optional string SslCertificate = 3; +} + message TAwsCompatibilityConfig { optional string AwsRegion = 1; } @@ -1861,6 +1868,7 @@ message TAppConfig { optional TColumnShardConfig ColumnShardConfig = 66; optional TLocalPgWireConfig LocalPgWireConfig = 69; optional TAwsCompatibilityConfig AwsCompatibilityConfig = 70; + optional TKafkaProxyConfig KafkaProxyConfig = 71; repeated TNamedConfig NamedConfigs = 100; optional string ClusterYamlConfig = 101; diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 241d0d5091..56f35e787b 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -369,6 +369,8 @@ enum EServiceKikimr { EXT_INDEX = 1900; TX_CONVEYOR = 2000; ARROW_HELPER = 2100; + + KAFKA_PROXY = 2200; }; message TActivity { diff --git a/ydb/core/kafkaproxy/CMakeLists.darwin-x86_64.txt b/ydb/core/raw_socket/CMakeLists.darwin-x86_64.txt index 3e701fd246..f01952a82b 100644 --- a/ydb/core/kafkaproxy/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/raw_socket/CMakeLists.darwin-x86_64.txt @@ -6,10 +6,9 @@ # original buildsystem will not be accepted. -add_subdirectory(ut) -add_library(ydb-core-kafkaproxy) -target_link_libraries(ydb-core-kafkaproxy PUBLIC +add_library(ydb-core-raw_socket) +target_link_libraries(ydb-core-raw_socket PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core @@ -17,7 +16,6 @@ target_link_libraries(ydb-core-kafkaproxy PUBLIC ydb-core-base ydb-core-protos ) -target_sources(ydb-core-kafkaproxy PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages_int.cpp +target_sources(ydb-core-raw_socket PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/raw_socket/sock_listener.cpp ) diff --git a/ydb/core/kafkaproxy/CMakeLists.linux-aarch64.txt b/ydb/core/raw_socket/CMakeLists.linux-aarch64.txt index 59a73cf06e..7ddee08602 100644 --- a/ydb/core/kafkaproxy/CMakeLists.linux-aarch64.txt +++ b/ydb/core/raw_socket/CMakeLists.linux-aarch64.txt @@ -6,10 +6,9 @@ # original buildsystem will not be accepted. -add_subdirectory(ut) -add_library(ydb-core-kafkaproxy) -target_link_libraries(ydb-core-kafkaproxy PUBLIC +add_library(ydb-core-raw_socket) +target_link_libraries(ydb-core-raw_socket PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil @@ -18,7 +17,6 @@ target_link_libraries(ydb-core-kafkaproxy PUBLIC ydb-core-base ydb-core-protos ) -target_sources(ydb-core-kafkaproxy PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages_int.cpp +target_sources(ydb-core-raw_socket PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/raw_socket/sock_listener.cpp ) diff --git a/ydb/core/kafkaproxy/CMakeLists.linux-x86_64.txt b/ydb/core/raw_socket/CMakeLists.linux-x86_64.txt index 59a73cf06e..7ddee08602 100644 --- a/ydb/core/kafkaproxy/CMakeLists.linux-x86_64.txt +++ b/ydb/core/raw_socket/CMakeLists.linux-x86_64.txt @@ -6,10 +6,9 @@ # original buildsystem will not be accepted. -add_subdirectory(ut) -add_library(ydb-core-kafkaproxy) -target_link_libraries(ydb-core-kafkaproxy PUBLIC +add_library(ydb-core-raw_socket) +target_link_libraries(ydb-core-raw_socket PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil @@ -18,7 +17,6 @@ target_link_libraries(ydb-core-kafkaproxy PUBLIC ydb-core-base ydb-core-protos ) -target_sources(ydb-core-kafkaproxy PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages_int.cpp +target_sources(ydb-core-raw_socket PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/raw_socket/sock_listener.cpp ) diff --git a/ydb/core/raw_socket/CMakeLists.txt b/ydb/core/raw_socket/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/raw_socket/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/kafkaproxy/CMakeLists.windows-x86_64.txt b/ydb/core/raw_socket/CMakeLists.windows-x86_64.txt index 3e701fd246..f01952a82b 100644 --- a/ydb/core/kafkaproxy/CMakeLists.windows-x86_64.txt +++ b/ydb/core/raw_socket/CMakeLists.windows-x86_64.txt @@ -6,10 +6,9 @@ # original buildsystem will not be accepted. -add_subdirectory(ut) -add_library(ydb-core-kafkaproxy) -target_link_libraries(ydb-core-kafkaproxy PUBLIC +add_library(ydb-core-raw_socket) +target_link_libraries(ydb-core-raw_socket PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core @@ -17,7 +16,6 @@ target_link_libraries(ydb-core-kafkaproxy PUBLIC ydb-core-base ydb-core-protos ) -target_sources(ydb-core-kafkaproxy PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages_int.cpp +target_sources(ydb-core-raw_socket PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/raw_socket/sock_listener.cpp ) diff --git a/ydb/core/pgproxy/pg_sock64.h b/ydb/core/raw_socket/sock64.h index da12e4d0fd..db05a6ab15 100644 --- a/ydb/core/pgproxy/pg_sock64.h +++ b/ydb/core/raw_socket/sock64.h @@ -3,9 +3,9 @@ #include <optional> #include <util/network/sock.h> #include <library/cpp/actors/interconnect/poller_actor.h> -#include "pg_proxy_ssl.h" +#include "sock_ssl.h" -namespace NPG { +namespace NKikimr::NRawSocket { class TInet64StreamSocket : public TStreamSocket { using TBase = TStreamSocket; @@ -194,4 +194,4 @@ public: } }; -} +} // namespace NKikimr::NRawSocket diff --git a/ydb/core/pgproxy/pg_proxy_config.h b/ydb/core/raw_socket/sock_config.h index 364ea7c832..011049d03a 100644 --- a/ydb/core/pgproxy/pg_proxy_config.h +++ b/ydb/core/raw_socket/sock_config.h @@ -3,21 +3,23 @@ #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/protos/services_common.pb.h> -#include "pg_sock64.h" -namespace NPG { +#include "sock64.h" + +namespace NKikimr::NRawSocket { struct TNetworkConfig { static constexpr size_t BUFFER_SIZE = 1 * 1024; static constexpr int LISTEN_QUEUE = 10; static constexpr TDuration SOCKET_TIMEOUT = TDuration::MilliSeconds(60000); static constexpr TDuration CONNECTION_TIMEOUT = TDuration::MilliSeconds(60000); + using TSocketType = TInet64StreamSocket; using TSecureSocketType = TInet64SecureStreamSocket; using TSocketAddressType = std::shared_ptr<ISockAddr>; }; -} +} // namespace NKikimr::NRawSocket inline IOutputStream& operator <<(IOutputStream& out, const std::shared_ptr<ISockAddr>& addr) { return out << addr->ToString(); diff --git a/ydb/core/pgproxy/pg_proxy_impl.h b/ydb/core/raw_socket/sock_impl.h index 60bd67ddef..632acaa1cf 100644 --- a/ydb/core/pgproxy/pg_proxy_impl.h +++ b/ydb/core/raw_socket/sock_impl.h @@ -1,11 +1,11 @@ #pragma once #include <library/cpp/actors/interconnect/poller_actor.h> -#include "pg_proxy_config.h" -#include "pg_sock64.h" -#include "pg_proxy_ssl.h" +#include "sock_config.h" +#include "sock64.h" +#include "sock_ssl.h" -namespace NPG { +namespace NKikimr::NRawSocket { struct TEndpointInfo { TSslHelpers::TSslHolder<SSL_CTX> SecureContext; @@ -98,4 +98,4 @@ public: } }; -} +} // namespace NKikimr::NRawSocket diff --git a/ydb/core/pgproxy/pg_listener.cpp b/ydb/core/raw_socket/sock_listener.cpp index da74ad095e..9a443ce021 100644 --- a/ydb/core/pgproxy/pg_listener.cpp +++ b/ydb/core/raw_socket/sock_listener.cpp @@ -3,34 +3,37 @@ #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/interconnect/poller_actor.h> #include <util/network/sock.h> -#include "pg_proxy.h" -#include "pg_listener.h" -#include "pg_proxy_impl.h" -#include "pg_proxy_config.h" -#include "pg_sock64.h" -#include "pg_connection.h" -#include "pg_log_impl.h" +#include <ydb/core/protos/services.pb.h> -namespace NPG { +#include "sock_listener.h" +#include "sock_config.h" +#include "sock64.h" + +namespace NKikimr::NRawSocket { using namespace NActors; -class TPGListener : public TActorBootstrapped<TPGListener>, TNetworkConfig { +class TSocketListener: public TActorBootstrapped<TSocketListener>, TNetworkConfig { public: - using TBase = NActors::TActor<TPGListener>; - using TThis = TPGListener; + using TBase = TActor<TSocketListener>; + using TThis = TSocketListener; + TActorId Poller; - TActorId DatabaseProxy; TListenerSettings Settings; + TConnectionCreator ConnectionCreator; + NKikimrServices::EServiceKikimr Service; + TIntrusivePtr<TSocketDescriptor> Socket; - NActors::TPollerToken::TPtr PollerToken; + TPollerToken::TPtr PollerToken; THashSet<TActorId> Connections; - TPGListener(const TActorId& poller, const TActorId& databaseProxy, const TListenerSettings& settings) + TSocketListener(const TActorId& poller, const TListenerSettings& settings, const TConnectionCreator& connectionCreator, + NKikimrServices::EServiceKikimr service) : Poller(poller) - , DatabaseProxy(databaseProxy) , Settings(settings) - {} + , ConnectionCreator(connectionCreator) + , Service(service) { + } STATEFN(StateWorking) { switch (ev->GetTypeRewrite()) { @@ -54,14 +57,15 @@ public: err = Socket->Listen(LISTEN_QUEUE); if (err == 0) { - BLOG_D("Listening on " << bindAddress->ToString() << (endpoint->SecureContext ? " (ssl)" : "")); + LOG_INFO_S(*NActors::TlsActivationContext, Service, + "Listening on " << bindAddress->ToString() << (endpoint->SecureContext ? " (ssl)" : "")); Socket->SetNonBlock(); Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId())); Become(&TThis::StateWorking); return; } } - BLOG_ERROR("Failed to listen on " << bindAddress->ToString()); + LOG_ERROR_S(*NActors::TlsActivationContext, Service, "Failed to listen on " << bindAddress->ToString()); //abort(); PassAway(); } @@ -84,7 +88,7 @@ public: if (!socket) { break; } - NActors::IActor* connectionSocket = CreatePGConnection(socket, addr, DatabaseProxy); + NActors::IActor* connectionSocket = ConnectionCreator(socket, addr); NActors::TActorId connectionId = Register(connectionSocket); Send(Poller, new TEvPollerRegister(socket, connectionId, connectionId)); Connections.emplace(connectionId); @@ -97,9 +101,9 @@ public: } }; - -NActors::IActor* CreatePGListener(const TActorId& poller, const TActorId& databaseProxy, const TListenerSettings& settings) { - return new TPGListener(poller, databaseProxy, settings); +NActors::IActor* CreateSocketListener(const NActors::TActorId& poller, const TListenerSettings& settings, + TConnectionCreator connectionCreator, NKikimrServices::EServiceKikimr service) { + return new TSocketListener(poller, settings, connectionCreator, service); } -} +} // namespace NKikimr::NRawSocket diff --git a/ydb/core/raw_socket/sock_listener.h b/ydb/core/raw_socket/sock_listener.h new file mode 100644 index 0000000000..7f8d5b464b --- /dev/null +++ b/ydb/core/raw_socket/sock_listener.h @@ -0,0 +1,23 @@ +#pragma once + +#include <library/cpp/actors/core/actor.h> +#include <ydb/core/protos/services.pb.h> + +#include "sock_impl.h" +#include "sock_config.h" + +namespace NKikimr::NRawSocket { + +struct TListenerSettings { + uint16_t Port; + TString CertificateFile; + TString PrivateKeyFile; + TString SslCertificatePem; +}; + +using TConnectionCreator = std::function<NActors::IActor* (TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address)>; + +NActors::IActor* CreateSocketListener(const NActors::TActorId& poller, const TListenerSettings& settings, + TConnectionCreator connectionCreator, NKikimrServices::EServiceKikimr service); + +} // namespace NKikimr::NRawSocket diff --git a/ydb/core/pgproxy/pg_proxy_ssl.h b/ydb/core/raw_socket/sock_ssl.h index 3b043f6917..64612ccedd 100644 --- a/ydb/core/pgproxy/pg_proxy_ssl.h +++ b/ydb/core/raw_socket/sock_ssl.h @@ -7,7 +7,7 @@ #include <memory> #include <util/generic/string.h> -namespace NPG { +namespace NKikimr::NRawSocket { struct TSslHelpers { struct TSslDeleter { @@ -210,4 +210,4 @@ struct TSslLayer : TSslHelpers { } }; -} +} // namespace NKikimr::NRawSocket diff --git a/ydb/core/kafkaproxy/ya.make b/ydb/core/raw_socket/ya.make index fb108fb19a..c51a35deff 100644 --- a/ydb/core/kafkaproxy/ya.make +++ b/ydb/core/raw_socket/ya.make @@ -1,8 +1,12 @@ LIBRARY() SRCS( - kafka_messages.cpp - kafka_messages_int.cpp + sock64.h + sock_config.h + sock_impl.h + sock_listener.cpp + sock_listener.h + sock_ssl.h ) PEERDIR( @@ -15,5 +19,5 @@ PEERDIR( END() RECURSE_FOR_TESTS( - ut +# ut ) diff --git a/ydb/core/ya.make b/ydb/core/ya.make index 4547bd3308..4450028a82 100644 --- a/ydb/core/ya.make +++ b/ydb/core/ya.make @@ -23,7 +23,7 @@ RECURSE( health_check http_proxy io_formats - kafkaproxy + kafka_proxy kesus keyvalue kqp @@ -40,6 +40,7 @@ RECURSE( protos public_http quoter + raw_socket scheme scheme_types security |