diff options
author | qrort <qrort@yandex-team.com> | 2023-05-30 06:05:04 +0300 |
---|---|---|
committer | qrort <qrort@yandex-team.com> | 2023-05-30 06:05:04 +0300 |
commit | b71fb3a796618a5a1f3ad735fc385c0653336007 (patch) | |
tree | 846bcc1ff88aec4bc7d9724f70e8d2487cccd33f | |
parent | 9a75beeb859ebdd2d5391f0b20cb0349f2fec5a3 (diff) | |
download | ydb-b71fb3a796618a5a1f3ad735fc385c0653336007.tar.gz |
local_pgwire to kqp proxy
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 1 | ||||
-rw-r--r-- | ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt | 7 | ||||
-rw-r--r-- | ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt | 7 | ||||
-rw-r--r-- | ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt | 7 | ||||
-rw-r--r-- | ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt | 7 | ||||
-rw-r--r-- | ydb/core/local_pgwire/local_pgwire_connection.cpp | 304 | ||||
-rw-r--r-- | ydb/core/local_pgwire/local_pgwire_util.h | 217 | ||||
-rw-r--r-- | ydb/core/local_pgwire/pgwire_kqp_proxy.cpp | 181 | ||||
-rw-r--r-- | ydb/tests/functional/postgresql/test_postgres.py | 3 |
9 files changed, 448 insertions, 286 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 8ed504fca4..c16b7b7f1a 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1136,6 +1136,7 @@ public: void HandleExecute(TEvKqpExecuter::TEvStreamData::TPtr& ev) { YQL_ENSURE(QueryState && QueryState->RequestActorId); + LOG_D("Forwarded TEvStreamData to " << QueryState->RequestActorId); TlsActivationContext->Send(ev->Forward(QueryState->RequestActorId)); } diff --git a/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt b/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt index ee78490159..bf8e444adf 100644 --- a/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt @@ -8,10 +8,16 @@ add_library(ydb-core-local_pgwire) +target_compile_options(ydb-core-local_pgwire PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) target_link_libraries(ydb-core-local_pgwire PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core + yql-public-udf + core-kqp-common + core-kqp-executer_actor ydb-core-grpc_services core-grpc_services-local_rpc ydb-core-protos @@ -22,4 +28,5 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC target_sources(ydb-core-local_pgwire PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp ) diff --git a/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt b/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt index cce2ab3a26..405972616a 100644 --- a/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt +++ b/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt @@ -8,11 +8,17 @@ add_library(ydb-core-local_pgwire) +target_compile_options(ydb-core-local_pgwire PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) target_link_libraries(ydb-core-local_pgwire PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil cpp-actors-core + yql-public-udf + core-kqp-common + core-kqp-executer_actor ydb-core-grpc_services core-grpc_services-local_rpc ydb-core-protos @@ -23,4 +29,5 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC target_sources(ydb-core-local_pgwire PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp ) diff --git a/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt b/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt index cce2ab3a26..405972616a 100644 --- a/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt +++ b/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt @@ -8,11 +8,17 @@ add_library(ydb-core-local_pgwire) +target_compile_options(ydb-core-local_pgwire PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) target_link_libraries(ydb-core-local_pgwire PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil cpp-actors-core + yql-public-udf + core-kqp-common + core-kqp-executer_actor ydb-core-grpc_services core-grpc_services-local_rpc ydb-core-protos @@ -23,4 +29,5 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC target_sources(ydb-core-local_pgwire PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp ) diff --git a/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt b/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt index ee78490159..bf8e444adf 100644 --- a/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt +++ b/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt @@ -8,10 +8,16 @@ add_library(ydb-core-local_pgwire) +target_compile_options(ydb-core-local_pgwire PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) target_link_libraries(ydb-core-local_pgwire PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core + yql-public-udf + core-kqp-common + core-kqp-executer_actor ydb-core-grpc_services core-grpc_services-local_rpc ydb-core-protos @@ -22,4 +28,5 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC target_sources(ydb-core-local_pgwire PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp ) diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp index aa2508bcd1..5c72000bf6 100644 --- a/ydb/core/local_pgwire/local_pgwire_connection.cpp +++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp @@ -1,6 +1,11 @@ #include "log_impl.h" +#include "local_pgwire_util.h" + #include <ydb/core/grpc_services/local_rpc/local_rpc.h> #include <ydb/core/pgproxy/pg_proxy_events.h> +#include <ydb/core/kqp/common/events/events.h> +#include <ydb/core/kqp/common/simple/services.h> +#include <ydb/core/kqp/executer_actor/kqp_executer.h> #define INCLUDE_YDB_INTERNAL_H #include <ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.h> #include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h> @@ -10,38 +15,25 @@ #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <library/cpp/actors/core/actor_bootstrapped.h> -// temporary borrowed from postgresql/src/backend/catalog/pg_type_d.h - -#define BOOLOID 16 -#define BYTEAOID 17 -#define CHAROID 18 -#define NAMEOID 19 -#define INT8OID 20 -#define INT2OID 21 -#define INT2VECTOROID 22 -#define INT4OID 23 -#define REGPROCOID 24 -#define TEXTOID 25 - -// - namespace NLocalPgWire { using namespace NActors; using namespace NKikimr; +extern NActors::IActor* CreatePgwireKqpProxy( + NActors::TActorId actorId, + ui64 cookie, + std::unordered_map<TString, TString> params +); + class TPgYdbConnection : public TActorBootstrapped<TPgYdbConnection> { using TBase = TActorBootstrapped<TPgYdbConnection>; - struct TParsedStatement { - NPG::TPGParse::TQueryData QueryData; - NPG::TPGBind::TBindData BindData; - }; - std::unordered_map<TString, TString> ConnectionParams; std::unordered_map<TString, TParsedStatement> ParsedStatements; TString CurrentStatement; - + Ydb::StatusIds QueryStatus; + std::unordered_map<ui32, NYdb::TResultSet> ResultSets; public: TPgYdbConnection(std::unordered_map<TString, TString> params) : ConnectionParams(std::move(params)) @@ -51,221 +43,12 @@ public: Become(&TPgYdbConnection::StateWork); } - static TString ColumnPrimitiveValueToString(NYdb::TValueParser& valueParser) { - switch (valueParser.GetPrimitiveType()) { - case NYdb::EPrimitiveType::Bool: - return TStringBuilder() << valueParser.GetBool(); - case NYdb::EPrimitiveType::Int8: - return TStringBuilder() << valueParser.GetInt8(); - case NYdb::EPrimitiveType::Uint8: - return TStringBuilder() << valueParser.GetUint8(); - case NYdb::EPrimitiveType::Int16: - return TStringBuilder() << valueParser.GetInt16(); - case NYdb::EPrimitiveType::Uint16: - return TStringBuilder() << valueParser.GetUint16(); - case NYdb::EPrimitiveType::Int32: - return TStringBuilder() << valueParser.GetInt32(); - case NYdb::EPrimitiveType::Uint32: - return TStringBuilder() << valueParser.GetUint32(); - case NYdb::EPrimitiveType::Int64: - return TStringBuilder() << valueParser.GetInt64(); - case NYdb::EPrimitiveType::Uint64: - return TStringBuilder() << valueParser.GetUint64(); - case NYdb::EPrimitiveType::Float: - return TStringBuilder() << valueParser.GetFloat(); - case NYdb::EPrimitiveType::Double: - return TStringBuilder() << valueParser.GetDouble(); - case NYdb::EPrimitiveType::Utf8: - return TStringBuilder() << valueParser.GetUtf8(); - case NYdb::EPrimitiveType::Date: - return valueParser.GetDate().ToString(); - case NYdb::EPrimitiveType::Datetime: - return valueParser.GetDatetime().ToString(); - case NYdb::EPrimitiveType::Timestamp: - return valueParser.GetTimestamp().ToString(); - case NYdb::EPrimitiveType::Interval: - return TStringBuilder() << valueParser.GetInterval(); - case NYdb::EPrimitiveType::TzDate: - return valueParser.GetTzDate(); - case NYdb::EPrimitiveType::TzDatetime: - return valueParser.GetTzDatetime(); - case NYdb::EPrimitiveType::TzTimestamp: - return valueParser.GetTzTimestamp(); - case NYdb::EPrimitiveType::String: - return Base64Encode(valueParser.GetString()); - case NYdb::EPrimitiveType::Yson: - return valueParser.GetYson(); - case NYdb::EPrimitiveType::Json: - return valueParser.GetJson(); - case NYdb::EPrimitiveType::JsonDocument: - return valueParser.GetJsonDocument(); - case NYdb::EPrimitiveType::DyNumber: - return valueParser.GetDyNumber(); - case NYdb::EPrimitiveType::Uuid: - return {}; - } - return {}; - } - - static TString ColumnValueToString(NYdb::TValueParser& valueParser) { - switch (valueParser.GetKind()) { - case NYdb::TTypeParser::ETypeKind::Primitive: - return ColumnPrimitiveValueToString(valueParser); - case NYdb::TTypeParser::ETypeKind::Optional: { - TString value; - valueParser.OpenOptional(); - if (valueParser.IsNull()) { - value = "NULL"; - } else { - value = ColumnValueToString(valueParser); - } - valueParser.CloseOptional(); - return value; - } - case NYdb::TTypeParser::ETypeKind::Tuple: { - TString value; - valueParser.OpenTuple(); - while (valueParser.TryNextElement()) { - if (!value.empty()) { - value += ','; - } - value += ColumnValueToString(valueParser); - } - valueParser.CloseTuple(); - return value; - } - case NYdb::TTypeParser::ETypeKind::Pg: { - return valueParser.GetPg().Content_; - } - default: - return {}; - } - } - - static uint32_t GetPgOidFromYdbType(NYdb::TType type) { - NYdb::TTypeParser parser(type); - switch (parser.GetKind()) { - case NYdb::TTypeParser::ETypeKind::Pg: { - return parser.GetPg().Oid; - default: - return {}; - } - } - } - - TString ToPgSyntax(TStringBuf query) { - auto itOptions = ConnectionParams.find("options"); - if (itOptions == ConnectionParams.end()) { - return TStringBuilder() << "--!syntax_pg\n" << query; // default - } - return TStringBuilder() << "--!" << itOptions->second << "\n" << query; - } - - static NYdb::NScripting::TExecuteYqlResult ConvertProtoResponseToSdkResult(Ydb::Scripting::ExecuteYqlResponse&& proto) { - TVector<NYdb::TResultSet> res; - TMaybe<NYdb::NTable::TQueryStats> queryStats; - { - Ydb::Scripting::ExecuteYqlResult result; - proto.mutable_operation()->mutable_result()->UnpackTo(&result); - for (int i = 0; i < result.result_sets_size(); i++) { - res.emplace_back(std::move(*result.mutable_result_sets(i))); - } - if (result.has_query_stats()) { - queryStats = NYdb::NTable::TQueryStats(std::move(*result.mutable_query_stats())); - } - } - NYdb::TPlainStatus alwaysSuccess; - return {NYdb::TStatus(std::move(alwaysSuccess)), std::move(res), queryStats}; - } - void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) { BLOG_D("TEvQuery " << ev->Sender); - auto query(ev->Get()->Message->GetQuery()); - if (IsQueryEmpty(query)) { - auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>(); - response->EmptyQuery = true; - Send(ev->Sender, response.release(), 0, ev->Cookie); - return; - } - Ydb::Scripting::ExecuteYqlRequest request; - request.set_script(ToPgSyntax(query)); - TString database; - if (ConnectionParams.count("database")) { - database = ConnectionParams["database"]; - } - TString token; - if (ConnectionParams.count("ydb-serialized-token")) { - token = ConnectionParams["ydb-serialized-token"]; - } - using TRpcEv = NGRpcService::TGrpcRequestOperationCall<Ydb::Scripting::ExecuteYqlRequest, Ydb::Scripting::ExecuteYqlResponse>; - TActorSystem* actorSystem = TActivationContext::ActorSystem(); - auto rpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(request), database, token, actorSystem); - rpcFuture.Subscribe([actorSystem, ev](NThreading::TFuture<Ydb::Scripting::ExecuteYqlResponse> future) { - auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>(); - // HACK - if (ev->Get()->Message->GetQuery().starts_with("BEGIN")) { - response->Tag = "BEGIN"; - response->TransactionStatus = 'T'; - } - // HACK - try { - Ydb::Scripting::ExecuteYqlResponse yqlResponse(future.ExtractValueSync()); - if (yqlResponse.has_operation()) { - if (yqlResponse.operation().status() == Ydb::StatusIds::SUCCESS) { - if (yqlResponse.operation().has_result()) { - NYdb::NScripting::TExecuteYqlResult result(ConvertProtoResponseToSdkResult(std::move(yqlResponse))); - if (result.IsSuccess()) { - const TVector<NYdb::TResultSet>& resultSets = result.GetResultSets(); - if (!resultSets.empty()) { - NYdb::TResultSet resultSet = resultSets[0]; - - { - for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) { - // TODO: fill data types and sizes - response->DataFields.push_back({ - .Name = column.Name, - .DataType = GetPgOidFromYdbType(column.Type), - }); - } - } - - { - NYdb::TResultSetParser parser(std::move(resultSet)); - while (parser.TryNextRow()) { - response->DataRows.emplace_back(); - auto& row = response->DataRows.back(); - row.resize(parser.ColumnsCount()); - for (size_t index = 0; index < parser.ColumnsCount(); ++index) { - row[index] = ColumnValueToString(parser.ColumnParser(index)); - } - } - } - - // HACK - response->Tag = TStringBuilder() << "SELECT " << response->DataRows.size(); - // HACK - } - } - } - } else { - NYql::TIssues issues; - NYql::IssuesFromMessage(yqlResponse.operation().issues(), issues); - NYdb::TStatus status(NYdb::EStatus(yqlResponse.operation().status()), std::move(issues)); - response->ErrorFields.push_back({'E', "ERROR"}); - response->ErrorFields.push_back({'M', TStringBuilder() << status}); - } - } else { - response->ErrorFields.push_back({'E', "ERROR"}); - response->ErrorFields.push_back({'M', "No result received"}); - } - } - catch (const std::exception& e) { - response->ErrorFields.push_back({'E', "ERROR"}); - response->ErrorFields.push_back({'M', e.what()}); - } - - actorSystem->Send(ev->Sender, response.release(), 0, ev->Cookie); - }); + IActor* actor = CreatePgwireKqpProxy(ev->Sender, ev->Cookie, ConnectionParams); + TActorId actorId = Register(actor); + BLOG_D("Created pgwireKqpProxy: " << actorId); + Forward(ev, actorId); } void Handle(NPG::TEvPGEvents::TEvParse::TPtr& ev) { @@ -298,55 +81,6 @@ public: Send(ev->Sender, closeComplete.release()); } - struct TConvertedQuery { - TString Query; - NYdb::TParams Params; - }; - - TConvertedQuery ConvertQuery(const TParsedStatement& statement) { - auto& bindData = statement.BindData; - const auto& queryData = statement.QueryData; - NYdb::TParamsBuilder paramsBuilder; - TStringBuilder injectedQuery; - - for (size_t idxParam = 0; idxParam < queryData.ParametersTypes.size(); ++idxParam) { - int32_t paramType = queryData.ParametersTypes[idxParam]; - TString paramValue; - if (idxParam < bindData.ParametersValue.size()) { - std::vector<uint8_t> paramVal = bindData.ParametersValue[idxParam]; - paramValue = TString(reinterpret_cast<char*>(paramVal.data()), paramVal.size()); - } - switch (paramType) { - case INT2OID: - paramsBuilder.AddParam(TStringBuilder() << ":_" << idxParam + 1).Int16(atoi(paramValue.data())).Build(); - break; - - } - } - return { - .Query = injectedQuery + queryData.Query, - .Params = paramsBuilder.Build(), - }; - } - - inline static bool IsWhitespaceASCII(char c) - { - return c == ' ' || c == '\t' || c == '\n' || c == '\r' || c == '\f' || c == '\v'; - } - - static bool IsWhitespace(TStringBuf query) { - for (char c : query) { - if (!IsWhitespaceASCII(c)) { - return false; - } - } - return true; - } - - static bool IsQueryEmpty(TStringBuf query) { - return IsWhitespace(query); - } - void Handle(NPG::TEvPGEvents::TEvDescribe::TPtr& ev) { BLOG_D("TEvDescribe " << ev->Sender); @@ -367,7 +101,7 @@ public: TActorSystem* actorSystem = TActivationContext::ActorSystem(); auto query = ConvertQuery(it->second); Ydb::Scripting::ExecuteYqlRequest request; - request.set_script(ToPgSyntax(query.Query)); + request.set_script(ToPgSyntax(query.Query, ConnectionParams)); // TODO: //request.set_parameters(query.Params); TString database; @@ -457,7 +191,7 @@ public: TActorSystem* actorSystem = TActivationContext::ActorSystem(); auto query = ConvertQuery(it->second); Ydb::Scripting::ExecuteYqlRequest request; - request.set_script(ToPgSyntax(query.Query)); + request.set_script(ToPgSyntax(query.Query, ConnectionParams)); // TODO: //request.set_parameters(query.Params); TString database; diff --git a/ydb/core/local_pgwire/local_pgwire_util.h b/ydb/core/local_pgwire/local_pgwire_util.h new file mode 100644 index 0000000000..110996c181 --- /dev/null +++ b/ydb/core/local_pgwire/local_pgwire_util.h @@ -0,0 +1,217 @@ +#pragma once +#include <library/cpp/string_utils/base64/base64.h> + +#include <ydb/core/pgproxy/pg_proxy_types.h> + +#include <ydb/library/yql/public/issue/yql_issue_message.h> + +#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h> +#define INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h> +#include <ydb/public/sdk/cpp/client/ydb_value/value.h> +#include <ydb/public/api/grpc/ydb_scripting_v1.grpc.pb.h> + +#include <util/string/builder.h> +#include <util/generic/string.h> + +// temporary borrowed from postgresql/src/backend/catalog/pg_type_d.h + +#define BOOLOID 16 +#define BYTEAOID 17 +#define CHAROID 18 +#define NAMEOID 19 +#define INT8OID 20 +#define INT2OID 21 +#define INT2VECTOROID 22 +#define INT4OID 23 +#define REGPROCOID 24 +#define TEXTOID 25 + +// + +namespace NLocalPgWire { + +struct TParsedStatement { + NPG::TPGParse::TQueryData QueryData; + NPG::TPGBind::TBindData BindData; +}; + +inline TString ColumnPrimitiveValueToString(NYdb::TValueParser& valueParser) { + switch (valueParser.GetPrimitiveType()) { + case NYdb::EPrimitiveType::Bool: + return TStringBuilder() << valueParser.GetBool(); + case NYdb::EPrimitiveType::Int8: + return TStringBuilder() << valueParser.GetInt8(); + case NYdb::EPrimitiveType::Uint8: + return TStringBuilder() << valueParser.GetUint8(); + case NYdb::EPrimitiveType::Int16: + return TStringBuilder() << valueParser.GetInt16(); + case NYdb::EPrimitiveType::Uint16: + return TStringBuilder() << valueParser.GetUint16(); + case NYdb::EPrimitiveType::Int32: + return TStringBuilder() << valueParser.GetInt32(); + case NYdb::EPrimitiveType::Uint32: + return TStringBuilder() << valueParser.GetUint32(); + case NYdb::EPrimitiveType::Int64: + return TStringBuilder() << valueParser.GetInt64(); + case NYdb::EPrimitiveType::Uint64: + return TStringBuilder() << valueParser.GetUint64(); + case NYdb::EPrimitiveType::Float: + return TStringBuilder() << valueParser.GetFloat(); + case NYdb::EPrimitiveType::Double: + return TStringBuilder() << valueParser.GetDouble(); + case NYdb::EPrimitiveType::Utf8: + return TStringBuilder() << valueParser.GetUtf8(); + case NYdb::EPrimitiveType::Date: + return valueParser.GetDate().ToString(); + case NYdb::EPrimitiveType::Datetime: + return valueParser.GetDatetime().ToString(); + case NYdb::EPrimitiveType::Timestamp: + return valueParser.GetTimestamp().ToString(); + case NYdb::EPrimitiveType::Interval: + return TStringBuilder() << valueParser.GetInterval(); + case NYdb::EPrimitiveType::TzDate: + return valueParser.GetTzDate(); + case NYdb::EPrimitiveType::TzDatetime: + return valueParser.GetTzDatetime(); + case NYdb::EPrimitiveType::TzTimestamp: + return valueParser.GetTzTimestamp(); + case NYdb::EPrimitiveType::String: + return Base64Encode(valueParser.GetString()); + case NYdb::EPrimitiveType::Yson: + return valueParser.GetYson(); + case NYdb::EPrimitiveType::Json: + return valueParser.GetJson(); + case NYdb::EPrimitiveType::JsonDocument: + return valueParser.GetJsonDocument(); + case NYdb::EPrimitiveType::DyNumber: + return valueParser.GetDyNumber(); + case NYdb::EPrimitiveType::Uuid: + return {}; + } + return {}; +} + +inline TString ColumnValueToString(NYdb::TValueParser& valueParser) { + switch (valueParser.GetKind()) { + case NYdb::TTypeParser::ETypeKind::Primitive: + return ColumnPrimitiveValueToString(valueParser); + case NYdb::TTypeParser::ETypeKind::Optional: { + TString value; + valueParser.OpenOptional(); + if (valueParser.IsNull()) { + value = "NULL"; + } else { + value = ColumnValueToString(valueParser); + } + valueParser.CloseOptional(); + return value; + } + case NYdb::TTypeParser::ETypeKind::Tuple: { + TString value; + valueParser.OpenTuple(); + while (valueParser.TryNextElement()) { + if (!value.empty()) { + value += ','; + } + value += ColumnValueToString(valueParser); + } + valueParser.CloseTuple(); + return value; + } + case NYdb::TTypeParser::ETypeKind::Pg: { + return valueParser.GetPg().Content_; + } + default: + return {}; + } +} + +inline uint32_t GetPgOidFromYdbType(NYdb::TType type) { + NYdb::TTypeParser parser(type); + switch (parser.GetKind()) { + case NYdb::TTypeParser::ETypeKind::Pg: { + return parser.GetPg().Oid; + default: + return {}; + } + } +} + +inline TString ToPgSyntax(TStringBuf query, const std::unordered_map<TString, TString>& ConnectionParams) { + auto itOptions = ConnectionParams.find("options"); + if (itOptions == ConnectionParams.end()) { + return TStringBuilder() << "--!syntax_pg\n" << query; // default + } + return TStringBuilder() << "--!" << itOptions->second << "\n" << query; +} + +inline NYdb::NScripting::TExecuteYqlResult ConvertProtoResponseToSdkResult(Ydb::Scripting::ExecuteYqlResponse&& proto) { + TVector<NYdb::TResultSet> res; + TMaybe<NYdb::NTable::TQueryStats> queryStats; + { + Ydb::Scripting::ExecuteYqlResult result; + proto.mutable_operation()->mutable_result()->UnpackTo(&result); + for (int i = 0; i < result.result_sets_size(); i++) { + res.emplace_back(std::move(*result.mutable_result_sets(i))); + } + if (result.has_query_stats()) { + queryStats = NYdb::NTable::TQueryStats(std::move(*result.mutable_query_stats())); + } + } + NYdb::TPlainStatus alwaysSuccess; + return {NYdb::TStatus(std::move(alwaysSuccess)), std::move(res), queryStats}; +} + +struct TConvertedQuery { + TString Query; + NYdb::TParams Params; +}; + +inline TConvertedQuery ConvertQuery(const TParsedStatement& statement) { + auto& bindData = statement.BindData; + const auto& queryData = statement.QueryData; + NYdb::TParamsBuilder paramsBuilder; + TStringBuilder injectedQuery; + + for (size_t idxParam = 0; idxParam < queryData.ParametersTypes.size(); ++idxParam) { + int32_t paramType = queryData.ParametersTypes[idxParam]; + TString paramValue; + if (idxParam < bindData.ParametersValue.size()) { + std::vector<uint8_t> paramVal = bindData.ParametersValue[idxParam]; + paramValue = TString(reinterpret_cast<char*>(paramVal.data()), paramVal.size()); + } + switch (paramType) { + case INT2OID: + paramsBuilder.AddParam(TStringBuilder() << ":_" << idxParam + 1).Int16(atoi(paramValue.data())).Build(); + break; + + } + } + return { + .Query = injectedQuery + queryData.Query, + .Params = paramsBuilder.Build(), + }; +} + +inline bool IsWhitespaceASCII(char c) +{ + return c == ' ' || c == '\t' || c == '\n' || c == '\r' || c == '\f' || c == '\v'; +} + +inline bool IsWhitespace(TStringBuf query) { + for (char c : query) { + if (!IsWhitespaceASCII(c)) { + return false; + } + } + return true; +} + +inline bool IsQueryEmpty(TStringBuf query) { + return IsWhitespace(query); +} + +} //namespace NLocalPgWire diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp new file mode 100644 index 0000000000..bce7d9f044 --- /dev/null +++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp @@ -0,0 +1,181 @@ +#include "log_impl.h" +#include "local_pgwire_util.h" +#include <ydb/core/kqp/common/kqp.h> +#include <ydb/core/kqp/common/events/events.h> +#include <ydb/core/kqp/common/simple/services.h> +#include <ydb/core/kqp/executer_actor/kqp_executer.h> +#include <ydb/core/pgproxy/pg_proxy_events.h> +#include <ydb/core/pgproxy/pg_proxy_types.h> +#define INCLUDE_YDB_INTERNAL_H +#include <ydb/library/yql/public/issue/yql_issue_message.h> +#include <ydb/public/sdk/cpp/client/ydb_result/result.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> + + +namespace NLocalPgWire { + +using namespace NActors; +using namespace NKikimr; + +class TPgwireKqpProxy : public TActorBootstrapped<TPgwireKqpProxy> { + using TBase = TActorBootstrapped<TPgwireKqpProxy>; + + NActors::TActorId RequestActorId_; + ui64 RequestCookie_; + bool InTransaction_; + std::unordered_map<TString, TString> ConnectionParams_; + TMap<ui32, NYdb::TResultSet> ResultSets_; +public: + TPgwireKqpProxy(NActors::TActorId actorId, ui64 cookie, std::unordered_map<TString, TString> params) + : RequestActorId_(actorId) + , RequestCookie_(cookie) + , ConnectionParams_(std::move(params)) + {} + + void Bootstrap() { + Become(&TPgwireKqpProxy::StateWork); + } + + void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) { + NYdb::TResultSet resultSet(std::move(*ev->Get()->Record.MutableResultSet())); + ResultSets_.emplace(ev->Get()->Record.GetQueryResultIndex(), resultSet); + + BLOG_D(this->SelfId() << "Send stream data ack" + << ", to: " << ev->Sender); + + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + resp->Record.SetFreeSpace(std::numeric_limits<ui64>::max()); + Send(ev->Sender, resp.Release()); + } + + void FillResultSet(const NYdb::TResultSet& resultSet, NPG::TEvPGEvents::TEvQueryResponse* response) { + { + for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) { + // TODO: fill data types and sizes + response->DataFields.push_back({ + .Name = column.Name, + .DataType = GetPgOidFromYdbType(column.Type), + // .DataTypeSize = column.Type.GetProto().Getpg_type().Gettyplen() + }); + } + } + { + NYdb::TResultSetParser parser(std::move(resultSet)); + while (parser.TryNextRow()) { + response->DataRows.emplace_back(); + auto& row = response->DataRows.back(); + row.resize(parser.ColumnsCount()); + for (size_t index = 0; index < parser.ColumnsCount(); ++index) { + row[index] = ColumnValueToString(parser.ColumnParser(index)); + } + } + } + } + + void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { + BLOG_D("Handling TEvKqp::TEvQueryResponse"); + NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef(); + + auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>(); + // HACK + if (InTransaction_) { + response->Tag = "BEGIN"; + response->TransactionStatus = 'T'; + } + try { + if (record.HasYdbStatus()) { + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + auto noScript = ConnectionParams_.find("no-script") != ConnectionParams_.end(); + if (noScript) { + Y_ENSURE(record.GetResponse().GetYdbResults().empty()); + if (!ResultSets_.empty()) { + FillResultSet(ResultSets_.begin()->second, response.get()); + } + } else { + Y_ENSURE(ResultSets_.empty()); + auto results = record.GetResponse().GetResults(); + if (!results.empty()) { + auto ydbResult = record.MutableResponse()->MutableYdbResults()->Add(); + NKqp::ConvertKqpQueryResultToDbResult(results.at(0), ydbResult); + FillResultSet(*ydbResult, response.get()); + } + } + + // HACK + response->Tag = TStringBuilder() << "SELECT " << response->DataRows.size(); + // HACK + } else { + NYql::TIssues issues; + NYql::IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues); + NYdb::TStatus status(NYdb::EStatus(record.GetYdbStatus()), std::move(issues)); + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', TStringBuilder() << status}); + } + } else { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', "No result received"}); + } + } catch (const std::exception& e) { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', e.what()}); + } + BLOG_D("Finally replying to " << RequestActorId_); + Send(RequestActorId_, response.release(), 0, RequestCookie_); + //PassAway(); + } + + void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) { + BLOG_D("TEvQuery, sender: " << ev->Sender << " , self: " << SelfId()); + auto query(ev->Get()->Message->GetQuery()); + if (IsQueryEmpty(query)) { + auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>(); + response->EmptyQuery = true; + Send(ev->Sender, response.release(), 0, ev->Cookie); + return; + } + TString database; + if (ConnectionParams_.count("database")) { + database = ConnectionParams_["database"]; + } + TString token; + if (ConnectionParams_.count("ydb-serialized-token")) { + token = ConnectionParams_["ydb-serialized-token"]; + } + + auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest(); + request.SetQuery(ToPgSyntax(query, ConnectionParams_)); + request.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + auto noScript = ConnectionParams_.find("no-script"); + if (noScript == ConnectionParams_.end()) { + request.SetType(NKikimrKqp::QUERY_TYPE_SQL_SCRIPT); + } else { + request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY); + request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + request.MutableTxControl()->set_commit_tx(true); + } + request.SetKeepSession(false); + request.SetDatabase(database); + event->Record.SetUserToken(token); + InTransaction_ = ev->Get()->Message->GetQuery().starts_with("BEGIN"); + ActorIdToProto(SelfId(), event->Record.MutableRequestActorId()); + BLOG_D("Sent event to kqpProxy, RequestActorId = " << RequestActorId_ << ", self: " << SelfId()); + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release()); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NPG::TEvPGEvents::TEvQuery, Handle); + hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); + hFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); + } + } +}; + + +NActors::IActor* CreatePgwireKqpProxy(NActors::TActorId actorId, ui64 cookie, std::unordered_map<TString, TString> params) { + return new TPgwireKqpProxy(std::move(actorId), std::move(cookie), std::move(params)); +} + +} //namespace NLocalPgwire diff --git a/ydb/tests/functional/postgresql/test_postgres.py b/ydb/tests/functional/postgresql/test_postgres.py index 494fbee5ed..e95d39f556 100644 --- a/ydb/tests/functional/postgresql/test_postgres.py +++ b/ydb/tests/functional/postgresql/test_postgres.py @@ -76,7 +76,8 @@ class BasePostgresTest(object): 'LOCAL_PGWIRE': LogLevels.DEBUG, 'KQP_YQL': LogLevels.DEBUG, 'KQP_COMPILE_ACTOR': LogLevels.DEBUG, - 'KQP_COMPILE_REQUEST': LogLevels.DEBUG + 'KQP_COMPILE_REQUEST': LogLevels.DEBUG, + 'KQP_PROXY': LogLevels.DEBUG } )) cls.cluster.start() |