aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-06-21 16:50:52 +0300
committertesseract <tesseract@yandex-team.com>2023-06-21 16:50:52 +0300
commit8039da344cd945e51a620e1c7b1c9810ab771c0b (patch)
treeeb63c46d69d03a2b4c15f3facfe3fea9fc5acf2e
parent7a2f532d8834a9ca4331bed840a0a8c09624d33b (diff)
downloadydb-8039da344cd945e51a620e1c7b1c9810ab771c0b.tar.gz
Listen to port for kafka protocol
-rw-r--r--ydb/core/CMakeLists.txt3
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/driver_lib/run/config.h1
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp27
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.h7
-rw-r--r--ydb/core/driver_lib/run/run.cpp4
-rw-r--r--ydb/core/driver_lib/run/ya.make1
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt25
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt26
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt26
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.txt (renamed from ydb/core/kafkaproxy/CMakeLists.txt)0
-rw-r--r--ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt25
-rw-r--r--ydb/core/kafka_proxy/kafka.h (renamed from ydb/core/kafkaproxy/kafka.h)0
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp427
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.h13
-rw-r--r--ydb/core/kafka_proxy/kafka_listener.h19
-rw-r--r--ydb/core/kafka_proxy/kafka_log.h11
-rw-r--r--ydb/core/kafka_proxy/kafka_log_impl.h11
-rw-r--r--ydb/core/kafka_proxy/kafka_messages.cpp (renamed from ydb/core/kafkaproxy/kafka_messages.cpp)0
-rw-r--r--ydb/core/kafka_proxy/kafka_messages.h (renamed from ydb/core/kafkaproxy/kafka_messages.h)0
-rw-r--r--ydb/core/kafka_proxy/kafka_messages_int.cpp (renamed from ydb/core/kafkaproxy/kafka_messages_int.cpp)0
-rw-r--r--ydb/core/kafka_proxy/kafka_messages_int.h (renamed from ydb/core/kafkaproxy/kafka_messages_int.h)6
-rw-r--r--ydb/core/kafka_proxy/kafka_proxy.h3
-rw-r--r--ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt (renamed from ydb/core/kafkaproxy/ut/CMakeLists.darwin-x86_64.txt)26
-rw-r--r--ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt (renamed from ydb/core/kafkaproxy/ut/CMakeLists.linux-aarch64.txt)26
-rw-r--r--ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt (renamed from ydb/core/kafkaproxy/ut/CMakeLists.linux-x86_64.txt)26
-rw-r--r--ydb/core/kafka_proxy/ut/CMakeLists.txt (renamed from ydb/core/kafkaproxy/ut/CMakeLists.txt)0
-rw-r--r--ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt (renamed from ydb/core/kafkaproxy/ut/CMakeLists.windows-x86_64.txt)24
-rw-r--r--ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp (renamed from ydb/core/kafkaproxy/ut/ut_kafka_functions.cpp)0
-rw-r--r--ydb/core/kafka_proxy/ut/ut_serialization.cpp (renamed from ydb/core/kafkaproxy/ut/ut_serialization.cpp)14
-rw-r--r--ydb/core/kafka_proxy/ut/ya.make (renamed from ydb/core/kafkaproxy/ut/ya.make)2
-rw-r--r--ydb/core/kafka_proxy/ya.make29
-rw-r--r--ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/pgproxy/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/pgproxy/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/pgproxy/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/pgproxy/pg_connection.cpp2
-rw-r--r--ydb/core/pgproxy/pg_connection.h11
-rw-r--r--ydb/core/pgproxy/pg_listener.h23
-rw-r--r--ydb/core/pgproxy/pg_proxy.cpp5
-rw-r--r--ydb/core/pgproxy/pg_proxy_events.h4
-rw-r--r--ydb/core/pgproxy/pg_proxy_ut.cpp5
-rw-r--r--ydb/core/pgproxy/ya.make6
-rw-r--r--ydb/core/protos/config.proto8
-rw-r--r--ydb/core/protos/services.proto2
-rw-r--r--ydb/core/raw_socket/CMakeLists.darwin-x86_64.txt (renamed from ydb/core/kafkaproxy/CMakeLists.darwin-x86_64.txt)10
-rw-r--r--ydb/core/raw_socket/CMakeLists.linux-aarch64.txt (renamed from ydb/core/kafkaproxy/CMakeLists.linux-aarch64.txt)10
-rw-r--r--ydb/core/raw_socket/CMakeLists.linux-x86_64.txt (renamed from ydb/core/kafkaproxy/CMakeLists.linux-x86_64.txt)10
-rw-r--r--ydb/core/raw_socket/CMakeLists.txt17
-rw-r--r--ydb/core/raw_socket/CMakeLists.windows-x86_64.txt (renamed from ydb/core/kafkaproxy/CMakeLists.windows-x86_64.txt)10
-rw-r--r--ydb/core/raw_socket/sock64.h (renamed from ydb/core/pgproxy/pg_sock64.h)6
-rw-r--r--ydb/core/raw_socket/sock_config.h (renamed from ydb/core/pgproxy/pg_proxy_config.h)8
-rw-r--r--ydb/core/raw_socket/sock_impl.h (renamed from ydb/core/pgproxy/pg_proxy_impl.h)10
-rw-r--r--ydb/core/raw_socket/sock_listener.cpp (renamed from ydb/core/pgproxy/pg_listener.cpp)50
-rw-r--r--ydb/core/raw_socket/sock_listener.h23
-rw-r--r--ydb/core/raw_socket/sock_ssl.h (renamed from ydb/core/pgproxy/pg_proxy_ssl.h)4
-rw-r--r--ydb/core/raw_socket/ya.make (renamed from ydb/core/kafkaproxy/ya.make)10
-rw-r--r--ydb/core/ya.make3
61 files changed, 881 insertions, 150 deletions
diff --git a/ydb/core/CMakeLists.txt b/ydb/core/CMakeLists.txt
index 7e953aaa5d..474cc8c050 100644
--- a/ydb/core/CMakeLists.txt
+++ b/ydb/core/CMakeLists.txt
@@ -32,7 +32,7 @@ add_subdirectory(grpc_streaming)
add_subdirectory(health_check)
add_subdirectory(http_proxy)
add_subdirectory(io_formats)
-add_subdirectory(kafkaproxy)
+add_subdirectory(kafka_proxy)
add_subdirectory(kesus)
add_subdirectory(keyvalue)
add_subdirectory(kqp)
@@ -49,6 +49,7 @@ add_subdirectory(pgproxy)
add_subdirectory(protos)
add_subdirectory(public_http)
add_subdirectory(quoter)
+add_subdirectory(raw_socket)
add_subdirectory(scheme)
add_subdirectory(scheme_types)
add_subdirectory(security)
diff --git a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt
index a08bd34a7b..fd8b8d879d 100644
--- a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt
@@ -65,6 +65,7 @@ target_link_libraries(run PUBLIC
core-kesus-proxy
core-kesus-tablet
ydb-core-keyvalue
+ ydb-core-kafka_proxy
ydb-core-kqp
core-kqp-rm_service
ydb-core-load_test
diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
index 444d21918a..0f090796cf 100644
--- a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
@@ -66,6 +66,7 @@ target_link_libraries(run PUBLIC
core-kesus-proxy
core-kesus-tablet
ydb-core-keyvalue
+ ydb-core-kafka_proxy
ydb-core-kqp
core-kqp-rm_service
ydb-core-load_test
diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt
index 444d21918a..0f090796cf 100644
--- a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt
@@ -66,6 +66,7 @@ target_link_libraries(run PUBLIC
core-kesus-proxy
core-kesus-tablet
ydb-core-keyvalue
+ ydb-core-kafka_proxy
ydb-core-kqp
core-kqp-rm_service
ydb-core-load_test
diff --git a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt
index a08bd34a7b..fd8b8d879d 100644
--- a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt
@@ -65,6 +65,7 @@ target_link_libraries(run PUBLIC
core-kesus-proxy
core-kesus-tablet
ydb-core-keyvalue
+ ydb-core-kafka_proxy
ydb-core-kqp
core-kqp-rm_service
ydb-core-load_test
diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h
index c486031d78..10467bd824 100644
--- a/ydb/core/driver_lib/run/config.h
+++ b/ydb/core/driver_lib/run/config.h
@@ -72,6 +72,7 @@ union TBasicKikimrServicesMask {
bool EnableExternalIndex: 1;
bool EnableConveyor: 1;
bool EnableLocalPgWire:1;
+ bool EnableKafkaProxy:1;
};
ui64 Raw;
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index f0ce5d1870..b5c2944af0 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -63,6 +63,8 @@
#include <ydb/core/health_check/health_check.h>
+#include <ydb/core/kafka_proxy/kafka_proxy.h>
+
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/proxy_service/kqp_proxy_service.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
@@ -2840,9 +2842,7 @@ void TLocalPgWireServiceInitializer::InitializeServices(NActors::TActorSystemSet
);
NPG::TListenerSettings settings;
- if (Config.GetLocalPgWireConfig().HasListeningPort()) {
- settings.Port = Config.GetLocalPgWireConfig().GetListeningPort();
- }
+ settings.Port = Config.GetLocalPgWireConfig().GetListeningPort();
if (Config.GetLocalPgWireConfig().HasSslCertificate()) {
settings.SslCertificatePem = Config.GetLocalPgWireConfig().GetSslCertificate();
}
@@ -2854,5 +2854,26 @@ void TLocalPgWireServiceInitializer::InitializeServices(NActors::TActorSystemSet
);
}
+TKafkaProxyServiceInitializer::TKafkaProxyServiceInitializer(const TKikimrRunConfig& runConfig)
+ : IKikimrServicesInitializer(runConfig)
+{
+}
+
+void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
+ if (Config.GetKafkaProxyConfig().GetEnableKafkaProxy()) {
+ NKafka::TListenerSettings settings;
+ settings.Port = Config.GetKafkaProxyConfig().GetListeningPort();
+ if (Config.GetKafkaProxyConfig().HasSslCertificate()) {
+ settings.SslCertificatePem = Config.GetKafkaProxyConfig().GetSslCertificate();
+ }
+
+ setup->LocalServices.emplace_back(
+ TActorId(),
+ TActorSetupCmd(NKafka::CreateKafkaListener(MakePollerActorId(), settings),
+ TMailboxType::HTSwap, appData->UserPoolId)
+ );
+ }
+}
+
} // namespace NKikimrServicesInitializers
} // namespace NKikimr
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h
index 20b109ae4c..cc6ed3a9cf 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.h
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h
@@ -559,5 +559,12 @@ public:
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};
+class TKafkaProxyServiceInitializer : public IKikimrServicesInitializer {
+public:
+ TKafkaProxyServiceInitializer(const TKikimrRunConfig& runConfig);
+
+ void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
+};
+
} // namespace NKikimrServicesInitializers
} // namespace NKikimr
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index d8c5c4a25b..f150aeb4aa 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -1592,6 +1592,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TLocalPgWireServiceInitializer(runConfig));
}
+ if (serviceMask.EnableKafkaProxy) {
+ sil->AddServiceInitializer(new TKafkaProxyServiceInitializer(runConfig));
+ }
+
return sil;
}
diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make
index 48d0b30951..3be8ed95c3 100644
--- a/ydb/core/driver_lib/run/ya.make
+++ b/ydb/core/driver_lib/run/ya.make
@@ -80,6 +80,7 @@ PEERDIR(
ydb/core/kesus/proxy
ydb/core/kesus/tablet
ydb/core/keyvalue
+ ydb/core/kafka_proxy
ydb/core/kqp
ydb/core/kqp/rm_service
ydb/core/load_test
diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..d7e93e12b0
--- /dev/null
+++ b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,25 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(ydb-core-kafka_proxy)
+target_link_libraries(ydb-core-kafka_proxy PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-actors-protos
+ ydb-core-base
+ ydb-core-protos
+ ydb-core-raw_socket
+)
+target_sources(ydb-core-kafka_proxy PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
+)
diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..e2dc3a0969
--- /dev/null
+++ b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,26 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(ydb-core-kafka_proxy)
+target_link_libraries(ydb-core-kafka_proxy PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-actors-protos
+ ydb-core-base
+ ydb-core-protos
+ ydb-core-raw_socket
+)
+target_sources(ydb-core-kafka_proxy PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
+)
diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..e2dc3a0969
--- /dev/null
+++ b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,26 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(ydb-core-kafka_proxy)
+target_link_libraries(ydb-core-kafka_proxy PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-actors-protos
+ ydb-core-base
+ ydb-core-protos
+ ydb-core-raw_socket
+)
+target_sources(ydb-core-kafka_proxy PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
+)
diff --git a/ydb/core/kafkaproxy/CMakeLists.txt b/ydb/core/kafka_proxy/CMakeLists.txt
index f8b31df0c1..f8b31df0c1 100644
--- a/ydb/core/kafkaproxy/CMakeLists.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.txt
diff --git a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..d7e93e12b0
--- /dev/null
+++ b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,25 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(ut)
+
+add_library(ydb-core-kafka_proxy)
+target_link_libraries(ydb-core-kafka_proxy PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ cpp-actors-protos
+ ydb-core-base
+ ydb-core-protos
+ ydb-core-raw_socket
+)
+target_sources(ydb-core-kafka_proxy PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
+)
diff --git a/ydb/core/kafkaproxy/kafka.h b/ydb/core/kafka_proxy/kafka.h
index 8e4f5651b2..8e4f5651b2 100644
--- a/ydb/core/kafkaproxy/kafka.h
+++ b/ydb/core/kafka_proxy/kafka.h
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
new file mode 100644
index 0000000000..f9934a1a8d
--- /dev/null
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -0,0 +1,427 @@
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <ydb/core/raw_socket/sock_config.h>
+
+#include "kafka_connection.h"
+#include "kafka_messages.h"
+#include "kafka_log_impl.h"
+
+#include <strstream>
+#include <sstream>
+#include <iosfwd>
+
+namespace NKafka {
+
+using namespace NActors;
+
+char Hex(const unsigned char c) {
+ return c < 10 ? '0' + c : 'A' + c - 10;
+}
+
+void Print(const TString& marker, TBuffer& buffer, ssize_t length) {
+ TStringBuilder sb;
+ for (ssize_t i = 0; i < length; ++i) {
+ char c = buffer.Data()[i];
+ if (i > 0) {
+ sb << ", ";
+ }
+ sb << "0x" << Hex(c >> 4) << Hex(c & 0x0F);
+ }
+ KAFKA_LOG_T("Packet " << marker << ": " << sb);
+}
+
+class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNetworkConfig {
+public:
+ using TBase = TActorBootstrapped<TKafkaConnection>;
+
+ // TODO check standard ip packet MTU. On my desktop it is 1500 on eth and wlp interfaces. It is 1300 on the tun interface. It
+ // is 1500 and 8950 on the dev server interfaces.
+ static constexpr size_t BufferSize = 8950;
+ static constexpr size_t MinDirectSize = 256;
+
+ struct Msg {
+ size_t Size = 0;
+ TKafkaInt32 ExpectedSize = 0;
+ TRequestHeaderData Header;
+ std::unique_ptr<TMessage> Message;
+ };
+
+ static constexpr TDuration InactivityTimeout = TDuration::Minutes(10);
+ TEvPollerReady* InactivityEvent = nullptr;
+ TPollerToken::TPtr PollerToken;
+
+ TIntrusivePtr<TSocketDescriptor> Socket;
+ TSocketAddressType Address;
+ THPTimer InactivityTimer;
+
+ bool IsAuthRequired = true;
+ bool IsSslSupported = true;
+
+ bool ConnectionEstablished = false;
+ bool CloseConnection = false;
+
+ TBuffer Buffer;
+ size_t Length;
+ size_t Position;
+
+ Msg Request;
+ bool HeaderSizeWasRead;
+ bool HeaderWasRead;
+ bool MessageSizeWasRead;
+ bool MessageWasRead;
+ std::unique_ptr<TReadContext> Ctx;
+
+ TReadDemand Demand;
+
+ TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address)
+ : Socket(std::move(socket))
+ , Address(address)
+ , Length(0)
+ , Position(0)
+ , HeaderSizeWasRead(false)
+ , HeaderWasRead(false)
+ , MessageSizeWasRead(false)
+ , MessageWasRead(false)
+ , Demand(NoDemand) {
+ SetNonBlock();
+ IsSslSupported = IsSslSupported && Socket->IsSslSupported();
+ Buffer.Resize(BufferSize);
+ }
+
+ void Bootstrap() {
+ Become(&TKafkaConnection::StateAccepting);
+ Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false));
+ KAFKA_LOG_D("incoming connection opened");
+ OnAccept();
+ }
+
+ void PassAway() override {
+ if (ConnectionEstablished) {
+ ConnectionEstablished = false;
+ }
+ Shutdown();
+ TBase::PassAway();
+ }
+
+protected:
+ void SetNonBlock() noexcept {
+ Socket->SetNonBlock();
+ }
+
+ void Shutdown() {
+ if (Socket) {
+ Socket->Shutdown();
+ }
+ }
+
+ ssize_t SocketSend(const void* data, size_t size) {
+ return Socket->Send(data, size);
+ }
+
+ ssize_t SocketReceive(void* data, size_t size) {
+ return Socket->Receive(data, size);
+ }
+
+ void RequestPoller() {
+ Socket->RequestPoller(PollerToken);
+ }
+
+ SOCKET GetRawSocket() const {
+ return Socket->GetRawSocket();
+ }
+
+ TString LogPrefix() const {
+ return TStringBuilder() << "(#" << GetRawSocket() << "," << Address->ToString() << ") ";
+ }
+
+ void OnAccept() {
+ InactivityTimer.Reset();
+ TBase::Become(&TKafkaConnection::StateConnected);
+ Send(SelfId(), new TEvPollerReady(nullptr, true, true));
+ }
+
+ void HandleAccepting(TEvPollerRegisterResult::TPtr ev) {
+ PollerToken = std::move(ev->Get()->PollerToken);
+ OnAccept();
+ }
+
+ void HandleAccepting(NActors::TEvPollerReady::TPtr) {
+ OnAccept();
+ }
+
+ STATEFN(StateAccepting) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvPollerReady, HandleAccepting);
+ hFunc(TEvPollerRegisterResult, HandleAccepting);
+ }
+ }
+
+ void HandleMessage(TRequestHeaderData* header, TApiVersionsRequestData* /*message*/) {
+ TApiVersionsResponseData response;
+ response.ApiKeys.resize(4);
+
+ response.ApiKeys[0].ApiKey = PRODUCE;
+ response.ApiKeys[0].MinVersion = TProduceRequestData::MessageMeta::PresentVersionMin;
+ response.ApiKeys[0].MaxVersion = TProduceRequestData::MessageMeta::PresentVersionMax;
+
+ response.ApiKeys[1].ApiKey = API_VERSIONS;
+ response.ApiKeys[1].MinVersion = TApiVersionsRequestData::MessageMeta::PresentVersionMin;
+ response.ApiKeys[1].MaxVersion = TApiVersionsRequestData::MessageMeta::PresentVersionMax;
+
+ response.ApiKeys[2].ApiKey = METADATA;
+ response.ApiKeys[2].MinVersion = TMetadataRequestData::MessageMeta::PresentVersionMin;
+ response.ApiKeys[2].MaxVersion = TMetadataRequestData::MessageMeta::PresentVersionMax;
+
+ response.ApiKeys[3].ApiKey = INIT_PRODUCER_ID;
+ response.ApiKeys[3].MinVersion = TInitProducerIdRequestData::MessageMeta::PresentVersionMin;
+ response.ApiKeys[3].MaxVersion = TInitProducerIdRequestData::MessageMeta::PresentVersionMax;
+
+ Reply(header, &response);
+ }
+
+ void HandleMessage(TRequestHeaderData* header, TProduceRequestData* message) {
+ TProduceResponseData response;
+ response.Responses.resize(message->TopicData.size());
+ int i = 0;
+ for (auto& data : message->TopicData) {
+ response.Responses[i].Name = data.Name;
+ response.Responses[i].PartitionResponses.resize(data.PartitionData.size());
+ int j = 0;
+ for (auto& p : data.PartitionData) {
+ response.Responses[i].PartitionResponses[j].Index = p.Index;
+ response.Responses[i].PartitionResponses[j].BaseOffset = 40;
+
+ ++j;
+ }
+ ++i;
+ }
+
+ Reply(header, &response);
+ }
+
+ void HandleMessage(TRequestHeaderData* header, TInitProducerIdRequestData* /*message*/) {
+ TInitProducerIdResponseData response;
+ response.ProducerEpoch = 1;
+ response.ProducerId = 1;
+ response.ErrorCode = 0;
+ response.ThrottleTimeMs = 0;
+
+ Reply(header, &response);
+ }
+
+ void HandleMessage(TRequestHeaderData* header, TMetadataRequestData* /*message*/) {
+ TMetadataResponseData response;
+ response.ThrottleTimeMs = 0;
+ response.ClusterId = "cluster-ahjgk";
+ response.ControllerId = 1;
+
+ response.Brokers.resize(1);
+ response.Brokers[0].NodeId = 1;
+ response.Brokers[0].Host = "lbk-dev-02.search.yandex.net";
+ response.Brokers[0].Port = 9092;
+
+ response.Topics.resize(1);
+ response.Topics[0].TopicId = TKafkaUuid(0, 1);
+ response.Topics[0].Name = "topic-1";
+ response.Topics[0].Partitions.resize(1);
+ response.Topics[0].Partitions[0].LeaderId = 1; // response.Brokers[0].NodeId
+ response.Topics[0].Partitions[0].ReplicaNodes.resize(1);
+ response.Topics[0].Partitions[0].ReplicaNodes[0] = 1;
+ response.Topics[0].Partitions[0].IsrNodes.resize(1);
+ response.Topics[0].Partitions[0].IsrNodes[0] = 1;
+
+ Reply(header, &response);
+ }
+
+ void ProcessRequest() {
+ KAFKA_LOG_D("process message: ApiKey=" << Request.Header.RequestApiKey << ", ExpectedSize=" << Request.ExpectedSize
+ << ", Size=" << Request.Size);
+ switch (Request.Header.RequestApiKey) {
+ case PRODUCE:
+ HandleMessage(&Request.Header, dynamic_cast<TProduceRequestData*>(Request.Message.get()));
+ return;
+
+ case API_VERSIONS:
+ HandleMessage(&Request.Header, dynamic_cast<TApiVersionsRequestData*>(Request.Message.get()));
+ return;
+
+ case INIT_PRODUCER_ID:
+ HandleMessage(&Request.Header, dynamic_cast<TInitProducerIdRequestData*>(Request.Message.get()));
+ return;
+
+ case METADATA:
+ HandleMessage(&Request.Header, dynamic_cast<TMetadataRequestData*>(Request.Message.get()));
+ return;
+
+ default:
+ KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request.Header.RequestApiKey);
+ }
+ }
+
+ void Reply(TRequestHeaderData* header, TApiMessage* reply) {
+ // TODO improve allocation
+ TKafkaVersion headerVersion = ResponseHeaderVersion(header->RequestApiKey, header->RequestApiVersion);
+ TKafkaVersion version = header->RequestApiVersion;
+
+ TResponseHeaderData responseHeader;
+ responseHeader.CorrelationId = header->CorrelationId;
+
+ TKafkaInt32 size = responseHeader.Size(headerVersion) + reply->Size(version);
+
+ std::stringstream sb;
+ TKafkaWritable writable(sb);
+ writable << size;
+ responseHeader.Write(writable, headerVersion);
+ reply->Write(writable, version);
+
+ TBuffer b;
+ b.Reserve(size + sizeof(size));
+ sb.read(b.data(), size + sizeof(size));
+
+ Print("sent", b, size + sizeof(size));
+
+ SocketSend(b.Data(), size + sizeof(size));
+ }
+
+ void DoRead() {
+ for (;;) {
+ while (Demand) {
+ ssize_t received = 0;
+ if (Position < Length) {
+ KAFKA_LOG_T("Read from buffer: Position=" << Position << ", Length=" << Length
+ << ", Demand=" << Demand.GetLength());
+ received = std::min(Demand.Length, Length - Position);
+ if (!Demand.Skip()) {
+ memcpy(Demand.Buffer, Buffer.Data() + Position, received);
+ }
+ Position += received;
+ } else if (!Demand.Skip() && Demand.Length >= MinDirectSize) {
+ ssize_t res = SocketReceive(Demand.Buffer, Demand.GetLength());
+ if (-res == EAGAIN || -res == EWOULDBLOCK) {
+ return;
+ } else if (-res == EINTR) {
+ continue;
+ } else if (!res) {
+ KAFKA_LOG_ERROR("connection closed");
+ return PassAway();
+ } else if (res < 0) {
+ KAFKA_LOG_ERROR("connection closed - error in recv: " << strerror(-res));
+ return PassAway();
+ }
+ received = res;
+ if (!received) {
+ return;
+ }
+ } else {
+ Position = 0;
+ Length = 0;
+ ssize_t res = SocketReceive(Buffer.Data(), BufferSize);
+ if (-res == EAGAIN || -res == EWOULDBLOCK) {
+ return;
+ } else if (-res == EINTR) {
+ continue;
+ } else if (!res) {
+ KAFKA_LOG_ERROR("connection closed");
+ return PassAway();
+ } else if (res < 0) {
+ KAFKA_LOG_ERROR("connection closed - error in recv: " << strerror(-res));
+ return PassAway();
+ }
+ Length = res;
+ Print("received", Buffer, Length);
+ if (!Length) {
+ return;
+ }
+
+ continue;
+ }
+
+ Request.Size += received;
+ Demand.Buffer += received;
+ Demand.Length -= received;
+ }
+ if (Ctx) {
+ Demand = Ctx->Next();
+ }
+ if (!Demand) {
+ if (MessageWasRead) {
+ HeaderSizeWasRead = false;
+ MessageWasRead = false;
+ HeaderWasRead = false;
+
+ ProcessRequest();
+
+ Request = Msg();
+ Ctx = nullptr;
+ }
+ if (!HeaderSizeWasRead) {
+ Demand = TReadDemand((char*)&(Request.ExpectedSize), sizeof(Request.ExpectedSize));
+ HeaderSizeWasRead = true;
+ Ctx = nullptr;
+ } else if (!HeaderWasRead) {
+ NPrivate::NormalizeNumber(Request.ExpectedSize);
+
+ KAFKA_LOG_T("start read new message. ExpectedSize=" << Request.ExpectedSize);
+
+ Ctx = Request.Header.CreateReadContext(2);
+
+ HeaderWasRead = true;
+ } else {
+ KAFKA_LOG_T("received header. ApiKey=" << Request.Header.RequestApiKey
+ << ", Version=" << Request.Header.RequestApiVersion);
+
+ i16 apiKey = Request.Header.RequestApiKey;
+ TKafkaVersion version = Request.Header.RequestApiVersion;
+
+ Request.Message = CreateRequest(apiKey);
+ Ctx = Request.Message->CreateReadContext(version);
+
+ MessageWasRead = true;
+ }
+ }
+ }
+ }
+
+ void HandleConnected(TEvPollerReady::TPtr event) {
+ if (event->Get()->Read) {
+ DoRead();
+
+ if (event->Get() == InactivityEvent) {
+ const TDuration passed = TDuration::Seconds(std::abs(InactivityTimer.Passed()));
+ if (passed >= InactivityTimeout) {
+ KAFKA_LOG_D("connection closed by inactivity timeout");
+ return PassAway(); // timeout
+ } else {
+ Schedule(InactivityTimeout - passed, InactivityEvent = new TEvPollerReady(nullptr, false, false));
+ }
+ }
+ }
+ if (event->Get()->Write) {
+ if (!FlushOutput()) {
+ return;
+ }
+ }
+ RequestPoller();
+ }
+
+ bool FlushOutput() {
+ return true; // TODO
+ }
+
+ void HandleConnected(TEvPollerRegisterResult::TPtr ev) {
+ PollerToken = std::move(ev->Get()->PollerToken);
+ PollerToken->Request(true, true);
+ }
+
+ STATEFN(StateConnected) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvPollerReady, HandleConnected);
+ hFunc(TEvPollerRegisterResult, HandleConnected);
+ }
+ }
+};
+
+NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) {
+ return new TKafkaConnection(std::move(socket), std::move(address));
+}
+
+} // namespace NKafka \ No newline at end of file
diff --git a/ydb/core/kafka_proxy/kafka_connection.h b/ydb/core/kafka_proxy/kafka_connection.h
new file mode 100644
index 0000000000..e23cf9995f
--- /dev/null
+++ b/ydb/core/kafka_proxy/kafka_connection.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include <library/cpp/actors/core/actor.h>
+#include <ydb/core/raw_socket/sock_config.h>
+#include <ydb/core/raw_socket/sock_impl.h>
+
+namespace NKafka {
+
+using namespace NKikimr::NRawSocket;
+
+NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address);
+
+}
diff --git a/ydb/core/kafka_proxy/kafka_listener.h b/ydb/core/kafka_proxy/kafka_listener.h
new file mode 100644
index 0000000000..a5a36cfc3c
--- /dev/null
+++ b/ydb/core/kafka_proxy/kafka_listener.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <ydb/core/raw_socket/sock_listener.h>
+#include "kafka_connection.h"
+
+namespace NKafka {
+
+using namespace NKikimr::NRawSocket;
+
+inline NActors::IActor* CreateKafkaListener(const NActors::TActorId& poller, const TListenerSettings& settings = {.Port = 9092}) {
+ return CreateSocketListener(
+ poller, settings,
+ [](TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) {
+ return CreateKafkaConnection(socket, address);
+ },
+ NKikimrServices::EServiceKikimr::KAFKA_PROXY);
+}
+
+} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_log.h b/ydb/core/kafka_proxy/kafka_log.h
new file mode 100644
index 0000000000..8547259b36
--- /dev/null
+++ b/ydb/core/kafka_proxy/kafka_log.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/log.h>
+#include <ydb/core/protos/services.pb.h>
+
+namespace NKafka {
+
+inline TString LogPrefix() { return {}; }
+
+}
diff --git a/ydb/core/kafka_proxy/kafka_log_impl.h b/ydb/core/kafka_proxy/kafka_log_impl.h
new file mode 100644
index 0000000000..2aa54cd111
--- /dev/null
+++ b/ydb/core/kafka_proxy/kafka_log_impl.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include "kafka_log.h"
+
+#define KAFKA_LOG_T(stream) LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KAFKA_PROXY, LogPrefix() << stream)
+#define KAFKA_LOG_D(stream) LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KAFKA_PROXY, LogPrefix() << stream)
+#define KAFKA_LOG_I(stream) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KAFKA_PROXY, LogPrefix() << stream)
+#define KAFKA_LOG_W(stream) LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KAFKA_PROXY, LogPrefix() << stream)
+#define KAFKA_LOG_NOTICE(stream) LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KAFKA_PROXY, LogPrefix() << stream)
+#define KAFKA_LOG_ERROR(stream) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KAFKA_PROXY, LogPrefix() << stream)
+
diff --git a/ydb/core/kafkaproxy/kafka_messages.cpp b/ydb/core/kafka_proxy/kafka_messages.cpp
index 7c45f6719c..7c45f6719c 100644
--- a/ydb/core/kafkaproxy/kafka_messages.cpp
+++ b/ydb/core/kafka_proxy/kafka_messages.cpp
diff --git a/ydb/core/kafkaproxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h
index c070177e7f..c070177e7f 100644
--- a/ydb/core/kafkaproxy/kafka_messages.h
+++ b/ydb/core/kafka_proxy/kafka_messages.h
diff --git a/ydb/core/kafkaproxy/kafka_messages_int.cpp b/ydb/core/kafka_proxy/kafka_messages_int.cpp
index 9604fdaa76..9604fdaa76 100644
--- a/ydb/core/kafkaproxy/kafka_messages_int.cpp
+++ b/ydb/core/kafka_proxy/kafka_messages_int.cpp
diff --git a/ydb/core/kafkaproxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h
index 077d8b46b0..3f02644272 100644
--- a/ydb/core/kafkaproxy/kafka_messages_int.h
+++ b/ydb/core/kafka_proxy/kafka_messages_int.h
@@ -14,6 +14,8 @@
namespace NKafka {
namespace NPrivate {
+static constexpr TKafkaInt32 MAX_RECORDS_SIZE = 1 << 28; // 256Mb
+
struct TKafkaBoolDesc {
static constexpr bool Default = true;
static constexpr bool Nullable = false;
@@ -484,12 +486,14 @@ public:
} else {
ythrow yexception() << "non-nullable field " << Meta::Name << " was serialized as null";
}
+ } else if (length > MAX_RECORDS_SIZE) {
+ ythrow yexception() << "records fields " << Meta::Name << " has invalid length " << length;
}
value = TKafkaRawBytes();
value->Resize(length);
return TReadDemand(value->data(), length);
- }
+ }
private:
bool WasRead;
diff --git a/ydb/core/kafka_proxy/kafka_proxy.h b/ydb/core/kafka_proxy/kafka_proxy.h
new file mode 100644
index 0000000000..d1a5213373
--- /dev/null
+++ b/ydb/core/kafka_proxy/kafka_proxy.h
@@ -0,0 +1,3 @@
+#pragma once
+
+#include "kafka_listener.h"
diff --git a/ydb/core/kafkaproxy/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt
index 5a13fbe24f..b20b2c0a03 100644
--- a/ydb/core/kafkaproxy/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt
@@ -7,47 +7,47 @@
-add_executable(ydb-core-kafkaproxy-ut)
-target_link_libraries(ydb-core-kafkaproxy-ut PUBLIC
+add_executable(ydb-core-kafka_proxy-ut)
+target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC
contrib-libs-cxxsupp
yutil
library-cpp-cpuid_check
cpp-testing-gtest
cpp-testing-gtest_main
- ydb-core-kafkaproxy
+ ydb-core-kafka_proxy
)
-target_link_options(ydb-core-kafkaproxy-ut PRIVATE
+target_link_options(ydb-core-kafka_proxy-ut PRIVATE
-Wl,-platform_version,macos,11.0,11.0
-fPIC
-fPIC
-framework
CoreFoundation
)
-target_sources(ydb-core-kafkaproxy-ut PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_kafka_functions.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_serialization.cpp
+target_sources(ydb-core-kafka_proxy-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp
)
add_test(
NAME
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
COMMAND
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
)
set_property(
TEST
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
PROPERTY
LABELS
SMALL
)
set_property(
TEST
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
PROPERTY
PROCESSORS
1
)
-target_allocator(ydb-core-kafkaproxy-ut
+target_allocator(ydb-core-kafka_proxy-ut
system_allocator
)
-vcs_info(ydb-core-kafkaproxy-ut)
+vcs_info(ydb-core-kafka_proxy-ut)
diff --git a/ydb/core/kafkaproxy/ut/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt
index c03a1c10ed..38872fb558 100644
--- a/ydb/core/kafkaproxy/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt
@@ -7,16 +7,16 @@
-add_executable(ydb-core-kafkaproxy-ut)
-target_link_libraries(ydb-core-kafkaproxy-ut PUBLIC
+add_executable(ydb-core-kafka_proxy-ut)
+target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
cpp-testing-gtest
cpp-testing-gtest_main
- ydb-core-kafkaproxy
+ ydb-core-kafka_proxy
)
-target_link_options(ydb-core-kafkaproxy-ut PRIVATE
+target_link_options(ydb-core-kafka_proxy-ut PRIVATE
-ldl
-lrt
-Wl,--no-as-needed
@@ -26,31 +26,31 @@ target_link_options(ydb-core-kafkaproxy-ut PRIVATE
-lrt
-ldl
)
-target_sources(ydb-core-kafkaproxy-ut PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_kafka_functions.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_serialization.cpp
+target_sources(ydb-core-kafka_proxy-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp
)
add_test(
NAME
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
COMMAND
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
)
set_property(
TEST
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
PROPERTY
LABELS
SMALL
)
set_property(
TEST
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
PROPERTY
PROCESSORS
1
)
-target_allocator(ydb-core-kafkaproxy-ut
+target_allocator(ydb-core-kafka_proxy-ut
cpp-malloc-jemalloc
)
-vcs_info(ydb-core-kafkaproxy-ut)
+vcs_info(ydb-core-kafka_proxy-ut)
diff --git a/ydb/core/kafkaproxy/ut/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt
index 9089cb7458..a75ef634f8 100644
--- a/ydb/core/kafkaproxy/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt
@@ -7,17 +7,17 @@
-add_executable(ydb-core-kafkaproxy-ut)
-target_link_libraries(ydb-core-kafkaproxy-ut PUBLIC
+add_executable(ydb-core-kafka_proxy-ut)
+target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
library-cpp-cpuid_check
cpp-testing-gtest
cpp-testing-gtest_main
- ydb-core-kafkaproxy
+ ydb-core-kafka_proxy
)
-target_link_options(ydb-core-kafkaproxy-ut PRIVATE
+target_link_options(ydb-core-kafka_proxy-ut PRIVATE
-ldl
-lrt
-Wl,--no-as-needed
@@ -27,32 +27,32 @@ target_link_options(ydb-core-kafkaproxy-ut PRIVATE
-lrt
-ldl
)
-target_sources(ydb-core-kafkaproxy-ut PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_kafka_functions.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_serialization.cpp
+target_sources(ydb-core-kafka_proxy-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp
)
add_test(
NAME
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
COMMAND
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
)
set_property(
TEST
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
PROPERTY
LABELS
SMALL
)
set_property(
TEST
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
PROPERTY
PROCESSORS
1
)
-target_allocator(ydb-core-kafkaproxy-ut
+target_allocator(ydb-core-kafka_proxy-ut
cpp-malloc-tcmalloc
libs-tcmalloc-no_percpu_cache
)
-vcs_info(ydb-core-kafkaproxy-ut)
+vcs_info(ydb-core-kafka_proxy-ut)
diff --git a/ydb/core/kafkaproxy/ut/CMakeLists.txt b/ydb/core/kafka_proxy/ut/CMakeLists.txt
index f8b31df0c1..f8b31df0c1 100644
--- a/ydb/core/kafkaproxy/ut/CMakeLists.txt
+++ b/ydb/core/kafka_proxy/ut/CMakeLists.txt
diff --git a/ydb/core/kafkaproxy/ut/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt
index 1b948214ae..5bb08d1134 100644
--- a/ydb/core/kafkaproxy/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt
@@ -7,40 +7,40 @@
-add_executable(ydb-core-kafkaproxy-ut)
-target_link_libraries(ydb-core-kafkaproxy-ut PUBLIC
+add_executable(ydb-core-kafka_proxy-ut)
+target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC
contrib-libs-cxxsupp
yutil
library-cpp-cpuid_check
cpp-testing-gtest
cpp-testing-gtest_main
- ydb-core-kafkaproxy
+ ydb-core-kafka_proxy
)
-target_sources(ydb-core-kafkaproxy-ut PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_kafka_functions.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/ut/ut_serialization.cpp
+target_sources(ydb-core-kafka_proxy-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp
)
add_test(
NAME
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
COMMAND
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
)
set_property(
TEST
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
PROPERTY
LABELS
SMALL
)
set_property(
TEST
- ydb-core-kafkaproxy-ut
+ ydb-core-kafka_proxy-ut
PROPERTY
PROCESSORS
1
)
-target_allocator(ydb-core-kafkaproxy-ut
+target_allocator(ydb-core-kafka_proxy-ut
system_allocator
)
-vcs_info(ydb-core-kafkaproxy-ut)
+vcs_info(ydb-core-kafka_proxy-ut)
diff --git a/ydb/core/kafkaproxy/ut/ut_kafka_functions.cpp b/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp
index c66ba20a03..c66ba20a03 100644
--- a/ydb/core/kafkaproxy/ut/ut_kafka_functions.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp
diff --git a/ydb/core/kafkaproxy/ut/ut_serialization.cpp b/ydb/core/kafka_proxy/ut/ut_serialization.cpp
index 3bca84033b..ed8185637a 100644
--- a/ydb/core/kafkaproxy/ut/ut_serialization.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_serialization.cpp
@@ -585,7 +585,21 @@ TEST(Serialization, TKafkaFloat64_PresentVersion_NotTaggedVersion) {
}
}
+TEST(Serialization, RequestHeader_reference) {
+ ui8 reference[] = {0x00, 0x12, 0x00, 0x00, 0x7F, 0x6F, 0x6F, 0x68, 0x00, 0x0A, 0x70, 0x72, 0x6F, 0x64, 0x75, 0x63,
+ 0x65, 0x72, 0x2D, 0x31};
+ std::stringstream sb;
+ sb.write((char*)reference, sizeof(reference));
+
+ TReadProcessor processor(sb);
+ TRequestHeaderData result;
+ processor.Read(&result, 1);
+
+ EXPECT_EQ(result.RequestApiKey, 0x12);
+ EXPECT_EQ(result.RequestApiVersion, 0x00);
+ EXPECT_EQ(result.ClientId, "producer-1");
+}
TEST(Serialization, ProduceRequestData_reference) {
// original kafka serialized value (java implementation)
diff --git a/ydb/core/kafkaproxy/ut/ya.make b/ydb/core/kafka_proxy/ut/ya.make
index 1a384fdd1d..4626bbd0dc 100644
--- a/ydb/core/kafkaproxy/ut/ya.make
+++ b/ydb/core/kafka_proxy/ut/ya.make
@@ -6,7 +6,7 @@ SRCS(
)
PEERDIR(
- ydb/core/kafkaproxy
+ ydb/core/kafka_proxy
)
END()
diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make
new file mode 100644
index 0000000000..b99a60dd3d
--- /dev/null
+++ b/ydb/core/kafka_proxy/ya.make
@@ -0,0 +1,29 @@
+LIBRARY()
+
+SRCS(
+ kafka_connection.cpp
+ kafka_connection.h
+ kafka_listener.h
+ kafka.h
+ kafka_log.h
+ kafka_log_impl.h
+ kafka_messages.cpp
+ kafka_messages.h
+ kafka_messages_int.cpp
+ kafka_messages_int.h
+ kafka_proxy.h
+)
+
+PEERDIR(
+ library/cpp/actors/core
+ library/cpp/actors/protos
+ ydb/core/base
+ ydb/core/protos
+ ydb/core/raw_socket
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt b/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt
index 7e189a4cbf..15fa663dc7 100644
--- a/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt
@@ -16,10 +16,10 @@ target_link_libraries(ydb-core-pgproxy PUBLIC
cpp-actors-protos
ydb-core-base
ydb-core-protos
+ ydb-core-raw_socket
)
target_sources(ydb-core-pgproxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_connection.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_listener.cpp
${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy_types.cpp
${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy.cpp
)
diff --git a/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt b/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt
index 8d5de35255..21e428babe 100644
--- a/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt
@@ -17,10 +17,10 @@ target_link_libraries(ydb-core-pgproxy PUBLIC
cpp-actors-protos
ydb-core-base
ydb-core-protos
+ ydb-core-raw_socket
)
target_sources(ydb-core-pgproxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_connection.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_listener.cpp
${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy_types.cpp
${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy.cpp
)
diff --git a/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt b/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt
index 8d5de35255..21e428babe 100644
--- a/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt
@@ -17,10 +17,10 @@ target_link_libraries(ydb-core-pgproxy PUBLIC
cpp-actors-protos
ydb-core-base
ydb-core-protos
+ ydb-core-raw_socket
)
target_sources(ydb-core-pgproxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_connection.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_listener.cpp
${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy_types.cpp
${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy.cpp
)
diff --git a/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt b/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt
index 7e189a4cbf..15fa663dc7 100644
--- a/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt
@@ -16,10 +16,10 @@ target_link_libraries(ydb-core-pgproxy PUBLIC
cpp-actors-protos
ydb-core-base
ydb-core-protos
+ ydb-core-raw_socket
)
target_sources(ydb-core-pgproxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_connection.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_listener.cpp
${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy_types.cpp
${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/pg_proxy.cpp
)
diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp
index 323858c628..e9529d2389 100644
--- a/ydb/core/pgproxy/pg_connection.cpp
+++ b/ydb/core/pgproxy/pg_connection.cpp
@@ -1,10 +1,10 @@
#include "pg_connection.h"
-#include "pg_proxy_config.h"
#include "pg_proxy_types.h"
#include "pg_proxy_events.h"
#include "pg_stream.h"
#include "pg_log_impl.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <ydb/core/raw_socket/sock_config.h>
namespace NPG {
diff --git a/ydb/core/pgproxy/pg_connection.h b/ydb/core/pgproxy/pg_connection.h
index ee41c18ea8..a9448b4a88 100644
--- a/ydb/core/pgproxy/pg_connection.h
+++ b/ydb/core/pgproxy/pg_connection.h
@@ -1,11 +1,14 @@
#pragma once
#include <library/cpp/actors/core/actor.h>
-#include "pg_proxy_config.h"
-#include "pg_proxy_impl.h"
+#include <ydb/core/raw_socket/sock_config.h>
+#include <ydb/core/raw_socket/sock_impl.h>
namespace NPG {
-NActors::IActor* CreatePGConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const NActors::TActorId& databaseProxy);
+using namespace NKikimr::NRawSocket;
-}
+NActors::IActor* CreatePGConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address,
+ const NActors::TActorId& databaseProxy);
+
+} // namespace NPG
diff --git a/ydb/core/pgproxy/pg_listener.h b/ydb/core/pgproxy/pg_listener.h
index e5c578414f..31488c0dd1 100644
--- a/ydb/core/pgproxy/pg_listener.h
+++ b/ydb/core/pgproxy/pg_listener.h
@@ -1,19 +1,20 @@
#pragma once
-#include <library/cpp/actors/core/actor.h>
-#include "pg_proxy_config.h"
-#include "pg_proxy_impl.h"
+#include <ydb/core/raw_socket/sock_listener.h>
+#include "pg_connection.h"
namespace NPG {
-struct TListenerSettings {
- uint16_t Port = 5432;
- TString CertificateFile;
- TString PrivateKeyFile;
- TString SslCertificatePem;
-};
-
-NActors::IActor* CreatePGListener(const NActors::TActorId& poller, const NActors::TActorId& databaseProxy, const TListenerSettings& settings = {});
+using namespace NKikimr::NRawSocket;
+inline NActors::IActor* CreatePGListener(const NActors::TActorId& poller, const NActors::TActorId databaseProxy,
+ const TListenerSettings& settings = {.Port = 5432}) {
+ return CreateSocketListener(
+ poller, settings,
+ [=](TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) {
+ return CreatePGConnection(socket, address, databaseProxy);
+ },
+ NKikimrServices::EServiceKikimr::PGWIRE);
}
+} // namespace NPG
diff --git a/ydb/core/pgproxy/pg_proxy.cpp b/ydb/core/pgproxy/pg_proxy.cpp
index 242dab9269..9376f65af8 100644
--- a/ydb/core/pgproxy/pg_proxy.cpp
+++ b/ydb/core/pgproxy/pg_proxy.cpp
@@ -1,5 +1,6 @@
#include "pg_proxy.h"
+#include "pg_connection.h"
#include "pg_listener.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/interconnect/poller_actor.h>
@@ -7,6 +8,7 @@
namespace NPG {
using namespace NActors;
+using namespace NKikimr::NRawSocket;
class TPGProxy : public TActorBootstrapped<TPGProxy> {
public:
@@ -29,9 +31,8 @@ public:
TActorId Listener;
};
-
NActors::IActor* CreatePGProxy() {
return new TPGProxy();
}
-} \ No newline at end of file
+} // namespace NPG
diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h
index 32ad9662db..acb31d0d0d 100644
--- a/ydb/core/pgproxy/pg_proxy_events.h
+++ b/ydb/core/pgproxy/pg_proxy_events.h
@@ -2,11 +2,13 @@
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/core/event_local.h>
-#include "pg_proxy_config.h"
+#include <ydb/core/raw_socket/sock_config.h>
#include "pg_proxy_types.h"
namespace NPG {
+using namespace NKikimr::NRawSocket;
+
struct TEvPGEvents {
enum EEv {
EvConnectionOpened = EventSpaceBegin(NActors::TEvents::ES_PGWIRE),
diff --git a/ydb/core/pgproxy/pg_proxy_ut.cpp b/ydb/core/pgproxy/pg_proxy_ut.cpp
index 0387a1719a..5d92b948d2 100644
--- a/ydb/core/pgproxy/pg_proxy_ut.cpp
+++ b/ydb/core/pgproxy/pg_proxy_ut.cpp
@@ -5,7 +5,6 @@
#include <library/cpp/actors/testlib/test_runtime.h>
#include <ydb/core/pgproxy/pg_proxy.h>
-#include <ydb/core/pgproxy/pg_listener.h>
#include <ydb/core/pgproxy/pg_log.h>
#include <ydb/core/pgproxy/pg_proxy_events.h>
#include <ydb/core/protos/services.pb.h>
@@ -13,12 +12,16 @@
#include <util/network/socket.h>
#include <util/string/hex.h>
+#include "pg_listener.h"
+
#ifdef NDEBUG
#define Ctest Cnull
#else
#define Ctest Cerr
#endif
+using namespace NKikimr::NRawSocket;
+
class TTestActorRuntime : public NActors::TTestActorRuntimeBase {
public:
void InitNodeImpl(TNodeDataBase* node, size_t nodeIndex) override {
diff --git a/ydb/core/pgproxy/ya.make b/ydb/core/pgproxy/ya.make
index e84082c662..50492de467 100644
--- a/ydb/core/pgproxy/ya.make
+++ b/ydb/core/pgproxy/ya.make
@@ -3,19 +3,14 @@ LIBRARY()
SRCS(
pg_connection.cpp
pg_connection.h
- pg_listener.cpp
pg_listener.h
pg_log_impl.h
pg_log.h
- pg_proxy_config.h
pg_proxy_events.h
- pg_proxy_impl.h
- pg_proxy_ssl.h
pg_proxy_types.cpp
pg_proxy_types.h
pg_proxy.cpp
pg_proxy.h
- pg_sock64.h
pg_stream.h
)
@@ -24,6 +19,7 @@ PEERDIR(
library/cpp/actors/protos
ydb/core/base
ydb/core/protos
+ ydb/core/raw_socket
)
END()
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index fcd260195f..e09a4bd41b 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1788,6 +1788,13 @@ message TLocalPgWireConfig {
optional string SslCertificate = 5;
}
+message TKafkaProxyConfig {
+ optional bool EnableKafkaProxy = 1 [default = true];
+
+ optional int32 ListeningPort = 2 [default = 9092];
+ optional string SslCertificate = 3;
+}
+
message TAwsCompatibilityConfig {
optional string AwsRegion = 1;
}
@@ -1861,6 +1868,7 @@ message TAppConfig {
optional TColumnShardConfig ColumnShardConfig = 66;
optional TLocalPgWireConfig LocalPgWireConfig = 69;
optional TAwsCompatibilityConfig AwsCompatibilityConfig = 70;
+ optional TKafkaProxyConfig KafkaProxyConfig = 71;
repeated TNamedConfig NamedConfigs = 100;
optional string ClusterYamlConfig = 101;
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 241d0d5091..56f35e787b 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -369,6 +369,8 @@ enum EServiceKikimr {
EXT_INDEX = 1900;
TX_CONVEYOR = 2000;
ARROW_HELPER = 2100;
+
+ KAFKA_PROXY = 2200;
};
message TActivity {
diff --git a/ydb/core/kafkaproxy/CMakeLists.darwin-x86_64.txt b/ydb/core/raw_socket/CMakeLists.darwin-x86_64.txt
index 3e701fd246..f01952a82b 100644
--- a/ydb/core/kafkaproxy/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/raw_socket/CMakeLists.darwin-x86_64.txt
@@ -6,10 +6,9 @@
# original buildsystem will not be accepted.
-add_subdirectory(ut)
-add_library(ydb-core-kafkaproxy)
-target_link_libraries(ydb-core-kafkaproxy PUBLIC
+add_library(ydb-core-raw_socket)
+target_link_libraries(ydb-core-raw_socket PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
@@ -17,7 +16,6 @@ target_link_libraries(ydb-core-kafkaproxy PUBLIC
ydb-core-base
ydb-core-protos
)
-target_sources(ydb-core-kafkaproxy PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages_int.cpp
+target_sources(ydb-core-raw_socket PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/raw_socket/sock_listener.cpp
)
diff --git a/ydb/core/kafkaproxy/CMakeLists.linux-aarch64.txt b/ydb/core/raw_socket/CMakeLists.linux-aarch64.txt
index 59a73cf06e..7ddee08602 100644
--- a/ydb/core/kafkaproxy/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/raw_socket/CMakeLists.linux-aarch64.txt
@@ -6,10 +6,9 @@
# original buildsystem will not be accepted.
-add_subdirectory(ut)
-add_library(ydb-core-kafkaproxy)
-target_link_libraries(ydb-core-kafkaproxy PUBLIC
+add_library(ydb-core-raw_socket)
+target_link_libraries(ydb-core-raw_socket PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
@@ -18,7 +17,6 @@ target_link_libraries(ydb-core-kafkaproxy PUBLIC
ydb-core-base
ydb-core-protos
)
-target_sources(ydb-core-kafkaproxy PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages_int.cpp
+target_sources(ydb-core-raw_socket PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/raw_socket/sock_listener.cpp
)
diff --git a/ydb/core/kafkaproxy/CMakeLists.linux-x86_64.txt b/ydb/core/raw_socket/CMakeLists.linux-x86_64.txt
index 59a73cf06e..7ddee08602 100644
--- a/ydb/core/kafkaproxy/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/raw_socket/CMakeLists.linux-x86_64.txt
@@ -6,10 +6,9 @@
# original buildsystem will not be accepted.
-add_subdirectory(ut)
-add_library(ydb-core-kafkaproxy)
-target_link_libraries(ydb-core-kafkaproxy PUBLIC
+add_library(ydb-core-raw_socket)
+target_link_libraries(ydb-core-raw_socket PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
@@ -18,7 +17,6 @@ target_link_libraries(ydb-core-kafkaproxy PUBLIC
ydb-core-base
ydb-core-protos
)
-target_sources(ydb-core-kafkaproxy PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages_int.cpp
+target_sources(ydb-core-raw_socket PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/raw_socket/sock_listener.cpp
)
diff --git a/ydb/core/raw_socket/CMakeLists.txt b/ydb/core/raw_socket/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/raw_socket/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/kafkaproxy/CMakeLists.windows-x86_64.txt b/ydb/core/raw_socket/CMakeLists.windows-x86_64.txt
index 3e701fd246..f01952a82b 100644
--- a/ydb/core/kafkaproxy/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/raw_socket/CMakeLists.windows-x86_64.txt
@@ -6,10 +6,9 @@
# original buildsystem will not be accepted.
-add_subdirectory(ut)
-add_library(ydb-core-kafkaproxy)
-target_link_libraries(ydb-core-kafkaproxy PUBLIC
+add_library(ydb-core-raw_socket)
+target_link_libraries(ydb-core-raw_socket PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
@@ -17,7 +16,6 @@ target_link_libraries(ydb-core-kafkaproxy PUBLIC
ydb-core-base
ydb-core-protos
)
-target_sources(ydb-core-kafkaproxy PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/kafkaproxy/kafka_messages_int.cpp
+target_sources(ydb-core-raw_socket PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/raw_socket/sock_listener.cpp
)
diff --git a/ydb/core/pgproxy/pg_sock64.h b/ydb/core/raw_socket/sock64.h
index da12e4d0fd..db05a6ab15 100644
--- a/ydb/core/pgproxy/pg_sock64.h
+++ b/ydb/core/raw_socket/sock64.h
@@ -3,9 +3,9 @@
#include <optional>
#include <util/network/sock.h>
#include <library/cpp/actors/interconnect/poller_actor.h>
-#include "pg_proxy_ssl.h"
+#include "sock_ssl.h"
-namespace NPG {
+namespace NKikimr::NRawSocket {
class TInet64StreamSocket : public TStreamSocket {
using TBase = TStreamSocket;
@@ -194,4 +194,4 @@ public:
}
};
-}
+} // namespace NKikimr::NRawSocket
diff --git a/ydb/core/pgproxy/pg_proxy_config.h b/ydb/core/raw_socket/sock_config.h
index 364ea7c832..011049d03a 100644
--- a/ydb/core/pgproxy/pg_proxy_config.h
+++ b/ydb/core/raw_socket/sock_config.h
@@ -3,21 +3,23 @@
#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/protos/services_common.pb.h>
-#include "pg_sock64.h"
-namespace NPG {
+#include "sock64.h"
+
+namespace NKikimr::NRawSocket {
struct TNetworkConfig {
static constexpr size_t BUFFER_SIZE = 1 * 1024;
static constexpr int LISTEN_QUEUE = 10;
static constexpr TDuration SOCKET_TIMEOUT = TDuration::MilliSeconds(60000);
static constexpr TDuration CONNECTION_TIMEOUT = TDuration::MilliSeconds(60000);
+
using TSocketType = TInet64StreamSocket;
using TSecureSocketType = TInet64SecureStreamSocket;
using TSocketAddressType = std::shared_ptr<ISockAddr>;
};
-}
+} // namespace NKikimr::NRawSocket
inline IOutputStream& operator <<(IOutputStream& out, const std::shared_ptr<ISockAddr>& addr) {
return out << addr->ToString();
diff --git a/ydb/core/pgproxy/pg_proxy_impl.h b/ydb/core/raw_socket/sock_impl.h
index 60bd67ddef..632acaa1cf 100644
--- a/ydb/core/pgproxy/pg_proxy_impl.h
+++ b/ydb/core/raw_socket/sock_impl.h
@@ -1,11 +1,11 @@
#pragma once
#include <library/cpp/actors/interconnect/poller_actor.h>
-#include "pg_proxy_config.h"
-#include "pg_sock64.h"
-#include "pg_proxy_ssl.h"
+#include "sock_config.h"
+#include "sock64.h"
+#include "sock_ssl.h"
-namespace NPG {
+namespace NKikimr::NRawSocket {
struct TEndpointInfo {
TSslHelpers::TSslHolder<SSL_CTX> SecureContext;
@@ -98,4 +98,4 @@ public:
}
};
-}
+} // namespace NKikimr::NRawSocket
diff --git a/ydb/core/pgproxy/pg_listener.cpp b/ydb/core/raw_socket/sock_listener.cpp
index da74ad095e..9a443ce021 100644
--- a/ydb/core/pgproxy/pg_listener.cpp
+++ b/ydb/core/raw_socket/sock_listener.cpp
@@ -3,34 +3,37 @@
#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/interconnect/poller_actor.h>
#include <util/network/sock.h>
-#include "pg_proxy.h"
-#include "pg_listener.h"
-#include "pg_proxy_impl.h"
-#include "pg_proxy_config.h"
-#include "pg_sock64.h"
-#include "pg_connection.h"
-#include "pg_log_impl.h"
+#include <ydb/core/protos/services.pb.h>
-namespace NPG {
+#include "sock_listener.h"
+#include "sock_config.h"
+#include "sock64.h"
+
+namespace NKikimr::NRawSocket {
using namespace NActors;
-class TPGListener : public TActorBootstrapped<TPGListener>, TNetworkConfig {
+class TSocketListener: public TActorBootstrapped<TSocketListener>, TNetworkConfig {
public:
- using TBase = NActors::TActor<TPGListener>;
- using TThis = TPGListener;
+ using TBase = TActor<TSocketListener>;
+ using TThis = TSocketListener;
+
TActorId Poller;
- TActorId DatabaseProxy;
TListenerSettings Settings;
+ TConnectionCreator ConnectionCreator;
+ NKikimrServices::EServiceKikimr Service;
+
TIntrusivePtr<TSocketDescriptor> Socket;
- NActors::TPollerToken::TPtr PollerToken;
+ TPollerToken::TPtr PollerToken;
THashSet<TActorId> Connections;
- TPGListener(const TActorId& poller, const TActorId& databaseProxy, const TListenerSettings& settings)
+ TSocketListener(const TActorId& poller, const TListenerSettings& settings, const TConnectionCreator& connectionCreator,
+ NKikimrServices::EServiceKikimr service)
: Poller(poller)
- , DatabaseProxy(databaseProxy)
, Settings(settings)
- {}
+ , ConnectionCreator(connectionCreator)
+ , Service(service) {
+ }
STATEFN(StateWorking) {
switch (ev->GetTypeRewrite()) {
@@ -54,14 +57,15 @@ public:
err = Socket->Listen(LISTEN_QUEUE);
if (err == 0) {
- BLOG_D("Listening on " << bindAddress->ToString() << (endpoint->SecureContext ? " (ssl)" : ""));
+ LOG_INFO_S(*NActors::TlsActivationContext, Service,
+ "Listening on " << bindAddress->ToString() << (endpoint->SecureContext ? " (ssl)" : ""));
Socket->SetNonBlock();
Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId()));
Become(&TThis::StateWorking);
return;
}
}
- BLOG_ERROR("Failed to listen on " << bindAddress->ToString());
+ LOG_ERROR_S(*NActors::TlsActivationContext, Service, "Failed to listen on " << bindAddress->ToString());
//abort();
PassAway();
}
@@ -84,7 +88,7 @@ public:
if (!socket) {
break;
}
- NActors::IActor* connectionSocket = CreatePGConnection(socket, addr, DatabaseProxy);
+ NActors::IActor* connectionSocket = ConnectionCreator(socket, addr);
NActors::TActorId connectionId = Register(connectionSocket);
Send(Poller, new TEvPollerRegister(socket, connectionId, connectionId));
Connections.emplace(connectionId);
@@ -97,9 +101,9 @@ public:
}
};
-
-NActors::IActor* CreatePGListener(const TActorId& poller, const TActorId& databaseProxy, const TListenerSettings& settings) {
- return new TPGListener(poller, databaseProxy, settings);
+NActors::IActor* CreateSocketListener(const NActors::TActorId& poller, const TListenerSettings& settings,
+ TConnectionCreator connectionCreator, NKikimrServices::EServiceKikimr service) {
+ return new TSocketListener(poller, settings, connectionCreator, service);
}
-}
+} // namespace NKikimr::NRawSocket
diff --git a/ydb/core/raw_socket/sock_listener.h b/ydb/core/raw_socket/sock_listener.h
new file mode 100644
index 0000000000..7f8d5b464b
--- /dev/null
+++ b/ydb/core/raw_socket/sock_listener.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include <library/cpp/actors/core/actor.h>
+#include <ydb/core/protos/services.pb.h>
+
+#include "sock_impl.h"
+#include "sock_config.h"
+
+namespace NKikimr::NRawSocket {
+
+struct TListenerSettings {
+ uint16_t Port;
+ TString CertificateFile;
+ TString PrivateKeyFile;
+ TString SslCertificatePem;
+};
+
+using TConnectionCreator = std::function<NActors::IActor* (TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address)>;
+
+NActors::IActor* CreateSocketListener(const NActors::TActorId& poller, const TListenerSettings& settings,
+ TConnectionCreator connectionCreator, NKikimrServices::EServiceKikimr service);
+
+} // namespace NKikimr::NRawSocket
diff --git a/ydb/core/pgproxy/pg_proxy_ssl.h b/ydb/core/raw_socket/sock_ssl.h
index 3b043f6917..64612ccedd 100644
--- a/ydb/core/pgproxy/pg_proxy_ssl.h
+++ b/ydb/core/raw_socket/sock_ssl.h
@@ -7,7 +7,7 @@
#include <memory>
#include <util/generic/string.h>
-namespace NPG {
+namespace NKikimr::NRawSocket {
struct TSslHelpers {
struct TSslDeleter {
@@ -210,4 +210,4 @@ struct TSslLayer : TSslHelpers {
}
};
-}
+} // namespace NKikimr::NRawSocket
diff --git a/ydb/core/kafkaproxy/ya.make b/ydb/core/raw_socket/ya.make
index fb108fb19a..c51a35deff 100644
--- a/ydb/core/kafkaproxy/ya.make
+++ b/ydb/core/raw_socket/ya.make
@@ -1,8 +1,12 @@
LIBRARY()
SRCS(
- kafka_messages.cpp
- kafka_messages_int.cpp
+ sock64.h
+ sock_config.h
+ sock_impl.h
+ sock_listener.cpp
+ sock_listener.h
+ sock_ssl.h
)
PEERDIR(
@@ -15,5 +19,5 @@ PEERDIR(
END()
RECURSE_FOR_TESTS(
- ut
+# ut
)
diff --git a/ydb/core/ya.make b/ydb/core/ya.make
index 4547bd3308..4450028a82 100644
--- a/ydb/core/ya.make
+++ b/ydb/core/ya.make
@@ -23,7 +23,7 @@ RECURSE(
health_check
http_proxy
io_formats
- kafkaproxy
+ kafka_proxy
kesus
keyvalue
kqp
@@ -40,6 +40,7 @@ RECURSE(
protos
public_http
quoter
+ raw_socket
scheme
scheme_types
security