aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2023-04-05 09:00:08 +0300
committerqrort <qrort@yandex-team.com>2023-04-05 09:00:08 +0300
commit8de2bc90eb9833b3cb38e8b6625e7b676073e8eb (patch)
tree84516a1a83e2efaf1b022a97917a31981b1d1568
parenta6800d9e4bb42789f36e52866d29ddf0b97f8a71 (diff)
downloadydb-8de2bc90eb9833b3cb38e8b6625e7b676073e8eb.tar.gz
move pgwire to ydb/apps
-rw-r--r--ydb/apps/CMakeLists.txt1
-rw-r--r--ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt35
-rw-r--r--ydb/apps/pgwire/CMakeLists.linux-aarch64.txt38
-rw-r--r--ydb/apps/pgwire/CMakeLists.linux-x86_64.txt40
-rw-r--r--ydb/apps/pgwire/CMakeLists.txt17
-rw-r--r--ydb/apps/pgwire/CMakeLists.windows-x86_64.txt28
-rw-r--r--ydb/apps/pgwire/appdata.h17
-rw-r--r--ydb/apps/pgwire/log_impl.h37
-rw-r--r--ydb/apps/pgwire/main.cpp10
-rw-r--r--ydb/apps/pgwire/pg_ydb_connection.cpp499
-rw-r--r--ydb/apps/pgwire/pg_ydb_connection.h10
-rw-r--r--ydb/apps/pgwire/pg_ydb_proxy.cpp163
-rw-r--r--ydb/apps/pgwire/pg_ydb_proxy.h9
-rw-r--r--ydb/apps/pgwire/pgwire.cpp148
-rw-r--r--ydb/apps/pgwire/pgwire.h36
-rw-r--r--ydb/apps/pgwire/signals.h35
-rw-r--r--ydb/core/pgproxy/pg_proxy_types.h7
17 files changed, 1128 insertions, 2 deletions
diff --git a/ydb/apps/CMakeLists.txt b/ydb/apps/CMakeLists.txt
index dee372cfa2..e1c6d2181e 100644
--- a/ydb/apps/CMakeLists.txt
+++ b/ydb/apps/CMakeLists.txt
@@ -6,5 +6,6 @@
# original buildsystem will not be accepted.
+add_subdirectory(pgwire)
add_subdirectory(ydb)
add_subdirectory(ydbd)
diff --git a/ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt b/ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..9460b9535e
--- /dev/null
+++ b/ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,35 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(pgwire)
+target_link_libraries(pgwire PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-system
+ library-cpp-cpuid_check
+ ydb-core-base
+ ydb-core-pgproxy
+ api-grpc
+ cpp-client-ydb_driver
+ cpp-client-draft
+)
+target_link_options(pgwire PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(pgwire PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/main.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_connection.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_proxy.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pgwire.cpp
+)
+vcs_info(pgwire)
diff --git a/ydb/apps/pgwire/CMakeLists.linux-aarch64.txt b/ydb/apps/pgwire/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..92f7ae169c
--- /dev/null
+++ b/ydb/apps/pgwire/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,38 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(pgwire)
+target_link_libraries(pgwire PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-jemalloc
+ ydb-core-base
+ ydb-core-pgproxy
+ api-grpc
+ cpp-client-ydb_driver
+ cpp-client-draft
+)
+target_link_options(pgwire PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(pgwire PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/main.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_connection.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_proxy.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pgwire.cpp
+)
+vcs_info(pgwire)
diff --git a/ydb/apps/pgwire/CMakeLists.linux-x86_64.txt b/ydb/apps/pgwire/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..774015fe0b
--- /dev/null
+++ b/ydb/apps/pgwire/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,40 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(pgwire)
+target_link_libraries(pgwire PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+ library-cpp-cpuid_check
+ ydb-core-base
+ ydb-core-pgproxy
+ api-grpc
+ cpp-client-ydb_driver
+ cpp-client-draft
+)
+target_link_options(pgwire PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(pgwire PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/main.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_connection.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_proxy.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pgwire.cpp
+)
+vcs_info(pgwire)
diff --git a/ydb/apps/pgwire/CMakeLists.txt b/ydb/apps/pgwire/CMakeLists.txt
new file mode 100644
index 0000000000..a692f90f36
--- /dev/null
+++ b/ydb/apps/pgwire/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64")
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/apps/pgwire/CMakeLists.windows-x86_64.txt b/ydb/apps/pgwire/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..cb5f739061
--- /dev/null
+++ b/ydb/apps/pgwire/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,28 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(pgwire)
+target_link_libraries(pgwire PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-system
+ library-cpp-cpuid_check
+ ydb-core-base
+ ydb-core-pgproxy
+ api-grpc
+ cpp-client-ydb_driver
+ cpp-client-draft
+)
+target_sources(pgwire PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/main.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_connection.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_proxy.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pgwire.cpp
+)
+vcs_info(pgwire)
diff --git a/ydb/apps/pgwire/appdata.h b/ydb/apps/pgwire/appdata.h
new file mode 100644
index 0000000000..bdff4c9ff4
--- /dev/null
+++ b/ydb/apps/pgwire/appdata.h
@@ -0,0 +1,17 @@
+#pragma once
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/executor_thread.h>
+#include <library/cpp/monlib/metrics/metric_registry.h>
+
+namespace NPGW {
+
+struct TAppData {
+ std::shared_ptr<NMonitoring::TMetricRegistry> MetricRegistry;
+};
+
+inline TAppData* AppData() {
+ return NActors::TlsActivationContext->ExecutorThread.ActorSystem->template AppData<TAppData>();
+}
+
+}
diff --git a/ydb/apps/pgwire/log_impl.h b/ydb/apps/pgwire/log_impl.h
new file mode 100644
index 0000000000..2820335501
--- /dev/null
+++ b/ydb/apps/pgwire/log_impl.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/log.h>
+
+namespace NPGW {
+
+enum EService : NActors::NLog::EComponent {
+ MIN = 1100,
+ Logger,
+ PGYDB,
+ MAX
+};
+
+inline const TString& GetEServiceName(NActors::NLog::EComponent component) {
+ static const TString loggerName("LOGGER");
+ static const TString pgYdbName("PG-YDB");
+ static const TString unknownName("UNKNOWN");
+ switch (component) {
+ case EService::Logger:
+ return loggerName;
+ case EService::PGYDB:
+ return pgYdbName;
+ default:
+ return unknownName;
+ }
+}
+
+
+#define BLOG_D(stream) LOG_DEBUG_S(*NActors::TlsActivationContext, EService::PGYDB, stream)
+#define BLOG_I(stream) LOG_INFO_S(*NActors::TlsActivationContext, EService::PGYDB, stream)
+#define BLOG_W(stream) LOG_WARN_S(*NActors::TlsActivationContext, EService::PGYDB, stream)
+#define BLOG_NOTICE(stream) LOG_NOTICE_S(*NActors::TlsActivationContext, EService::PGYDB, stream)
+#define BLOG_ERROR(stream) LOG_ERROR_S(*NActors::TlsActivationContext, EService::PGYDB, stream)
+
+
+}
diff --git a/ydb/apps/pgwire/main.cpp b/ydb/apps/pgwire/main.cpp
new file mode 100644
index 0000000000..5844b9821b
--- /dev/null
+++ b/ydb/apps/pgwire/main.cpp
@@ -0,0 +1,10 @@
+#include "pgwire.h"
+
+int main(int argc, char **argv) {
+ try {
+ return NPGW::TPgWire(argc, argv).Run();
+ } catch (const yexception& e) {
+ Cerr << "Caught exception: " << e.what() << Endl;
+ return 1;
+ }
+}
diff --git a/ydb/apps/pgwire/pg_ydb_connection.cpp b/ydb/apps/pgwire/pg_ydb_connection.cpp
new file mode 100644
index 0000000000..f4539bfd64
--- /dev/null
+++ b/ydb/apps/pgwire/pg_ydb_connection.cpp
@@ -0,0 +1,499 @@
+#include "pg_ydb_proxy.h"
+#include "pg_ydb_connection.h"
+#include "log_impl.h"
+#include <ydb/core/pgproxy/pg_proxy_events.h>
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+// temporarry borrowed from postgresql/src/backend/catalog/pg_type_d.h
+
+#define BOOLOID 16
+#define BYTEAOID 17
+#define CHAROID 18
+#define NAMEOID 19
+#define INT8OID 20
+#define INT2OID 21
+#define INT2VECTOROID 22
+#define INT4OID 23
+#define REGPROCOID 24
+#define TEXTOID 25
+
+//
+
+namespace NPGW {
+
+using namespace NActors;
+
+class TPgYdbConnection : public TActorBootstrapped<TPgYdbConnection> {
+ using TBase = TActorBootstrapped<TPgYdbConnection>;
+
+ struct TEvPrivate {
+ enum EEv {
+ EvParseComplete = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
+
+ struct TEvParseComplete : TEventLocal<TEvParseComplete, EvParseComplete> {
+ TString Name;
+ NYdb::NTable::TDataQuery DataQuery;
+
+ TEvParseComplete(const TString& name, NYdb::NTable::TDataQuery&& dataQuery)
+ : Name(name)
+ , DataQuery(std::move(dataQuery))
+ {}
+ };
+ };
+
+ struct TParsedStatement {
+ NPG::TPGParse::TQueryData QueryData;
+ NPG::TPGBind::TBindData BindData;
+ };
+
+ NYdb::TDriver Driver;
+ std::unordered_map<TString, TString> ConnectionParams;
+ std::unordered_map<TString, TParsedStatement> ParsedStatements;
+ TString CurrentStatement;
+ std::unique_ptr<NYdb::NScripting::TScriptingClient> ScriptingClient;
+ //std::unique_ptr<NYdb::NTable::TTableClient> TableClient;
+
+public:
+ TPgYdbConnection(NYdb::TDriver driver, std::unordered_map<TString, TString> params)
+ : Driver(std::move(driver))
+ , ConnectionParams(std::move(params))
+ {}
+
+ void Bootstrap() {
+ Become(&TPgYdbConnection::StateWork);
+ }
+
+ static TString ColumnPrimitiveValueToString(NYdb::TValueParser& valueParser) {
+ switch (valueParser.GetPrimitiveType()) {
+ case NYdb::EPrimitiveType::Bool:
+ return TStringBuilder() << valueParser.GetBool();
+ case NYdb::EPrimitiveType::Int8:
+ return TStringBuilder() << valueParser.GetInt8();
+ case NYdb::EPrimitiveType::Uint8:
+ return TStringBuilder() << valueParser.GetUint8();
+ case NYdb::EPrimitiveType::Int16:
+ return TStringBuilder() << valueParser.GetInt16();
+ case NYdb::EPrimitiveType::Uint16:
+ return TStringBuilder() << valueParser.GetUint16();
+ case NYdb::EPrimitiveType::Int32:
+ return TStringBuilder() << valueParser.GetInt32();
+ case NYdb::EPrimitiveType::Uint32:
+ return TStringBuilder() << valueParser.GetUint32();
+ case NYdb::EPrimitiveType::Int64:
+ return TStringBuilder() << valueParser.GetInt64();
+ case NYdb::EPrimitiveType::Uint64:
+ return TStringBuilder() << valueParser.GetUint64();
+ case NYdb::EPrimitiveType::Float:
+ return TStringBuilder() << valueParser.GetFloat();
+ case NYdb::EPrimitiveType::Double:
+ return TStringBuilder() << valueParser.GetDouble();
+ case NYdb::EPrimitiveType::Utf8:
+ return TStringBuilder() << valueParser.GetUtf8();
+ case NYdb::EPrimitiveType::Date:
+ return valueParser.GetDate().ToString();
+ case NYdb::EPrimitiveType::Datetime:
+ return valueParser.GetDatetime().ToString();
+ case NYdb::EPrimitiveType::Timestamp:
+ return valueParser.GetTimestamp().ToString();
+ case NYdb::EPrimitiveType::Interval:
+ return TStringBuilder() << valueParser.GetInterval();
+ case NYdb::EPrimitiveType::TzDate:
+ return valueParser.GetTzDate();
+ case NYdb::EPrimitiveType::TzDatetime:
+ return valueParser.GetTzDatetime();
+ case NYdb::EPrimitiveType::TzTimestamp:
+ return valueParser.GetTzTimestamp();
+ case NYdb::EPrimitiveType::String:
+ return Base64Encode(valueParser.GetString());
+ case NYdb::EPrimitiveType::Yson:
+ return valueParser.GetYson();
+ case NYdb::EPrimitiveType::Json:
+ return valueParser.GetJson();
+ case NYdb::EPrimitiveType::JsonDocument:
+ return valueParser.GetJsonDocument();
+ case NYdb::EPrimitiveType::DyNumber:
+ return valueParser.GetDyNumber();
+ case NYdb::EPrimitiveType::Uuid:
+ return {};
+ }
+ return {};
+ }
+
+ static TString ColumnValueToString(NYdb::TValueParser& valueParser) {
+ switch (valueParser.GetKind()) {
+ case NYdb::TTypeParser::ETypeKind::Primitive:
+ return ColumnPrimitiveValueToString(valueParser);
+ case NYdb::TTypeParser::ETypeKind::Optional: {
+ TString value;
+ valueParser.OpenOptional();
+ if (valueParser.IsNull()) {
+ value = "NULL";
+ } else {
+ value = ColumnValueToString(valueParser);
+ }
+ valueParser.CloseOptional();
+ return value;
+ }
+ case NYdb::TTypeParser::ETypeKind::Tuple: {
+ TString value;
+ valueParser.OpenTuple();
+ while (valueParser.TryNextElement()) {
+ if (!value.empty()) {
+ value += ',';
+ }
+ value += ColumnValueToString(valueParser);
+ }
+ valueParser.CloseTuple();
+ return value;
+ }
+ case NYdb::TTypeParser::ETypeKind::Pg: {
+ return valueParser.GetPg().Content_;
+ }
+ default:
+ return {};
+ }
+ }
+
+ static TString ToPgSyntax(TStringBuf query) {
+ return TStringBuilder() << "--!syntax_pg\n" << query;
+ }
+
+ NYdb::NTable::TClientSettings GetClientSettings() {
+ NYdb::NTable::TClientSettings clientSettings;
+ if (ConnectionParams.count("database")) {
+ clientSettings.Database(ConnectionParams["database"]);
+ }
+ if (ConnectionParams.count("auth-token")) {
+ clientSettings.AuthToken(ConnectionParams["auth-token"]);
+ }
+ return clientSettings;
+ }
+
+ void BootstrapScriptingClient() {
+ ScriptingClient = std::make_unique<NYdb::NScripting::TScriptingClient>(Driver, GetClientSettings());
+ }
+
+ void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) {
+ BLOG_D("TEvQuery " << ev->Sender);
+
+ if (!ScriptingClient) {
+ BootstrapScriptingClient();
+ }
+
+ TActorSystem* actorSystem = TActivationContext::ActorSystem();
+
+ ScriptingClient->ExecuteYqlScript(ToPgSyntax(ev->Get()->Message->GetQuery())).Subscribe([actorSystem, ev](const NYdb::NScripting::TAsyncExecuteYqlResult& feature) {
+ auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>();
+ try {
+ NYdb::NScripting::TExecuteYqlResult result = NYdb::NScripting::TAsyncExecuteYqlResult(feature).ExtractValue();
+ if (result.IsSuccess()) {
+ const TVector<NYdb::TResultSet>& resultSets = result.GetResultSets();
+ if (!resultSets.empty()) {
+ NYdb::TResultSet resultSet = resultSets[0];
+
+ {
+ for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) {
+ // TODO: fill data types and sizes
+ response->DataFields.push_back({
+ .Name = column.Name,
+ });
+ }
+ }
+
+ {
+ NYdb::TResultSetParser parser(std::move(resultSet));
+ while (parser.TryNextRow()) {
+ response->DataRows.emplace_back();
+ auto& row = response->DataRows.back();
+ row.resize(parser.ColumnsCount());
+ for (size_t index = 0; index < parser.ColumnsCount(); ++index) {
+ row[index] = ColumnValueToString(parser.ColumnParser(index));
+ }
+ }
+ }
+ }
+ } else {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', TStringBuilder() << (NYdb::TStatus&)result});
+ }
+ }
+ catch (const std::exception& e) {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', e.what()});
+ }
+
+ actorSystem->Send(ev->Sender, response.release(), 0, ev->Cookie);
+ });
+ }
+
+ /*void BootstrapTableClient() {
+ TableClient = std::make_unique<NYdb::NTable::TTableClient>(Driver, GetClientSettings());
+ }*/
+
+ void Handle(NPG::TEvPGEvents::TEvParse::TPtr& ev) {
+ BLOG_D("TEvParse " << ev->Sender);
+
+ /*if (!TableClient) {
+ BootstrapTableClient();
+ }
+
+ TActorSystem* actorSystem = TActivationContext::ActorSystem();
+ TActorId selfId = SelfId();
+
+ TableClient->RetryOperation([actorSystem, ev, selfId](NYdb::NTable::TSession session) {
+ auto queryData = ev->Get()->Message->GetQueryData();
+ const TString& query = queryData.Query;
+ return session.PrepareDataQuery(query).Apply([actorSystem, ev, selfId, queryData](NYdb::NTable::TAsyncPrepareQueryResult result) mutable -> NThreading::TFuture<NYdb::TStatus> {
+ NYdb::NTable::TPrepareQueryResult res = result.ExtractValue();
+
+ if (res.IsSuccess()) {
+ auto privateParseComplete = std::make_unique<TEvPrivate::TEvParseComplete>(queryData.Name, res.GetQuery());
+ actorSystem->Send(selfId, privateParseComplete.release());
+ auto parseComplete = std::make_unique<NPG::TEvPGEvents::TEvParseComplete>();
+ parseComplete->OriginalMessage = std::move(ev->Get()->Message);
+ actorSystem->Send(ev->Sender, parseComplete.release());
+ } else {
+ auto errorResponse = std::make_unique<NPG::TEvPGEvents::TEvErrorResponse>();
+ errorResponse->ErrorFields.push_back({'E', "ERROR"});
+ errorResponse->ErrorFields.push_back({'M', TStringBuilder() << (NYdb::TStatus&)res});
+ actorSystem->Send(ev->Sender, errorResponse.release());
+ }
+ return NThreading::MakeFuture<NYdb::TStatus>(res);
+ });
+ });*/
+
+ /*auto result = ParsedQueries.emplace(std::piecewise_construct, std::make_tuple(ev->Get()->Name), std::make_tuple(ev->Get()->DataQuery));
+ if (!result.second) {
+ result.first->second.DataQuery = std::move(ev->Get()->DataQuery);
+ }*/
+ auto queryData = ev->Get()->Message->GetQueryData();
+ ParsedStatements[queryData.Name].QueryData = queryData;
+
+ auto parseComplete = ev->Get()->Reply();
+ Send(ev->Sender, parseComplete.release());
+ }
+
+ void Handle(NPG::TEvPGEvents::TEvBind::TPtr& ev) {
+ auto bindData = ev->Get()->Message->GetBindData();
+ ParsedStatements[bindData.StatementName].BindData = bindData;
+ CurrentStatement = bindData.StatementName;
+ BLOG_D("TEvBind CurrentStatement changed to " << CurrentStatement);
+
+ auto bindComplete = ev->Get()->Reply();
+ Send(ev->Sender, bindComplete.release());
+ }
+
+ struct TConvertedQuery {
+ TString Query;
+ NYdb::TParams Params;
+ };
+
+ TConvertedQuery ConvertQuery(const TParsedStatement& statement) {
+ auto& bindData = statement.BindData;
+ const auto& queryData = statement.QueryData;
+ NYdb::TParamsBuilder paramsBuilder;
+ TStringBuilder injectedQuery;
+
+ for (size_t idxParam = 0; idxParam < queryData.ParametersTypes.size(); ++idxParam) {
+ int32_t paramType = queryData.ParametersTypes[idxParam];
+ TString paramValue;
+ if (idxParam < bindData.ParametersValue.size()) {
+ std::vector<uint8_t> paramVal = bindData.ParametersValue[idxParam];
+ paramValue = TString(reinterpret_cast<char*>(paramVal.data()), paramVal.size());
+ }
+ switch (paramType) {
+ case INT2OID:
+ paramsBuilder.AddParam(TStringBuilder() << "$_" << idxParam + 1).Int16(atoi(paramValue.data())).Build();
+ injectedQuery << "DECLARE $_" << idxParam + 1 << " AS Int16;" << Endl;
+ break;
+
+ }
+ }
+ return {
+ .Query = injectedQuery + queryData.Query,
+ .Params = paramsBuilder.Build(),
+ };
+ }
+
+ void Handle(NPG::TEvPGEvents::TEvDescribe::TPtr& ev) {
+ BLOG_D("TEvDescribe " << ev->Sender);
+
+ TString statementName = ev->Get()->Message->GetDescribeData().Name;
+ if (statementName.empty()) {
+ statementName = CurrentStatement;
+ BLOG_W("TEvExecute changed empty statement to " << CurrentStatement);
+ }
+ auto it = ParsedStatements.find(statementName);
+ if (it == ParsedStatements.end()) {
+ auto errorResponse = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>();
+ errorResponse->ErrorFields.push_back({'E', "ERROR"});
+ errorResponse->ErrorFields.push_back({'M', TStringBuilder() << "Parsed statement \"" << statementName << "\" not found"});
+ Send(ev->Sender, errorResponse.release(), 0, ev->Cookie);
+ return;
+ }
+
+ if (!ScriptingClient) {
+ BootstrapScriptingClient();
+ }
+
+ auto query = ConvertQuery(it->second);
+ TActorSystem* actorSystem = TActivationContext::ActorSystem();
+
+ // TODO: it's wrong. it should be done using explain to get all result meta. but it's not ready yet.
+
+ ScriptingClient->ExecuteYqlScript(query.Query, query.Params).Subscribe([actorSystem, ev](const NYdb::NScripting::TAsyncExecuteYqlResult& feature) {
+ auto response = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>();
+ try {
+ NYdb::NScripting::TExecuteYqlResult result = NYdb::NScripting::TAsyncExecuteYqlResult(feature).ExtractValue();
+ if (result.IsSuccess()) {
+ const TVector<NYdb::TResultSet>& resultSets = result.GetResultSets();
+ if (!resultSets.empty()) {
+ NYdb::TResultSet resultSet = resultSets[0];
+
+ {
+ for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) {
+ // TODO: fill data types and sizes
+ response->DataFields.push_back({
+ .Name = column.Name,
+ });
+ }
+ }
+ }
+ } else {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', TStringBuilder() << (NYdb::TStatus&)result});
+ }
+ }
+ catch (const std::exception& e) {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', e.what()});
+ }
+
+ actorSystem->Send(ev->Sender, response.release(), 0, ev->Cookie);
+ });
+
+ /*
+ TableClient->RetryOperation([actorSystem, sender, dataQuery](NYdb::NTable::TSession session) mutable {
+ session.
+ return dataQuery.Execute(NYdb::NTable::TTxControl::BeginTx()).Apply([actorSystem, sender](NYdb::NTable::TAsyncDataQueryResult result) mutable -> NThreading::TFuture<NYdb::TStatus> {
+ NYdb::NTable::TDataQueryResult res = result.ExtractValue();
+
+ if (res.IsSuccess()) {
+
+ const TVector<NYdb::TResultSet>& resultSets = res.GetResultSets();
+ if (!resultSets.empty()) {
+ NYdb::TResultSet resultSet = resultSets[0];
+
+ {
+ auto rowDescription = std::make_unique<NPG::TEvPGEvents::TEvRowDescription>();
+ for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) {
+ // TODO: fill data types and sizes
+ rowDescription->Fields.push_back({
+ .Name = column.Name,
+ });
+ }
+
+ actorSystem->Send(sender, rowDescription.release());
+ }
+ }
+ } else {
+ auto errorResponse = std::make_unique<NPG::TEvPGEvents::TEvErrorResponse>();
+ errorResponse->ErrorFields.push_back({'E', "ERROR"});
+ errorResponse->ErrorFields.push_back({'M', TStringBuilder() << (NYdb::TStatus&)res});
+ actorSystem->Send(sender, errorResponse.release());
+ }
+ return NThreading::MakeFuture<NYdb::TStatus>(res);
+ });
+ });*/
+ }
+
+ void Handle(NPG::TEvPGEvents::TEvExecute::TPtr& ev) {
+ BLOG_D("TEvExecute " << ev->Sender);
+
+ TString statementName = ev->Get()->Message->GetExecuteData().PortalName;
+ if (statementName.empty()) {
+ statementName = CurrentStatement;
+ BLOG_W("TEvExecute changed empty statement to " << CurrentStatement);
+ }
+ auto it = ParsedStatements.find(statementName);
+ if (it == ParsedStatements.end()) {
+ auto errorResponse = std::make_unique<NPG::TEvPGEvents::TEvExecuteResponse>();
+ errorResponse->ErrorFields.push_back({'E', "ERROR"});
+ errorResponse->ErrorFields.push_back({'M', TStringBuilder() << "Parsed statement \"" << statementName << "\" not found"});
+ Send(ev->Sender, errorResponse.release(), 0, ev->Cookie);
+ return;
+ }
+
+ if (!ScriptingClient) {
+ BootstrapScriptingClient();
+ }
+
+ TActorSystem* actorSystem = TActivationContext::ActorSystem();
+ auto query = ConvertQuery(it->second);
+
+ ScriptingClient->ExecuteYqlScript(query.Query, query.Params).Subscribe([actorSystem, ev](const NYdb::NScripting::TAsyncExecuteYqlResult& feature) mutable {
+ auto response = std::make_unique<NPG::TEvPGEvents::TEvExecuteResponse>();
+ try {
+ NYdb::NScripting::TExecuteYqlResult result = NYdb::NScripting::TAsyncExecuteYqlResult(feature).ExtractValue();
+ if (result.IsSuccess()) {
+ const TVector<NYdb::TResultSet>& resultSets = result.GetResultSets();
+ if (!resultSets.empty()) {
+ NYdb::TResultSet resultSet = resultSets[0];
+
+ {
+ auto maxRows = ev->Get()->Message->GetExecuteData().MaxRows;
+ NYdb::TResultSetParser parser(std::move(resultSet));
+ while (parser.TryNextRow()) {
+ response->DataRows.emplace_back();
+ auto& row = response->DataRows.back();
+ row.resize(parser.ColumnsCount());
+ for (size_t index = 0; index < parser.ColumnsCount(); ++index) {
+ row[index] = ColumnValueToString(parser.ColumnParser(index));
+ }
+ if (maxRows != 0) {
+ if (--maxRows == 0) {
+ break;
+ }
+ }
+ }
+ }
+ }
+ } else {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', TStringBuilder() << (NYdb::TStatus&)result});
+ }
+ }
+ catch (const std::exception& e) {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', e.what()});
+ }
+
+ actorSystem->Send(ev->Sender, response.release(), 0, ev->Cookie);
+ });
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NPG::TEvPGEvents::TEvQuery, Handle);
+ hFunc(NPG::TEvPGEvents::TEvParse, Handle);
+ hFunc(NPG::TEvPGEvents::TEvBind, Handle);
+ hFunc(NPG::TEvPGEvents::TEvDescribe, Handle);
+ hFunc(NPG::TEvPGEvents::TEvExecute, Handle);
+ cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
+ }
+ }
+};
+
+
+NActors::IActor* CreateConnection(NYdb::TDriver driver, std::unordered_map<TString, TString> params) {
+ return new TPgYdbConnection(std::move(driver), std::move(params));
+}
+
+}
diff --git a/ydb/apps/pgwire/pg_ydb_connection.h b/ydb/apps/pgwire/pg_ydb_connection.h
new file mode 100644
index 0000000000..5adf018ce5
--- /dev/null
+++ b/ydb/apps/pgwire/pg_ydb_connection.h
@@ -0,0 +1,10 @@
+#pragma once
+#include <library/cpp/actors/core/actor.h>
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+#include <ydb/public/sdk/cpp/client/ydb_common_client/settings.h>
+
+namespace NPGW {
+
+NActors::IActor* CreateConnection(NYdb::TDriver driver, std::unordered_map<TString, TString> params);
+
+}
diff --git a/ydb/apps/pgwire/pg_ydb_proxy.cpp b/ydb/apps/pgwire/pg_ydb_proxy.cpp
new file mode 100644
index 0000000000..836dab9ca7
--- /dev/null
+++ b/ydb/apps/pgwire/pg_ydb_proxy.cpp
@@ -0,0 +1,163 @@
+#include "pg_ydb_proxy.h"
+#include "pg_ydb_connection.h"
+#include "log_impl.h"
+#include <ydb/core/pgproxy/pg_proxy_events.h>
+#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h>
+#include <library/cpp/actors/core/actor.h>
+#define INCLUDE_YDB_INTERNAL_H
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h>
+
+namespace NPGW {
+
+using namespace NActors;
+
+class TPgYdbProxy : public TActor<TPgYdbProxy> {
+ struct TSecurityState {
+ TString Token;
+ };
+
+ using TBase = TActor<TPgYdbProxy>;
+ NYdb::TDriver Driver;
+ std::shared_ptr<NYdb::TGRpcConnectionsImpl> Connections;
+ std::unordered_map<TActorId, TActorId> PgToYdbConnection;
+ std::unordered_map<TActorId, TSecurityState> SecurityState;
+
+public:
+ TPgYdbProxy(const NYdb::TDriverConfig& driverConfig)
+ : TBase(&TPgYdbProxy::StateWork)
+ , Driver(driverConfig)
+ , Connections(CreateInternalInterface(Driver))
+ {
+ }
+
+ void Handle(NPG::TEvPGEvents::TEvAuth::TPtr& ev) {
+ std::unordered_map<TString, TString> clientParams = ev->Get()->InitialMessage->GetClientParams();
+ BLOG_D("TEvAuth " << ev->Get()->InitialMessage->Dump());
+ Ydb::Auth::LoginRequest request;
+ request.set_user(clientParams["user"]);
+ if (ev->Get()->PasswordMessage) {
+ request.set_password(TString(ev->Get()->PasswordMessage->GetPassword()));
+ }
+ TActorSystem* actorSystem = TActivationContext::ActorSystem();
+ TActorId sender = ev->Sender;
+
+ auto responseCb = [this, actorSystem, sender](Ydb::Auth::LoginResponse* resp, NYdb::TPlainStatus status) {
+ auto authResponse = std::make_unique<NPG::TEvPGEvents::TEvAuthResponse>();
+ if (status.Ok() && resp->operation().status() == Ydb::StatusIds::SUCCESS) {
+ Ydb::Auth::LoginResult result;
+ resp->operation().result().UnpackTo(&result);
+ this->SecurityState[sender].Token = result.token();
+ } else {
+ if (!status.Ok()) {
+ authResponse->Error = status.Issues.ToOneLineString();
+ }
+ if (resp != nullptr) {
+ if (resp->operation().issues_size() > 0) {
+ authResponse->Error = resp->operation().issues(0).message();
+ } else {
+ authResponse->Error = Ydb::StatusIds_StatusCode_Name(resp->operation().status());
+ }
+ } else {
+ if (status.Status == NYdb::EStatus::CLIENT_CALL_UNIMPLEMENTED) {
+ authResponse->Error = "The database doesn't support authentication";
+ } else {
+ authResponse->Error = "Authentication error";
+ }
+ }
+ }
+ actorSystem->Send(sender, authResponse.release());
+ };
+
+ TString database = clientParams["database"];
+ NYdb::TDbDriverStatePtr driverState = Connections->GetDriverState(database, {}, {}, {}, {});
+
+ NYdb::TRpcRequestSettings rpcSettings;
+ rpcSettings.ClientTimeout = TDuration::Seconds(60);
+
+ NYdb::TGRpcConnectionsImpl::RunOnDiscoveryEndpoint<Ydb::Auth::V1::AuthService, Ydb::Auth::LoginRequest, Ydb::Auth::LoginResponse>(
+ driverState,
+ std::move(request),
+ std::move(responseCb),
+ &Ydb::Auth::V1::AuthService::Stub::AsyncLogin,
+ rpcSettings);
+ }
+
+ void Handle(NPG::TEvPGEvents::TEvConnectionOpened::TPtr& ev) {
+ BLOG_D("TEvConnectionOpened " << ev->Sender);
+ auto params = ev->Get()->Message->GetClientParams();
+ auto itSecurityState = SecurityState.find(ev->Sender);
+ if (itSecurityState != SecurityState.end() && !itSecurityState->second.Token.empty()) {
+ params["auth-token"] = itSecurityState->second.Token;
+ }
+ IActor* actor = CreateConnection(Driver, std::move(params));
+ TActorId actorId = Register(actor);
+ PgToYdbConnection[ev->Sender] = actorId;
+ BLOG_D("Created ydb connection " << actorId);
+ }
+
+ void Handle(NPG::TEvPGEvents::TEvConnectionClosed::TPtr& ev) {
+ BLOG_D("TEvConnectionClosed " << ev->Sender);
+ auto itConnection = PgToYdbConnection.find(ev->Sender);
+ if (itConnection != PgToYdbConnection.end()) {
+ Send(itConnection->second, new TEvents::TEvPoisonPill());
+ BLOG_D("Destroyed ydb connection " << itConnection->second);
+ }
+ SecurityState.erase(ev->Sender);
+ }
+
+ void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) {
+ auto itConnection = PgToYdbConnection.find(ev->Sender);
+ if (itConnection != PgToYdbConnection.end()) {
+ Forward(ev, itConnection->second);
+ }
+ }
+
+ void Handle(NPG::TEvPGEvents::TEvParse::TPtr& ev) {
+ auto itConnection = PgToYdbConnection.find(ev->Sender);
+ if (itConnection != PgToYdbConnection.end()) {
+ Forward(ev, itConnection->second);
+ }
+ }
+
+ void Handle(NPG::TEvPGEvents::TEvBind::TPtr& ev) {
+ auto itConnection = PgToYdbConnection.find(ev->Sender);
+ if (itConnection != PgToYdbConnection.end()) {
+ Forward(ev, itConnection->second);
+ }
+ }
+
+ void Handle(NPG::TEvPGEvents::TEvDescribe::TPtr& ev) {
+ auto itConnection = PgToYdbConnection.find(ev->Sender);
+ if (itConnection != PgToYdbConnection.end()) {
+ Forward(ev, itConnection->second);
+ }
+ }
+
+ void Handle(NPG::TEvPGEvents::TEvExecute::TPtr& ev) {
+ auto itConnection = PgToYdbConnection.find(ev->Sender);
+ if (itConnection != PgToYdbConnection.end()) {
+ Forward(ev, itConnection->second);
+ BLOG_D("Forwarded to ydb connection " << itConnection->second);
+ }
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NPG::TEvPGEvents::TEvAuth, Handle);
+ hFunc(NPG::TEvPGEvents::TEvConnectionOpened, Handle);
+ hFunc(NPG::TEvPGEvents::TEvConnectionClosed, Handle);
+ hFunc(NPG::TEvPGEvents::TEvQuery, Handle);
+ hFunc(NPG::TEvPGEvents::TEvParse, Handle);
+ hFunc(NPG::TEvPGEvents::TEvBind, Handle);
+ hFunc(NPG::TEvPGEvents::TEvDescribe, Handle);
+ hFunc(NPG::TEvPGEvents::TEvExecute, Handle);
+ }
+ }
+};
+
+
+NActors::IActor* CreateDatabaseProxy(const NYdb::TDriverConfig& driverConfig) {
+ return new TPgYdbProxy(driverConfig);
+}
+
+}
diff --git a/ydb/apps/pgwire/pg_ydb_proxy.h b/ydb/apps/pgwire/pg_ydb_proxy.h
new file mode 100644
index 0000000000..985013cd8e
--- /dev/null
+++ b/ydb/apps/pgwire/pg_ydb_proxy.h
@@ -0,0 +1,9 @@
+#pragma once
+#include <library/cpp/actors/core/actor.h>
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+
+namespace NPGW {
+
+NActors::IActor* CreateDatabaseProxy(const NYdb::TDriverConfig& driverConfig);
+
+}
diff --git a/ydb/apps/pgwire/pgwire.cpp b/ydb/apps/pgwire/pgwire.cpp
new file mode 100644
index 0000000000..2ce2b4adca
--- /dev/null
+++ b/ydb/apps/pgwire/pgwire.cpp
@@ -0,0 +1,148 @@
+#include <ydb/core/pgproxy/pg_listener.h>
+#include <ydb/core/pgproxy/pg_log.h>
+#include <util/system/mlock.h>
+#include <util/stream/file.h>
+#include <library/cpp/getopt/last_getopt.h>
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/actors/core/executor_pool_basic.h>
+#include <library/cpp/actors/core/scheduler_basic.h>
+#include <library/cpp/actors/core/process_stats.h>
+#include <library/cpp/actors/protos/services_common.pb.h>
+#include <library/cpp/actors/interconnect/poller_actor.h>
+#include "pgwire.h"
+#include "log_impl.h"
+#include "pg_ydb_proxy.h"
+
+namespace NPGW {
+
+std::atomic<bool> TPgWire::Quit;
+
+void TPgWire::OnTerminate(int) {
+ Quit = true;
+}
+
+int TPgWire::Init() {
+ ActorSystem->Start();
+
+ ActorSystem->Register(NActors::CreateProcStatCollector(TDuration::Seconds(5), AppData.MetricRegistry));
+
+ Poller = ActorSystem->Register(NActors::CreatePollerActor());
+
+ NYdb::TDriverConfig driverConfig;
+
+ driverConfig.SetEndpoint(Endpoint);
+
+ DatabaseProxy = ActorSystem->Register(CreateDatabaseProxy(driverConfig));
+ Listener = ActorSystem->Register(NPG::CreatePGListener(Poller, DatabaseProxy, {
+ .Port = ListeningPort,
+ .SslCertificatePem = SslCertificate,
+ }));
+
+ return 0;
+}
+
+int TPgWire::Run() {
+ try {
+ int res = Init();
+ if (res != 0) {
+ return res;
+ }
+#ifndef NDEBUG
+ Cout << "Started" << Endl;
+#endif
+ while (!Quit) {
+ Sleep(TDuration::MilliSeconds(100));
+ }
+#ifndef NDEBUG
+ Cout << Endl << "Finished" << Endl;
+#endif
+ Shutdown();
+ }
+ catch (const yexception& e) {
+ Cerr << e.what() << Endl;
+ return 1;
+ }
+ return 0;
+}
+
+int TPgWire::Shutdown() {
+ ActorSystem->Stop();
+ return 0;
+}
+
+TPgWire::TPgWire(int argc, char** argv) {
+ NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default();
+ bool useStdErr = false;
+ bool mlock = false;
+ TString sslCertificateFile;
+ opts.AddLongOption("endpoint", "YDB endpoint to connect").Required().RequiredArgument("ENDPOINT").StoreResult(&Endpoint);
+ opts.AddLongOption("port", "Listening port").Optional().DefaultValue("5432").RequiredArgument("PORT").StoreResult(&ListeningPort);
+ opts.AddLongOption("stderr", "Redirect log to stderr").NoArgument().SetFlag(&useStdErr);
+ opts.AddLongOption("mlock", "Lock resident memory").NoArgument().SetFlag(&mlock);
+ opts.AddLongOption("ssl-cert-file", "Path to file with SSL certificate including private key").RequiredArgument("PATH").StoreResult(&sslCertificateFile);
+
+ NLastGetopt::TOptsParseResult res(&opts, argc, argv);
+
+ if (mlock) {
+ LockAllMemory(LockCurrentMemory);
+ }
+
+ THolder<NActors::TActorSystemSetup> actorSystemSetup = BuildActorSystemSetup();
+
+ TIntrusivePtr<NActors::NLog::TSettings> loggerSettings = BuildLoggerSettings();
+ NActors::TLoggerActor* loggerActor = new NActors::TLoggerActor(
+ loggerSettings,
+ useStdErr ? NActors::CreateStderrBackend() : NActors::CreateSysLogBackend("pgwire", false, true),
+ NMonitoring::TMetricRegistry::SharedInstance());
+ actorSystemSetup->LocalServices.emplace_back(loggerSettings->LoggerActorId, NActors::TActorSetupCmd(loggerActor, NActors::TMailboxType::HTSwap, 0));
+
+ ActorSystem = std::make_unique<NActors::TActorSystem>(actorSystemSetup, &AppData, loggerSettings);
+ AppData.MetricRegistry = NMonitoring::TMetricRegistry::SharedInstance();
+
+ if (!sslCertificateFile.empty()) {
+ TString sslCertificate = TUnbufferedFileInput(sslCertificateFile).ReadAll();
+ if (!sslCertificate.empty()) {
+ SslCertificate = sslCertificate;
+ } else {
+ ythrow yexception() << "Invalid SSL certificate file";
+ }
+ }
+}
+
+TIntrusivePtr<NActors::NLog::TSettings> TPgWire::BuildLoggerSettings() {
+ const NActors::TActorId loggerActorId = NActors::TActorId(1, "logger");
+ TIntrusivePtr<NActors::NLog::TSettings> loggerSettings = new NActors::NLog::TSettings(loggerActorId, NPGW::EService::Logger, NActors::NLog::PRI_WARN);
+ loggerSettings->Append(
+ NActorsServices::EServiceCommon_MIN,
+ NActorsServices::EServiceCommon_MAX,
+ NActorsServices::EServiceCommon_Name
+ );
+ loggerSettings->Append(
+ NPG::EService::MIN,
+ NPG::EService::MAX,
+ NPG::GetEServiceName
+ );
+ loggerSettings->Append(
+ NPGW::EService::MIN,
+ NPGW::EService::MAX,
+ NPGW::GetEServiceName
+ );
+ TString explanation;
+ loggerSettings->SetLevel(NActors::NLog::PRI_DEBUG, NPG::EService::PGWIRE, explanation);
+ loggerSettings->SetLevel(NActors::NLog::PRI_DEBUG, NPGW::EService::PGYDB, explanation);
+ return loggerSettings;
+}
+
+THolder<NActors::TActorSystemSetup> TPgWire::BuildActorSystemSetup() {
+ THolder<NActors::TActorSystemSetup> setup = MakeHolder<NActors::TActorSystemSetup>();
+ setup->NodeId = 1;
+ setup->Executors.Reset(new TAutoPtr<NActors::IExecutorPool>[1]);
+ setup->ExecutorsCount = 1;
+ setup->Executors[0] = new NActors::TBasicExecutorPool(0, 4, 10);
+
+ setup->Scheduler = new NActors::TBasicSchedulerThread(NActors::TSchedulerConfig(512, 100));
+
+ return setup;
+}
+
+}
diff --git a/ydb/apps/pgwire/pgwire.h b/ydb/apps/pgwire/pgwire.h
new file mode 100644
index 0000000000..62e7fa9da6
--- /dev/null
+++ b/ydb/apps/pgwire/pgwire.h
@@ -0,0 +1,36 @@
+#pragma once
+#include <atomic>
+#include <library/cpp/actors/core/actorsystem.h>
+#include "signals.h"
+#include "appdata.h"
+
+namespace NPGW {
+
+class TPgWire {
+public:
+ static std::atomic<bool> Quit;
+ static void OnTerminate(int);
+ NSignals::TSignalHandler<SIGINT, &TPgWire::OnTerminate> SignalSIGINT;
+ NSignals::TSignalHandler<SIGTERM, &TPgWire::OnTerminate> SignalSIGTERM;
+ NSignals::TSignalIgnore<SIGPIPE> SignalSIGPIPE;
+
+ TPgWire(int argc, char** argv);
+ int Init();
+ int Run();
+ int Shutdown();
+ static THolder<NActors::TActorSystemSetup> BuildActorSystemSetup();
+ static TIntrusivePtr<NActors::NLog::TSettings> BuildLoggerSettings();
+ static const TString& GetEServiceName(NActors::NLog::EComponent component);
+
+ TAppData AppData;
+ std::unique_ptr<NActors::TActorSystem> ActorSystem;
+ NActors::TActorId Poller;
+ NActors::TActorId Listener;
+ NActors::TActorId DatabaseProxy;
+ // arguments
+ TString Endpoint;
+ uint16_t ListeningPort = 5432;
+ TString SslCertificate;
+};
+
+}
diff --git a/ydb/apps/pgwire/signals.h b/ydb/apps/pgwire/signals.h
new file mode 100644
index 0000000000..359dc02250
--- /dev/null
+++ b/ydb/apps/pgwire/signals.h
@@ -0,0 +1,35 @@
+#pragma once
+
+#include <signal.h>
+
+namespace NSignals {
+
+using sighandler_t = void (*)(int);
+
+template <int signum, sighandler_t handler>
+struct TSignalHandler {
+ sighandler_t SavedHandler = nullptr;
+
+ TSignalHandler()
+ : SavedHandler(signal(signum, handler))
+ {}
+
+ ~TSignalHandler() {
+ signal(signum, SavedHandler);
+ }
+};
+
+template <int signum>
+struct TSignalIgnore {
+ sighandler_t SavedHandler = nullptr;
+
+ TSignalIgnore()
+ : SavedHandler(signal(signum, SIG_IGN))
+ {}
+
+ ~TSignalIgnore() {
+ signal(signum, SavedHandler);
+ }
+};
+
+}
diff --git a/ydb/core/pgproxy/pg_proxy_types.h b/ydb/core/pgproxy/pg_proxy_types.h
index 00ac72724a..be4f876f43 100644
--- a/ydb/core/pgproxy/pg_proxy_types.h
+++ b/ydb/core/pgproxy/pg_proxy_types.h
@@ -5,8 +5,11 @@
#include <vector>
#include <util/stream/format.h>
#include <util/string/builder.h>
-#include <arpa/inet.h>
-
+#if defined (_win_)
+ #include <winsock2.h>
+#elif defined (_unix_)
+ #include <arpa/inet.h>
+#endif
namespace NPG {
#pragma pack(push, 1)