diff options
author | qrort <qrort@yandex-team.com> | 2023-04-05 09:00:08 +0300 |
---|---|---|
committer | qrort <qrort@yandex-team.com> | 2023-04-05 09:00:08 +0300 |
commit | 8de2bc90eb9833b3cb38e8b6625e7b676073e8eb (patch) | |
tree | 84516a1a83e2efaf1b022a97917a31981b1d1568 | |
parent | a6800d9e4bb42789f36e52866d29ddf0b97f8a71 (diff) | |
download | ydb-8de2bc90eb9833b3cb38e8b6625e7b676073e8eb.tar.gz |
move pgwire to ydb/apps
-rw-r--r-- | ydb/apps/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt | 35 | ||||
-rw-r--r-- | ydb/apps/pgwire/CMakeLists.linux-aarch64.txt | 38 | ||||
-rw-r--r-- | ydb/apps/pgwire/CMakeLists.linux-x86_64.txt | 40 | ||||
-rw-r--r-- | ydb/apps/pgwire/CMakeLists.txt | 17 | ||||
-rw-r--r-- | ydb/apps/pgwire/CMakeLists.windows-x86_64.txt | 28 | ||||
-rw-r--r-- | ydb/apps/pgwire/appdata.h | 17 | ||||
-rw-r--r-- | ydb/apps/pgwire/log_impl.h | 37 | ||||
-rw-r--r-- | ydb/apps/pgwire/main.cpp | 10 | ||||
-rw-r--r-- | ydb/apps/pgwire/pg_ydb_connection.cpp | 499 | ||||
-rw-r--r-- | ydb/apps/pgwire/pg_ydb_connection.h | 10 | ||||
-rw-r--r-- | ydb/apps/pgwire/pg_ydb_proxy.cpp | 163 | ||||
-rw-r--r-- | ydb/apps/pgwire/pg_ydb_proxy.h | 9 | ||||
-rw-r--r-- | ydb/apps/pgwire/pgwire.cpp | 148 | ||||
-rw-r--r-- | ydb/apps/pgwire/pgwire.h | 36 | ||||
-rw-r--r-- | ydb/apps/pgwire/signals.h | 35 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_proxy_types.h | 7 |
17 files changed, 1128 insertions, 2 deletions
diff --git a/ydb/apps/CMakeLists.txt b/ydb/apps/CMakeLists.txt index dee372cfa2..e1c6d2181e 100644 --- a/ydb/apps/CMakeLists.txt +++ b/ydb/apps/CMakeLists.txt @@ -6,5 +6,6 @@ # original buildsystem will not be accepted. +add_subdirectory(pgwire) add_subdirectory(ydb) add_subdirectory(ydbd) diff --git a/ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt b/ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..9460b9535e --- /dev/null +++ b/ydb/apps/pgwire/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,35 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(pgwire) +target_link_libraries(pgwire PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-system + library-cpp-cpuid_check + ydb-core-base + ydb-core-pgproxy + api-grpc + cpp-client-ydb_driver + cpp-client-draft +) +target_link_options(pgwire PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(pgwire PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/main.cpp + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_connection.cpp + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_proxy.cpp + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pgwire.cpp +) +vcs_info(pgwire) diff --git a/ydb/apps/pgwire/CMakeLists.linux-aarch64.txt b/ydb/apps/pgwire/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..92f7ae169c --- /dev/null +++ b/ydb/apps/pgwire/CMakeLists.linux-aarch64.txt @@ -0,0 +1,38 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(pgwire) +target_link_libraries(pgwire PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-malloc-jemalloc + ydb-core-base + ydb-core-pgproxy + api-grpc + cpp-client-ydb_driver + cpp-client-draft +) +target_link_options(pgwire PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(pgwire PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/main.cpp + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_connection.cpp + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_proxy.cpp + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pgwire.cpp +) +vcs_info(pgwire) diff --git a/ydb/apps/pgwire/CMakeLists.linux-x86_64.txt b/ydb/apps/pgwire/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..774015fe0b --- /dev/null +++ b/ydb/apps/pgwire/CMakeLists.linux-x86_64.txt @@ -0,0 +1,40 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(pgwire) +target_link_libraries(pgwire PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + ydb-core-base + ydb-core-pgproxy + api-grpc + cpp-client-ydb_driver + cpp-client-draft +) +target_link_options(pgwire PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(pgwire PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/main.cpp + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_connection.cpp + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_proxy.cpp + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pgwire.cpp +) +vcs_info(pgwire) diff --git a/ydb/apps/pgwire/CMakeLists.txt b/ydb/apps/pgwire/CMakeLists.txt new file mode 100644 index 0000000000..a692f90f36 --- /dev/null +++ b/ydb/apps/pgwire/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64") + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/apps/pgwire/CMakeLists.windows-x86_64.txt b/ydb/apps/pgwire/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..cb5f739061 --- /dev/null +++ b/ydb/apps/pgwire/CMakeLists.windows-x86_64.txt @@ -0,0 +1,28 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(pgwire) +target_link_libraries(pgwire PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-system + library-cpp-cpuid_check + ydb-core-base + ydb-core-pgproxy + api-grpc + cpp-client-ydb_driver + cpp-client-draft +) +target_sources(pgwire PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/main.cpp + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_connection.cpp + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pg_ydb_proxy.cpp + ${CMAKE_SOURCE_DIR}/ydb/apps/pgwire/pgwire.cpp +) +vcs_info(pgwire) diff --git a/ydb/apps/pgwire/appdata.h b/ydb/apps/pgwire/appdata.h new file mode 100644 index 0000000000..bdff4c9ff4 --- /dev/null +++ b/ydb/apps/pgwire/appdata.h @@ -0,0 +1,17 @@ +#pragma once +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/executor_thread.h> +#include <library/cpp/monlib/metrics/metric_registry.h> + +namespace NPGW { + +struct TAppData { + std::shared_ptr<NMonitoring::TMetricRegistry> MetricRegistry; +}; + +inline TAppData* AppData() { + return NActors::TlsActivationContext->ExecutorThread.ActorSystem->template AppData<TAppData>(); +} + +} diff --git a/ydb/apps/pgwire/log_impl.h b/ydb/apps/pgwire/log_impl.h new file mode 100644 index 0000000000..2820335501 --- /dev/null +++ b/ydb/apps/pgwire/log_impl.h @@ -0,0 +1,37 @@ +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/log.h> + +namespace NPGW { + +enum EService : NActors::NLog::EComponent { + MIN = 1100, + Logger, + PGYDB, + MAX +}; + +inline const TString& GetEServiceName(NActors::NLog::EComponent component) { + static const TString loggerName("LOGGER"); + static const TString pgYdbName("PG-YDB"); + static const TString unknownName("UNKNOWN"); + switch (component) { + case EService::Logger: + return loggerName; + case EService::PGYDB: + return pgYdbName; + default: + return unknownName; + } +} + + +#define BLOG_D(stream) LOG_DEBUG_S(*NActors::TlsActivationContext, EService::PGYDB, stream) +#define BLOG_I(stream) LOG_INFO_S(*NActors::TlsActivationContext, EService::PGYDB, stream) +#define BLOG_W(stream) LOG_WARN_S(*NActors::TlsActivationContext, EService::PGYDB, stream) +#define BLOG_NOTICE(stream) LOG_NOTICE_S(*NActors::TlsActivationContext, EService::PGYDB, stream) +#define BLOG_ERROR(stream) LOG_ERROR_S(*NActors::TlsActivationContext, EService::PGYDB, stream) + + +} diff --git a/ydb/apps/pgwire/main.cpp b/ydb/apps/pgwire/main.cpp new file mode 100644 index 0000000000..5844b9821b --- /dev/null +++ b/ydb/apps/pgwire/main.cpp @@ -0,0 +1,10 @@ +#include "pgwire.h" + +int main(int argc, char **argv) { + try { + return NPGW::TPgWire(argc, argv).Run(); + } catch (const yexception& e) { + Cerr << "Caught exception: " << e.what() << Endl; + return 1; + } +} diff --git a/ydb/apps/pgwire/pg_ydb_connection.cpp b/ydb/apps/pgwire/pg_ydb_connection.cpp new file mode 100644 index 0000000000..f4539bfd64 --- /dev/null +++ b/ydb/apps/pgwire/pg_ydb_connection.cpp @@ -0,0 +1,499 @@ +#include "pg_ydb_proxy.h" +#include "pg_ydb_connection.h" +#include "log_impl.h" +#include <ydb/core/pgproxy/pg_proxy_events.h> +#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> + +// temporarry borrowed from postgresql/src/backend/catalog/pg_type_d.h + +#define BOOLOID 16 +#define BYTEAOID 17 +#define CHAROID 18 +#define NAMEOID 19 +#define INT8OID 20 +#define INT2OID 21 +#define INT2VECTOROID 22 +#define INT4OID 23 +#define REGPROCOID 24 +#define TEXTOID 25 + +// + +namespace NPGW { + +using namespace NActors; + +class TPgYdbConnection : public TActorBootstrapped<TPgYdbConnection> { + using TBase = TActorBootstrapped<TPgYdbConnection>; + + struct TEvPrivate { + enum EEv { + EvParseComplete = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + + struct TEvParseComplete : TEventLocal<TEvParseComplete, EvParseComplete> { + TString Name; + NYdb::NTable::TDataQuery DataQuery; + + TEvParseComplete(const TString& name, NYdb::NTable::TDataQuery&& dataQuery) + : Name(name) + , DataQuery(std::move(dataQuery)) + {} + }; + }; + + struct TParsedStatement { + NPG::TPGParse::TQueryData QueryData; + NPG::TPGBind::TBindData BindData; + }; + + NYdb::TDriver Driver; + std::unordered_map<TString, TString> ConnectionParams; + std::unordered_map<TString, TParsedStatement> ParsedStatements; + TString CurrentStatement; + std::unique_ptr<NYdb::NScripting::TScriptingClient> ScriptingClient; + //std::unique_ptr<NYdb::NTable::TTableClient> TableClient; + +public: + TPgYdbConnection(NYdb::TDriver driver, std::unordered_map<TString, TString> params) + : Driver(std::move(driver)) + , ConnectionParams(std::move(params)) + {} + + void Bootstrap() { + Become(&TPgYdbConnection::StateWork); + } + + static TString ColumnPrimitiveValueToString(NYdb::TValueParser& valueParser) { + switch (valueParser.GetPrimitiveType()) { + case NYdb::EPrimitiveType::Bool: + return TStringBuilder() << valueParser.GetBool(); + case NYdb::EPrimitiveType::Int8: + return TStringBuilder() << valueParser.GetInt8(); + case NYdb::EPrimitiveType::Uint8: + return TStringBuilder() << valueParser.GetUint8(); + case NYdb::EPrimitiveType::Int16: + return TStringBuilder() << valueParser.GetInt16(); + case NYdb::EPrimitiveType::Uint16: + return TStringBuilder() << valueParser.GetUint16(); + case NYdb::EPrimitiveType::Int32: + return TStringBuilder() << valueParser.GetInt32(); + case NYdb::EPrimitiveType::Uint32: + return TStringBuilder() << valueParser.GetUint32(); + case NYdb::EPrimitiveType::Int64: + return TStringBuilder() << valueParser.GetInt64(); + case NYdb::EPrimitiveType::Uint64: + return TStringBuilder() << valueParser.GetUint64(); + case NYdb::EPrimitiveType::Float: + return TStringBuilder() << valueParser.GetFloat(); + case NYdb::EPrimitiveType::Double: + return TStringBuilder() << valueParser.GetDouble(); + case NYdb::EPrimitiveType::Utf8: + return TStringBuilder() << valueParser.GetUtf8(); + case NYdb::EPrimitiveType::Date: + return valueParser.GetDate().ToString(); + case NYdb::EPrimitiveType::Datetime: + return valueParser.GetDatetime().ToString(); + case NYdb::EPrimitiveType::Timestamp: + return valueParser.GetTimestamp().ToString(); + case NYdb::EPrimitiveType::Interval: + return TStringBuilder() << valueParser.GetInterval(); + case NYdb::EPrimitiveType::TzDate: + return valueParser.GetTzDate(); + case NYdb::EPrimitiveType::TzDatetime: + return valueParser.GetTzDatetime(); + case NYdb::EPrimitiveType::TzTimestamp: + return valueParser.GetTzTimestamp(); + case NYdb::EPrimitiveType::String: + return Base64Encode(valueParser.GetString()); + case NYdb::EPrimitiveType::Yson: + return valueParser.GetYson(); + case NYdb::EPrimitiveType::Json: + return valueParser.GetJson(); + case NYdb::EPrimitiveType::JsonDocument: + return valueParser.GetJsonDocument(); + case NYdb::EPrimitiveType::DyNumber: + return valueParser.GetDyNumber(); + case NYdb::EPrimitiveType::Uuid: + return {}; + } + return {}; + } + + static TString ColumnValueToString(NYdb::TValueParser& valueParser) { + switch (valueParser.GetKind()) { + case NYdb::TTypeParser::ETypeKind::Primitive: + return ColumnPrimitiveValueToString(valueParser); + case NYdb::TTypeParser::ETypeKind::Optional: { + TString value; + valueParser.OpenOptional(); + if (valueParser.IsNull()) { + value = "NULL"; + } else { + value = ColumnValueToString(valueParser); + } + valueParser.CloseOptional(); + return value; + } + case NYdb::TTypeParser::ETypeKind::Tuple: { + TString value; + valueParser.OpenTuple(); + while (valueParser.TryNextElement()) { + if (!value.empty()) { + value += ','; + } + value += ColumnValueToString(valueParser); + } + valueParser.CloseTuple(); + return value; + } + case NYdb::TTypeParser::ETypeKind::Pg: { + return valueParser.GetPg().Content_; + } + default: + return {}; + } + } + + static TString ToPgSyntax(TStringBuf query) { + return TStringBuilder() << "--!syntax_pg\n" << query; + } + + NYdb::NTable::TClientSettings GetClientSettings() { + NYdb::NTable::TClientSettings clientSettings; + if (ConnectionParams.count("database")) { + clientSettings.Database(ConnectionParams["database"]); + } + if (ConnectionParams.count("auth-token")) { + clientSettings.AuthToken(ConnectionParams["auth-token"]); + } + return clientSettings; + } + + void BootstrapScriptingClient() { + ScriptingClient = std::make_unique<NYdb::NScripting::TScriptingClient>(Driver, GetClientSettings()); + } + + void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) { + BLOG_D("TEvQuery " << ev->Sender); + + if (!ScriptingClient) { + BootstrapScriptingClient(); + } + + TActorSystem* actorSystem = TActivationContext::ActorSystem(); + + ScriptingClient->ExecuteYqlScript(ToPgSyntax(ev->Get()->Message->GetQuery())).Subscribe([actorSystem, ev](const NYdb::NScripting::TAsyncExecuteYqlResult& feature) { + auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>(); + try { + NYdb::NScripting::TExecuteYqlResult result = NYdb::NScripting::TAsyncExecuteYqlResult(feature).ExtractValue(); + if (result.IsSuccess()) { + const TVector<NYdb::TResultSet>& resultSets = result.GetResultSets(); + if (!resultSets.empty()) { + NYdb::TResultSet resultSet = resultSets[0]; + + { + for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) { + // TODO: fill data types and sizes + response->DataFields.push_back({ + .Name = column.Name, + }); + } + } + + { + NYdb::TResultSetParser parser(std::move(resultSet)); + while (parser.TryNextRow()) { + response->DataRows.emplace_back(); + auto& row = response->DataRows.back(); + row.resize(parser.ColumnsCount()); + for (size_t index = 0; index < parser.ColumnsCount(); ++index) { + row[index] = ColumnValueToString(parser.ColumnParser(index)); + } + } + } + } + } else { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', TStringBuilder() << (NYdb::TStatus&)result}); + } + } + catch (const std::exception& e) { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', e.what()}); + } + + actorSystem->Send(ev->Sender, response.release(), 0, ev->Cookie); + }); + } + + /*void BootstrapTableClient() { + TableClient = std::make_unique<NYdb::NTable::TTableClient>(Driver, GetClientSettings()); + }*/ + + void Handle(NPG::TEvPGEvents::TEvParse::TPtr& ev) { + BLOG_D("TEvParse " << ev->Sender); + + /*if (!TableClient) { + BootstrapTableClient(); + } + + TActorSystem* actorSystem = TActivationContext::ActorSystem(); + TActorId selfId = SelfId(); + + TableClient->RetryOperation([actorSystem, ev, selfId](NYdb::NTable::TSession session) { + auto queryData = ev->Get()->Message->GetQueryData(); + const TString& query = queryData.Query; + return session.PrepareDataQuery(query).Apply([actorSystem, ev, selfId, queryData](NYdb::NTable::TAsyncPrepareQueryResult result) mutable -> NThreading::TFuture<NYdb::TStatus> { + NYdb::NTable::TPrepareQueryResult res = result.ExtractValue(); + + if (res.IsSuccess()) { + auto privateParseComplete = std::make_unique<TEvPrivate::TEvParseComplete>(queryData.Name, res.GetQuery()); + actorSystem->Send(selfId, privateParseComplete.release()); + auto parseComplete = std::make_unique<NPG::TEvPGEvents::TEvParseComplete>(); + parseComplete->OriginalMessage = std::move(ev->Get()->Message); + actorSystem->Send(ev->Sender, parseComplete.release()); + } else { + auto errorResponse = std::make_unique<NPG::TEvPGEvents::TEvErrorResponse>(); + errorResponse->ErrorFields.push_back({'E', "ERROR"}); + errorResponse->ErrorFields.push_back({'M', TStringBuilder() << (NYdb::TStatus&)res}); + actorSystem->Send(ev->Sender, errorResponse.release()); + } + return NThreading::MakeFuture<NYdb::TStatus>(res); + }); + });*/ + + /*auto result = ParsedQueries.emplace(std::piecewise_construct, std::make_tuple(ev->Get()->Name), std::make_tuple(ev->Get()->DataQuery)); + if (!result.second) { + result.first->second.DataQuery = std::move(ev->Get()->DataQuery); + }*/ + auto queryData = ev->Get()->Message->GetQueryData(); + ParsedStatements[queryData.Name].QueryData = queryData; + + auto parseComplete = ev->Get()->Reply(); + Send(ev->Sender, parseComplete.release()); + } + + void Handle(NPG::TEvPGEvents::TEvBind::TPtr& ev) { + auto bindData = ev->Get()->Message->GetBindData(); + ParsedStatements[bindData.StatementName].BindData = bindData; + CurrentStatement = bindData.StatementName; + BLOG_D("TEvBind CurrentStatement changed to " << CurrentStatement); + + auto bindComplete = ev->Get()->Reply(); + Send(ev->Sender, bindComplete.release()); + } + + struct TConvertedQuery { + TString Query; + NYdb::TParams Params; + }; + + TConvertedQuery ConvertQuery(const TParsedStatement& statement) { + auto& bindData = statement.BindData; + const auto& queryData = statement.QueryData; + NYdb::TParamsBuilder paramsBuilder; + TStringBuilder injectedQuery; + + for (size_t idxParam = 0; idxParam < queryData.ParametersTypes.size(); ++idxParam) { + int32_t paramType = queryData.ParametersTypes[idxParam]; + TString paramValue; + if (idxParam < bindData.ParametersValue.size()) { + std::vector<uint8_t> paramVal = bindData.ParametersValue[idxParam]; + paramValue = TString(reinterpret_cast<char*>(paramVal.data()), paramVal.size()); + } + switch (paramType) { + case INT2OID: + paramsBuilder.AddParam(TStringBuilder() << "$_" << idxParam + 1).Int16(atoi(paramValue.data())).Build(); + injectedQuery << "DECLARE $_" << idxParam + 1 << " AS Int16;" << Endl; + break; + + } + } + return { + .Query = injectedQuery + queryData.Query, + .Params = paramsBuilder.Build(), + }; + } + + void Handle(NPG::TEvPGEvents::TEvDescribe::TPtr& ev) { + BLOG_D("TEvDescribe " << ev->Sender); + + TString statementName = ev->Get()->Message->GetDescribeData().Name; + if (statementName.empty()) { + statementName = CurrentStatement; + BLOG_W("TEvExecute changed empty statement to " << CurrentStatement); + } + auto it = ParsedStatements.find(statementName); + if (it == ParsedStatements.end()) { + auto errorResponse = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>(); + errorResponse->ErrorFields.push_back({'E', "ERROR"}); + errorResponse->ErrorFields.push_back({'M', TStringBuilder() << "Parsed statement \"" << statementName << "\" not found"}); + Send(ev->Sender, errorResponse.release(), 0, ev->Cookie); + return; + } + + if (!ScriptingClient) { + BootstrapScriptingClient(); + } + + auto query = ConvertQuery(it->second); + TActorSystem* actorSystem = TActivationContext::ActorSystem(); + + // TODO: it's wrong. it should be done using explain to get all result meta. but it's not ready yet. + + ScriptingClient->ExecuteYqlScript(query.Query, query.Params).Subscribe([actorSystem, ev](const NYdb::NScripting::TAsyncExecuteYqlResult& feature) { + auto response = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>(); + try { + NYdb::NScripting::TExecuteYqlResult result = NYdb::NScripting::TAsyncExecuteYqlResult(feature).ExtractValue(); + if (result.IsSuccess()) { + const TVector<NYdb::TResultSet>& resultSets = result.GetResultSets(); + if (!resultSets.empty()) { + NYdb::TResultSet resultSet = resultSets[0]; + + { + for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) { + // TODO: fill data types and sizes + response->DataFields.push_back({ + .Name = column.Name, + }); + } + } + } + } else { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', TStringBuilder() << (NYdb::TStatus&)result}); + } + } + catch (const std::exception& e) { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', e.what()}); + } + + actorSystem->Send(ev->Sender, response.release(), 0, ev->Cookie); + }); + + /* + TableClient->RetryOperation([actorSystem, sender, dataQuery](NYdb::NTable::TSession session) mutable { + session. + return dataQuery.Execute(NYdb::NTable::TTxControl::BeginTx()).Apply([actorSystem, sender](NYdb::NTable::TAsyncDataQueryResult result) mutable -> NThreading::TFuture<NYdb::TStatus> { + NYdb::NTable::TDataQueryResult res = result.ExtractValue(); + + if (res.IsSuccess()) { + + const TVector<NYdb::TResultSet>& resultSets = res.GetResultSets(); + if (!resultSets.empty()) { + NYdb::TResultSet resultSet = resultSets[0]; + + { + auto rowDescription = std::make_unique<NPG::TEvPGEvents::TEvRowDescription>(); + for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) { + // TODO: fill data types and sizes + rowDescription->Fields.push_back({ + .Name = column.Name, + }); + } + + actorSystem->Send(sender, rowDescription.release()); + } + } + } else { + auto errorResponse = std::make_unique<NPG::TEvPGEvents::TEvErrorResponse>(); + errorResponse->ErrorFields.push_back({'E', "ERROR"}); + errorResponse->ErrorFields.push_back({'M', TStringBuilder() << (NYdb::TStatus&)res}); + actorSystem->Send(sender, errorResponse.release()); + } + return NThreading::MakeFuture<NYdb::TStatus>(res); + }); + });*/ + } + + void Handle(NPG::TEvPGEvents::TEvExecute::TPtr& ev) { + BLOG_D("TEvExecute " << ev->Sender); + + TString statementName = ev->Get()->Message->GetExecuteData().PortalName; + if (statementName.empty()) { + statementName = CurrentStatement; + BLOG_W("TEvExecute changed empty statement to " << CurrentStatement); + } + auto it = ParsedStatements.find(statementName); + if (it == ParsedStatements.end()) { + auto errorResponse = std::make_unique<NPG::TEvPGEvents::TEvExecuteResponse>(); + errorResponse->ErrorFields.push_back({'E', "ERROR"}); + errorResponse->ErrorFields.push_back({'M', TStringBuilder() << "Parsed statement \"" << statementName << "\" not found"}); + Send(ev->Sender, errorResponse.release(), 0, ev->Cookie); + return; + } + + if (!ScriptingClient) { + BootstrapScriptingClient(); + } + + TActorSystem* actorSystem = TActivationContext::ActorSystem(); + auto query = ConvertQuery(it->second); + + ScriptingClient->ExecuteYqlScript(query.Query, query.Params).Subscribe([actorSystem, ev](const NYdb::NScripting::TAsyncExecuteYqlResult& feature) mutable { + auto response = std::make_unique<NPG::TEvPGEvents::TEvExecuteResponse>(); + try { + NYdb::NScripting::TExecuteYqlResult result = NYdb::NScripting::TAsyncExecuteYqlResult(feature).ExtractValue(); + if (result.IsSuccess()) { + const TVector<NYdb::TResultSet>& resultSets = result.GetResultSets(); + if (!resultSets.empty()) { + NYdb::TResultSet resultSet = resultSets[0]; + + { + auto maxRows = ev->Get()->Message->GetExecuteData().MaxRows; + NYdb::TResultSetParser parser(std::move(resultSet)); + while (parser.TryNextRow()) { + response->DataRows.emplace_back(); + auto& row = response->DataRows.back(); + row.resize(parser.ColumnsCount()); + for (size_t index = 0; index < parser.ColumnsCount(); ++index) { + row[index] = ColumnValueToString(parser.ColumnParser(index)); + } + if (maxRows != 0) { + if (--maxRows == 0) { + break; + } + } + } + } + } + } else { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', TStringBuilder() << (NYdb::TStatus&)result}); + } + } + catch (const std::exception& e) { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', e.what()}); + } + + actorSystem->Send(ev->Sender, response.release(), 0, ev->Cookie); + }); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NPG::TEvPGEvents::TEvQuery, Handle); + hFunc(NPG::TEvPGEvents::TEvParse, Handle); + hFunc(NPG::TEvPGEvents::TEvBind, Handle); + hFunc(NPG::TEvPGEvents::TEvDescribe, Handle); + hFunc(NPG::TEvPGEvents::TEvExecute, Handle); + cFunc(TEvents::TEvPoisonPill::EventType, PassAway); + } + } +}; + + +NActors::IActor* CreateConnection(NYdb::TDriver driver, std::unordered_map<TString, TString> params) { + return new TPgYdbConnection(std::move(driver), std::move(params)); +} + +} diff --git a/ydb/apps/pgwire/pg_ydb_connection.h b/ydb/apps/pgwire/pg_ydb_connection.h new file mode 100644 index 0000000000..5adf018ce5 --- /dev/null +++ b/ydb/apps/pgwire/pg_ydb_connection.h @@ -0,0 +1,10 @@ +#pragma once +#include <library/cpp/actors/core/actor.h> +#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> +#include <ydb/public/sdk/cpp/client/ydb_common_client/settings.h> + +namespace NPGW { + +NActors::IActor* CreateConnection(NYdb::TDriver driver, std::unordered_map<TString, TString> params); + +} diff --git a/ydb/apps/pgwire/pg_ydb_proxy.cpp b/ydb/apps/pgwire/pg_ydb_proxy.cpp new file mode 100644 index 0000000000..836dab9ca7 --- /dev/null +++ b/ydb/apps/pgwire/pg_ydb_proxy.cpp @@ -0,0 +1,163 @@ +#include "pg_ydb_proxy.h" +#include "pg_ydb_connection.h" +#include "log_impl.h" +#include <ydb/core/pgproxy/pg_proxy_events.h> +#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h> +#include <library/cpp/actors/core/actor.h> +#define INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h> + +namespace NPGW { + +using namespace NActors; + +class TPgYdbProxy : public TActor<TPgYdbProxy> { + struct TSecurityState { + TString Token; + }; + + using TBase = TActor<TPgYdbProxy>; + NYdb::TDriver Driver; + std::shared_ptr<NYdb::TGRpcConnectionsImpl> Connections; + std::unordered_map<TActorId, TActorId> PgToYdbConnection; + std::unordered_map<TActorId, TSecurityState> SecurityState; + +public: + TPgYdbProxy(const NYdb::TDriverConfig& driverConfig) + : TBase(&TPgYdbProxy::StateWork) + , Driver(driverConfig) + , Connections(CreateInternalInterface(Driver)) + { + } + + void Handle(NPG::TEvPGEvents::TEvAuth::TPtr& ev) { + std::unordered_map<TString, TString> clientParams = ev->Get()->InitialMessage->GetClientParams(); + BLOG_D("TEvAuth " << ev->Get()->InitialMessage->Dump()); + Ydb::Auth::LoginRequest request; + request.set_user(clientParams["user"]); + if (ev->Get()->PasswordMessage) { + request.set_password(TString(ev->Get()->PasswordMessage->GetPassword())); + } + TActorSystem* actorSystem = TActivationContext::ActorSystem(); + TActorId sender = ev->Sender; + + auto responseCb = [this, actorSystem, sender](Ydb::Auth::LoginResponse* resp, NYdb::TPlainStatus status) { + auto authResponse = std::make_unique<NPG::TEvPGEvents::TEvAuthResponse>(); + if (status.Ok() && resp->operation().status() == Ydb::StatusIds::SUCCESS) { + Ydb::Auth::LoginResult result; + resp->operation().result().UnpackTo(&result); + this->SecurityState[sender].Token = result.token(); + } else { + if (!status.Ok()) { + authResponse->Error = status.Issues.ToOneLineString(); + } + if (resp != nullptr) { + if (resp->operation().issues_size() > 0) { + authResponse->Error = resp->operation().issues(0).message(); + } else { + authResponse->Error = Ydb::StatusIds_StatusCode_Name(resp->operation().status()); + } + } else { + if (status.Status == NYdb::EStatus::CLIENT_CALL_UNIMPLEMENTED) { + authResponse->Error = "The database doesn't support authentication"; + } else { + authResponse->Error = "Authentication error"; + } + } + } + actorSystem->Send(sender, authResponse.release()); + }; + + TString database = clientParams["database"]; + NYdb::TDbDriverStatePtr driverState = Connections->GetDriverState(database, {}, {}, {}, {}); + + NYdb::TRpcRequestSettings rpcSettings; + rpcSettings.ClientTimeout = TDuration::Seconds(60); + + NYdb::TGRpcConnectionsImpl::RunOnDiscoveryEndpoint<Ydb::Auth::V1::AuthService, Ydb::Auth::LoginRequest, Ydb::Auth::LoginResponse>( + driverState, + std::move(request), + std::move(responseCb), + &Ydb::Auth::V1::AuthService::Stub::AsyncLogin, + rpcSettings); + } + + void Handle(NPG::TEvPGEvents::TEvConnectionOpened::TPtr& ev) { + BLOG_D("TEvConnectionOpened " << ev->Sender); + auto params = ev->Get()->Message->GetClientParams(); + auto itSecurityState = SecurityState.find(ev->Sender); + if (itSecurityState != SecurityState.end() && !itSecurityState->second.Token.empty()) { + params["auth-token"] = itSecurityState->second.Token; + } + IActor* actor = CreateConnection(Driver, std::move(params)); + TActorId actorId = Register(actor); + PgToYdbConnection[ev->Sender] = actorId; + BLOG_D("Created ydb connection " << actorId); + } + + void Handle(NPG::TEvPGEvents::TEvConnectionClosed::TPtr& ev) { + BLOG_D("TEvConnectionClosed " << ev->Sender); + auto itConnection = PgToYdbConnection.find(ev->Sender); + if (itConnection != PgToYdbConnection.end()) { + Send(itConnection->second, new TEvents::TEvPoisonPill()); + BLOG_D("Destroyed ydb connection " << itConnection->second); + } + SecurityState.erase(ev->Sender); + } + + void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) { + auto itConnection = PgToYdbConnection.find(ev->Sender); + if (itConnection != PgToYdbConnection.end()) { + Forward(ev, itConnection->second); + } + } + + void Handle(NPG::TEvPGEvents::TEvParse::TPtr& ev) { + auto itConnection = PgToYdbConnection.find(ev->Sender); + if (itConnection != PgToYdbConnection.end()) { + Forward(ev, itConnection->second); + } + } + + void Handle(NPG::TEvPGEvents::TEvBind::TPtr& ev) { + auto itConnection = PgToYdbConnection.find(ev->Sender); + if (itConnection != PgToYdbConnection.end()) { + Forward(ev, itConnection->second); + } + } + + void Handle(NPG::TEvPGEvents::TEvDescribe::TPtr& ev) { + auto itConnection = PgToYdbConnection.find(ev->Sender); + if (itConnection != PgToYdbConnection.end()) { + Forward(ev, itConnection->second); + } + } + + void Handle(NPG::TEvPGEvents::TEvExecute::TPtr& ev) { + auto itConnection = PgToYdbConnection.find(ev->Sender); + if (itConnection != PgToYdbConnection.end()) { + Forward(ev, itConnection->second); + BLOG_D("Forwarded to ydb connection " << itConnection->second); + } + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NPG::TEvPGEvents::TEvAuth, Handle); + hFunc(NPG::TEvPGEvents::TEvConnectionOpened, Handle); + hFunc(NPG::TEvPGEvents::TEvConnectionClosed, Handle); + hFunc(NPG::TEvPGEvents::TEvQuery, Handle); + hFunc(NPG::TEvPGEvents::TEvParse, Handle); + hFunc(NPG::TEvPGEvents::TEvBind, Handle); + hFunc(NPG::TEvPGEvents::TEvDescribe, Handle); + hFunc(NPG::TEvPGEvents::TEvExecute, Handle); + } + } +}; + + +NActors::IActor* CreateDatabaseProxy(const NYdb::TDriverConfig& driverConfig) { + return new TPgYdbProxy(driverConfig); +} + +} diff --git a/ydb/apps/pgwire/pg_ydb_proxy.h b/ydb/apps/pgwire/pg_ydb_proxy.h new file mode 100644 index 0000000000..985013cd8e --- /dev/null +++ b/ydb/apps/pgwire/pg_ydb_proxy.h @@ -0,0 +1,9 @@ +#pragma once +#include <library/cpp/actors/core/actor.h> +#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> + +namespace NPGW { + +NActors::IActor* CreateDatabaseProxy(const NYdb::TDriverConfig& driverConfig); + +} diff --git a/ydb/apps/pgwire/pgwire.cpp b/ydb/apps/pgwire/pgwire.cpp new file mode 100644 index 0000000000..2ce2b4adca --- /dev/null +++ b/ydb/apps/pgwire/pgwire.cpp @@ -0,0 +1,148 @@ +#include <ydb/core/pgproxy/pg_listener.h> +#include <ydb/core/pgproxy/pg_log.h> +#include <util/system/mlock.h> +#include <util/stream/file.h> +#include <library/cpp/getopt/last_getopt.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/actors/core/executor_pool_basic.h> +#include <library/cpp/actors/core/scheduler_basic.h> +#include <library/cpp/actors/core/process_stats.h> +#include <library/cpp/actors/protos/services_common.pb.h> +#include <library/cpp/actors/interconnect/poller_actor.h> +#include "pgwire.h" +#include "log_impl.h" +#include "pg_ydb_proxy.h" + +namespace NPGW { + +std::atomic<bool> TPgWire::Quit; + +void TPgWire::OnTerminate(int) { + Quit = true; +} + +int TPgWire::Init() { + ActorSystem->Start(); + + ActorSystem->Register(NActors::CreateProcStatCollector(TDuration::Seconds(5), AppData.MetricRegistry)); + + Poller = ActorSystem->Register(NActors::CreatePollerActor()); + + NYdb::TDriverConfig driverConfig; + + driverConfig.SetEndpoint(Endpoint); + + DatabaseProxy = ActorSystem->Register(CreateDatabaseProxy(driverConfig)); + Listener = ActorSystem->Register(NPG::CreatePGListener(Poller, DatabaseProxy, { + .Port = ListeningPort, + .SslCertificatePem = SslCertificate, + })); + + return 0; +} + +int TPgWire::Run() { + try { + int res = Init(); + if (res != 0) { + return res; + } +#ifndef NDEBUG + Cout << "Started" << Endl; +#endif + while (!Quit) { + Sleep(TDuration::MilliSeconds(100)); + } +#ifndef NDEBUG + Cout << Endl << "Finished" << Endl; +#endif + Shutdown(); + } + catch (const yexception& e) { + Cerr << e.what() << Endl; + return 1; + } + return 0; +} + +int TPgWire::Shutdown() { + ActorSystem->Stop(); + return 0; +} + +TPgWire::TPgWire(int argc, char** argv) { + NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); + bool useStdErr = false; + bool mlock = false; + TString sslCertificateFile; + opts.AddLongOption("endpoint", "YDB endpoint to connect").Required().RequiredArgument("ENDPOINT").StoreResult(&Endpoint); + opts.AddLongOption("port", "Listening port").Optional().DefaultValue("5432").RequiredArgument("PORT").StoreResult(&ListeningPort); + opts.AddLongOption("stderr", "Redirect log to stderr").NoArgument().SetFlag(&useStdErr); + opts.AddLongOption("mlock", "Lock resident memory").NoArgument().SetFlag(&mlock); + opts.AddLongOption("ssl-cert-file", "Path to file with SSL certificate including private key").RequiredArgument("PATH").StoreResult(&sslCertificateFile); + + NLastGetopt::TOptsParseResult res(&opts, argc, argv); + + if (mlock) { + LockAllMemory(LockCurrentMemory); + } + + THolder<NActors::TActorSystemSetup> actorSystemSetup = BuildActorSystemSetup(); + + TIntrusivePtr<NActors::NLog::TSettings> loggerSettings = BuildLoggerSettings(); + NActors::TLoggerActor* loggerActor = new NActors::TLoggerActor( + loggerSettings, + useStdErr ? NActors::CreateStderrBackend() : NActors::CreateSysLogBackend("pgwire", false, true), + NMonitoring::TMetricRegistry::SharedInstance()); + actorSystemSetup->LocalServices.emplace_back(loggerSettings->LoggerActorId, NActors::TActorSetupCmd(loggerActor, NActors::TMailboxType::HTSwap, 0)); + + ActorSystem = std::make_unique<NActors::TActorSystem>(actorSystemSetup, &AppData, loggerSettings); + AppData.MetricRegistry = NMonitoring::TMetricRegistry::SharedInstance(); + + if (!sslCertificateFile.empty()) { + TString sslCertificate = TUnbufferedFileInput(sslCertificateFile).ReadAll(); + if (!sslCertificate.empty()) { + SslCertificate = sslCertificate; + } else { + ythrow yexception() << "Invalid SSL certificate file"; + } + } +} + +TIntrusivePtr<NActors::NLog::TSettings> TPgWire::BuildLoggerSettings() { + const NActors::TActorId loggerActorId = NActors::TActorId(1, "logger"); + TIntrusivePtr<NActors::NLog::TSettings> loggerSettings = new NActors::NLog::TSettings(loggerActorId, NPGW::EService::Logger, NActors::NLog::PRI_WARN); + loggerSettings->Append( + NActorsServices::EServiceCommon_MIN, + NActorsServices::EServiceCommon_MAX, + NActorsServices::EServiceCommon_Name + ); + loggerSettings->Append( + NPG::EService::MIN, + NPG::EService::MAX, + NPG::GetEServiceName + ); + loggerSettings->Append( + NPGW::EService::MIN, + NPGW::EService::MAX, + NPGW::GetEServiceName + ); + TString explanation; + loggerSettings->SetLevel(NActors::NLog::PRI_DEBUG, NPG::EService::PGWIRE, explanation); + loggerSettings->SetLevel(NActors::NLog::PRI_DEBUG, NPGW::EService::PGYDB, explanation); + return loggerSettings; +} + +THolder<NActors::TActorSystemSetup> TPgWire::BuildActorSystemSetup() { + THolder<NActors::TActorSystemSetup> setup = MakeHolder<NActors::TActorSystemSetup>(); + setup->NodeId = 1; + setup->Executors.Reset(new TAutoPtr<NActors::IExecutorPool>[1]); + setup->ExecutorsCount = 1; + setup->Executors[0] = new NActors::TBasicExecutorPool(0, 4, 10); + + setup->Scheduler = new NActors::TBasicSchedulerThread(NActors::TSchedulerConfig(512, 100)); + + return setup; +} + +} diff --git a/ydb/apps/pgwire/pgwire.h b/ydb/apps/pgwire/pgwire.h new file mode 100644 index 0000000000..62e7fa9da6 --- /dev/null +++ b/ydb/apps/pgwire/pgwire.h @@ -0,0 +1,36 @@ +#pragma once +#include <atomic> +#include <library/cpp/actors/core/actorsystem.h> +#include "signals.h" +#include "appdata.h" + +namespace NPGW { + +class TPgWire { +public: + static std::atomic<bool> Quit; + static void OnTerminate(int); + NSignals::TSignalHandler<SIGINT, &TPgWire::OnTerminate> SignalSIGINT; + NSignals::TSignalHandler<SIGTERM, &TPgWire::OnTerminate> SignalSIGTERM; + NSignals::TSignalIgnore<SIGPIPE> SignalSIGPIPE; + + TPgWire(int argc, char** argv); + int Init(); + int Run(); + int Shutdown(); + static THolder<NActors::TActorSystemSetup> BuildActorSystemSetup(); + static TIntrusivePtr<NActors::NLog::TSettings> BuildLoggerSettings(); + static const TString& GetEServiceName(NActors::NLog::EComponent component); + + TAppData AppData; + std::unique_ptr<NActors::TActorSystem> ActorSystem; + NActors::TActorId Poller; + NActors::TActorId Listener; + NActors::TActorId DatabaseProxy; + // arguments + TString Endpoint; + uint16_t ListeningPort = 5432; + TString SslCertificate; +}; + +} diff --git a/ydb/apps/pgwire/signals.h b/ydb/apps/pgwire/signals.h new file mode 100644 index 0000000000..359dc02250 --- /dev/null +++ b/ydb/apps/pgwire/signals.h @@ -0,0 +1,35 @@ +#pragma once + +#include <signal.h> + +namespace NSignals { + +using sighandler_t = void (*)(int); + +template <int signum, sighandler_t handler> +struct TSignalHandler { + sighandler_t SavedHandler = nullptr; + + TSignalHandler() + : SavedHandler(signal(signum, handler)) + {} + + ~TSignalHandler() { + signal(signum, SavedHandler); + } +}; + +template <int signum> +struct TSignalIgnore { + sighandler_t SavedHandler = nullptr; + + TSignalIgnore() + : SavedHandler(signal(signum, SIG_IGN)) + {} + + ~TSignalIgnore() { + signal(signum, SavedHandler); + } +}; + +} diff --git a/ydb/core/pgproxy/pg_proxy_types.h b/ydb/core/pgproxy/pg_proxy_types.h index 00ac72724a..be4f876f43 100644 --- a/ydb/core/pgproxy/pg_proxy_types.h +++ b/ydb/core/pgproxy/pg_proxy_types.h @@ -5,8 +5,11 @@ #include <vector> #include <util/stream/format.h> #include <util/string/builder.h> -#include <arpa/inet.h> - +#if defined (_win_) + #include <winsock2.h> +#elif defined (_unix_) + #include <arpa/inet.h> +#endif namespace NPG { #pragma pack(push, 1) |