aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorqrort <qrort@yandex-team.com>2023-05-30 06:05:04 +0300
committerqrort <qrort@yandex-team.com>2023-05-30 06:05:04 +0300
commitb71fb3a796618a5a1f3ad735fc385c0653336007 (patch)
tree846bcc1ff88aec4bc7d9724f70e8d2487cccd33f
parent9a75beeb859ebdd2d5391f0b20cb0349f2fec5a3 (diff)
downloadydb-b71fb3a796618a5a1f3ad735fc385c0653336007.tar.gz
local_pgwire to kqp proxy
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp1
-rw-r--r--ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt7
-rw-r--r--ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt7
-rw-r--r--ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt7
-rw-r--r--ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt7
-rw-r--r--ydb/core/local_pgwire/local_pgwire_connection.cpp304
-rw-r--r--ydb/core/local_pgwire/local_pgwire_util.h217
-rw-r--r--ydb/core/local_pgwire/pgwire_kqp_proxy.cpp181
-rw-r--r--ydb/tests/functional/postgresql/test_postgres.py3
9 files changed, 448 insertions, 286 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 8ed504fca4..c16b7b7f1a 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1136,6 +1136,7 @@ public:
void HandleExecute(TEvKqpExecuter::TEvStreamData::TPtr& ev) {
YQL_ENSURE(QueryState && QueryState->RequestActorId);
+ LOG_D("Forwarded TEvStreamData to " << QueryState->RequestActorId);
TlsActivationContext->Send(ev->Forward(QueryState->RequestActorId));
}
diff --git a/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt b/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt
index ee78490159..bf8e444adf 100644
--- a/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt
@@ -8,10 +8,16 @@
add_library(ydb-core-local_pgwire)
+target_compile_options(ydb-core-local_pgwire PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
target_link_libraries(ydb-core-local_pgwire PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ yql-public-udf
+ core-kqp-common
+ core-kqp-executer_actor
ydb-core-grpc_services
core-grpc_services-local_rpc
ydb-core-protos
@@ -22,4 +28,5 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC
target_sources(ydb-core-local_pgwire PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
)
diff --git a/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt b/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt
index cce2ab3a26..405972616a 100644
--- a/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt
@@ -8,11 +8,17 @@
add_library(ydb-core-local_pgwire)
+target_compile_options(ydb-core-local_pgwire PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
target_link_libraries(ydb-core-local_pgwire PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ yql-public-udf
+ core-kqp-common
+ core-kqp-executer_actor
ydb-core-grpc_services
core-grpc_services-local_rpc
ydb-core-protos
@@ -23,4 +29,5 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC
target_sources(ydb-core-local_pgwire PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
)
diff --git a/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt b/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt
index cce2ab3a26..405972616a 100644
--- a/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt
@@ -8,11 +8,17 @@
add_library(ydb-core-local_pgwire)
+target_compile_options(ydb-core-local_pgwire PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
target_link_libraries(ydb-core-local_pgwire PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ yql-public-udf
+ core-kqp-common
+ core-kqp-executer_actor
ydb-core-grpc_services
core-grpc_services-local_rpc
ydb-core-protos
@@ -23,4 +29,5 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC
target_sources(ydb-core-local_pgwire PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
)
diff --git a/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt b/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt
index ee78490159..bf8e444adf 100644
--- a/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt
@@ -8,10 +8,16 @@
add_library(ydb-core-local_pgwire)
+target_compile_options(ydb-core-local_pgwire PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
target_link_libraries(ydb-core-local_pgwire PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ yql-public-udf
+ core-kqp-common
+ core-kqp-executer_actor
ydb-core-grpc_services
core-grpc_services-local_rpc
ydb-core-protos
@@ -22,4 +28,5 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC
target_sources(ydb-core-local_pgwire PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/local_pgwire.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
)
diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp
index aa2508bcd1..5c72000bf6 100644
--- a/ydb/core/local_pgwire/local_pgwire_connection.cpp
+++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp
@@ -1,6 +1,11 @@
#include "log_impl.h"
+#include "local_pgwire_util.h"
+
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
#include <ydb/core/pgproxy/pg_proxy_events.h>
+#include <ydb/core/kqp/common/events/events.h>
+#include <ydb/core/kqp/common/simple/services.h>
+#include <ydb/core/kqp/executer_actor/kqp_executer.h>
#define INCLUDE_YDB_INTERNAL_H
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.h>
#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>
@@ -10,38 +15,25 @@
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
-// temporary borrowed from postgresql/src/backend/catalog/pg_type_d.h
-
-#define BOOLOID 16
-#define BYTEAOID 17
-#define CHAROID 18
-#define NAMEOID 19
-#define INT8OID 20
-#define INT2OID 21
-#define INT2VECTOROID 22
-#define INT4OID 23
-#define REGPROCOID 24
-#define TEXTOID 25
-
-//
-
namespace NLocalPgWire {
using namespace NActors;
using namespace NKikimr;
+extern NActors::IActor* CreatePgwireKqpProxy(
+ NActors::TActorId actorId,
+ ui64 cookie,
+ std::unordered_map<TString, TString> params
+);
+
class TPgYdbConnection : public TActorBootstrapped<TPgYdbConnection> {
using TBase = TActorBootstrapped<TPgYdbConnection>;
- struct TParsedStatement {
- NPG::TPGParse::TQueryData QueryData;
- NPG::TPGBind::TBindData BindData;
- };
-
std::unordered_map<TString, TString> ConnectionParams;
std::unordered_map<TString, TParsedStatement> ParsedStatements;
TString CurrentStatement;
-
+ Ydb::StatusIds QueryStatus;
+ std::unordered_map<ui32, NYdb::TResultSet> ResultSets;
public:
TPgYdbConnection(std::unordered_map<TString, TString> params)
: ConnectionParams(std::move(params))
@@ -51,221 +43,12 @@ public:
Become(&TPgYdbConnection::StateWork);
}
- static TString ColumnPrimitiveValueToString(NYdb::TValueParser& valueParser) {
- switch (valueParser.GetPrimitiveType()) {
- case NYdb::EPrimitiveType::Bool:
- return TStringBuilder() << valueParser.GetBool();
- case NYdb::EPrimitiveType::Int8:
- return TStringBuilder() << valueParser.GetInt8();
- case NYdb::EPrimitiveType::Uint8:
- return TStringBuilder() << valueParser.GetUint8();
- case NYdb::EPrimitiveType::Int16:
- return TStringBuilder() << valueParser.GetInt16();
- case NYdb::EPrimitiveType::Uint16:
- return TStringBuilder() << valueParser.GetUint16();
- case NYdb::EPrimitiveType::Int32:
- return TStringBuilder() << valueParser.GetInt32();
- case NYdb::EPrimitiveType::Uint32:
- return TStringBuilder() << valueParser.GetUint32();
- case NYdb::EPrimitiveType::Int64:
- return TStringBuilder() << valueParser.GetInt64();
- case NYdb::EPrimitiveType::Uint64:
- return TStringBuilder() << valueParser.GetUint64();
- case NYdb::EPrimitiveType::Float:
- return TStringBuilder() << valueParser.GetFloat();
- case NYdb::EPrimitiveType::Double:
- return TStringBuilder() << valueParser.GetDouble();
- case NYdb::EPrimitiveType::Utf8:
- return TStringBuilder() << valueParser.GetUtf8();
- case NYdb::EPrimitiveType::Date:
- return valueParser.GetDate().ToString();
- case NYdb::EPrimitiveType::Datetime:
- return valueParser.GetDatetime().ToString();
- case NYdb::EPrimitiveType::Timestamp:
- return valueParser.GetTimestamp().ToString();
- case NYdb::EPrimitiveType::Interval:
- return TStringBuilder() << valueParser.GetInterval();
- case NYdb::EPrimitiveType::TzDate:
- return valueParser.GetTzDate();
- case NYdb::EPrimitiveType::TzDatetime:
- return valueParser.GetTzDatetime();
- case NYdb::EPrimitiveType::TzTimestamp:
- return valueParser.GetTzTimestamp();
- case NYdb::EPrimitiveType::String:
- return Base64Encode(valueParser.GetString());
- case NYdb::EPrimitiveType::Yson:
- return valueParser.GetYson();
- case NYdb::EPrimitiveType::Json:
- return valueParser.GetJson();
- case NYdb::EPrimitiveType::JsonDocument:
- return valueParser.GetJsonDocument();
- case NYdb::EPrimitiveType::DyNumber:
- return valueParser.GetDyNumber();
- case NYdb::EPrimitiveType::Uuid:
- return {};
- }
- return {};
- }
-
- static TString ColumnValueToString(NYdb::TValueParser& valueParser) {
- switch (valueParser.GetKind()) {
- case NYdb::TTypeParser::ETypeKind::Primitive:
- return ColumnPrimitiveValueToString(valueParser);
- case NYdb::TTypeParser::ETypeKind::Optional: {
- TString value;
- valueParser.OpenOptional();
- if (valueParser.IsNull()) {
- value = "NULL";
- } else {
- value = ColumnValueToString(valueParser);
- }
- valueParser.CloseOptional();
- return value;
- }
- case NYdb::TTypeParser::ETypeKind::Tuple: {
- TString value;
- valueParser.OpenTuple();
- while (valueParser.TryNextElement()) {
- if (!value.empty()) {
- value += ',';
- }
- value += ColumnValueToString(valueParser);
- }
- valueParser.CloseTuple();
- return value;
- }
- case NYdb::TTypeParser::ETypeKind::Pg: {
- return valueParser.GetPg().Content_;
- }
- default:
- return {};
- }
- }
-
- static uint32_t GetPgOidFromYdbType(NYdb::TType type) {
- NYdb::TTypeParser parser(type);
- switch (parser.GetKind()) {
- case NYdb::TTypeParser::ETypeKind::Pg: {
- return parser.GetPg().Oid;
- default:
- return {};
- }
- }
- }
-
- TString ToPgSyntax(TStringBuf query) {
- auto itOptions = ConnectionParams.find("options");
- if (itOptions == ConnectionParams.end()) {
- return TStringBuilder() << "--!syntax_pg\n" << query; // default
- }
- return TStringBuilder() << "--!" << itOptions->second << "\n" << query;
- }
-
- static NYdb::NScripting::TExecuteYqlResult ConvertProtoResponseToSdkResult(Ydb::Scripting::ExecuteYqlResponse&& proto) {
- TVector<NYdb::TResultSet> res;
- TMaybe<NYdb::NTable::TQueryStats> queryStats;
- {
- Ydb::Scripting::ExecuteYqlResult result;
- proto.mutable_operation()->mutable_result()->UnpackTo(&result);
- for (int i = 0; i < result.result_sets_size(); i++) {
- res.emplace_back(std::move(*result.mutable_result_sets(i)));
- }
- if (result.has_query_stats()) {
- queryStats = NYdb::NTable::TQueryStats(std::move(*result.mutable_query_stats()));
- }
- }
- NYdb::TPlainStatus alwaysSuccess;
- return {NYdb::TStatus(std::move(alwaysSuccess)), std::move(res), queryStats};
- }
-
void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) {
BLOG_D("TEvQuery " << ev->Sender);
- auto query(ev->Get()->Message->GetQuery());
- if (IsQueryEmpty(query)) {
- auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>();
- response->EmptyQuery = true;
- Send(ev->Sender, response.release(), 0, ev->Cookie);
- return;
- }
- Ydb::Scripting::ExecuteYqlRequest request;
- request.set_script(ToPgSyntax(query));
- TString database;
- if (ConnectionParams.count("database")) {
- database = ConnectionParams["database"];
- }
- TString token;
- if (ConnectionParams.count("ydb-serialized-token")) {
- token = ConnectionParams["ydb-serialized-token"];
- }
- using TRpcEv = NGRpcService::TGrpcRequestOperationCall<Ydb::Scripting::ExecuteYqlRequest, Ydb::Scripting::ExecuteYqlResponse>;
- TActorSystem* actorSystem = TActivationContext::ActorSystem();
- auto rpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(request), database, token, actorSystem);
- rpcFuture.Subscribe([actorSystem, ev](NThreading::TFuture<Ydb::Scripting::ExecuteYqlResponse> future) {
- auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>();
- // HACK
- if (ev->Get()->Message->GetQuery().starts_with("BEGIN")) {
- response->Tag = "BEGIN";
- response->TransactionStatus = 'T';
- }
- // HACK
- try {
- Ydb::Scripting::ExecuteYqlResponse yqlResponse(future.ExtractValueSync());
- if (yqlResponse.has_operation()) {
- if (yqlResponse.operation().status() == Ydb::StatusIds::SUCCESS) {
- if (yqlResponse.operation().has_result()) {
- NYdb::NScripting::TExecuteYqlResult result(ConvertProtoResponseToSdkResult(std::move(yqlResponse)));
- if (result.IsSuccess()) {
- const TVector<NYdb::TResultSet>& resultSets = result.GetResultSets();
- if (!resultSets.empty()) {
- NYdb::TResultSet resultSet = resultSets[0];
-
- {
- for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) {
- // TODO: fill data types and sizes
- response->DataFields.push_back({
- .Name = column.Name,
- .DataType = GetPgOidFromYdbType(column.Type),
- });
- }
- }
-
- {
- NYdb::TResultSetParser parser(std::move(resultSet));
- while (parser.TryNextRow()) {
- response->DataRows.emplace_back();
- auto& row = response->DataRows.back();
- row.resize(parser.ColumnsCount());
- for (size_t index = 0; index < parser.ColumnsCount(); ++index) {
- row[index] = ColumnValueToString(parser.ColumnParser(index));
- }
- }
- }
-
- // HACK
- response->Tag = TStringBuilder() << "SELECT " << response->DataRows.size();
- // HACK
- }
- }
- }
- } else {
- NYql::TIssues issues;
- NYql::IssuesFromMessage(yqlResponse.operation().issues(), issues);
- NYdb::TStatus status(NYdb::EStatus(yqlResponse.operation().status()), std::move(issues));
- response->ErrorFields.push_back({'E', "ERROR"});
- response->ErrorFields.push_back({'M', TStringBuilder() << status});
- }
- } else {
- response->ErrorFields.push_back({'E', "ERROR"});
- response->ErrorFields.push_back({'M', "No result received"});
- }
- }
- catch (const std::exception& e) {
- response->ErrorFields.push_back({'E', "ERROR"});
- response->ErrorFields.push_back({'M', e.what()});
- }
-
- actorSystem->Send(ev->Sender, response.release(), 0, ev->Cookie);
- });
+ IActor* actor = CreatePgwireKqpProxy(ev->Sender, ev->Cookie, ConnectionParams);
+ TActorId actorId = Register(actor);
+ BLOG_D("Created pgwireKqpProxy: " << actorId);
+ Forward(ev, actorId);
}
void Handle(NPG::TEvPGEvents::TEvParse::TPtr& ev) {
@@ -298,55 +81,6 @@ public:
Send(ev->Sender, closeComplete.release());
}
- struct TConvertedQuery {
- TString Query;
- NYdb::TParams Params;
- };
-
- TConvertedQuery ConvertQuery(const TParsedStatement& statement) {
- auto& bindData = statement.BindData;
- const auto& queryData = statement.QueryData;
- NYdb::TParamsBuilder paramsBuilder;
- TStringBuilder injectedQuery;
-
- for (size_t idxParam = 0; idxParam < queryData.ParametersTypes.size(); ++idxParam) {
- int32_t paramType = queryData.ParametersTypes[idxParam];
- TString paramValue;
- if (idxParam < bindData.ParametersValue.size()) {
- std::vector<uint8_t> paramVal = bindData.ParametersValue[idxParam];
- paramValue = TString(reinterpret_cast<char*>(paramVal.data()), paramVal.size());
- }
- switch (paramType) {
- case INT2OID:
- paramsBuilder.AddParam(TStringBuilder() << ":_" << idxParam + 1).Int16(atoi(paramValue.data())).Build();
- break;
-
- }
- }
- return {
- .Query = injectedQuery + queryData.Query,
- .Params = paramsBuilder.Build(),
- };
- }
-
- inline static bool IsWhitespaceASCII(char c)
- {
- return c == ' ' || c == '\t' || c == '\n' || c == '\r' || c == '\f' || c == '\v';
- }
-
- static bool IsWhitespace(TStringBuf query) {
- for (char c : query) {
- if (!IsWhitespaceASCII(c)) {
- return false;
- }
- }
- return true;
- }
-
- static bool IsQueryEmpty(TStringBuf query) {
- return IsWhitespace(query);
- }
-
void Handle(NPG::TEvPGEvents::TEvDescribe::TPtr& ev) {
BLOG_D("TEvDescribe " << ev->Sender);
@@ -367,7 +101,7 @@ public:
TActorSystem* actorSystem = TActivationContext::ActorSystem();
auto query = ConvertQuery(it->second);
Ydb::Scripting::ExecuteYqlRequest request;
- request.set_script(ToPgSyntax(query.Query));
+ request.set_script(ToPgSyntax(query.Query, ConnectionParams));
// TODO:
//request.set_parameters(query.Params);
TString database;
@@ -457,7 +191,7 @@ public:
TActorSystem* actorSystem = TActivationContext::ActorSystem();
auto query = ConvertQuery(it->second);
Ydb::Scripting::ExecuteYqlRequest request;
- request.set_script(ToPgSyntax(query.Query));
+ request.set_script(ToPgSyntax(query.Query, ConnectionParams));
// TODO:
//request.set_parameters(query.Params);
TString database;
diff --git a/ydb/core/local_pgwire/local_pgwire_util.h b/ydb/core/local_pgwire/local_pgwire_util.h
new file mode 100644
index 0000000000..110996c181
--- /dev/null
+++ b/ydb/core/local_pgwire/local_pgwire_util.h
@@ -0,0 +1,217 @@
+#pragma once
+#include <library/cpp/string_utils/base64/base64.h>
+
+#include <ydb/core/pgproxy/pg_proxy_types.h>
+
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+
+#include <ydb/public/sdk/cpp/client/draft/ydb_scripting.h>
+#define INCLUDE_YDB_INTERNAL_H
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/plain_status/status.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/operation/operation.h>
+#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
+#include <ydb/public/api/grpc/ydb_scripting_v1.grpc.pb.h>
+
+#include <util/string/builder.h>
+#include <util/generic/string.h>
+
+// temporary borrowed from postgresql/src/backend/catalog/pg_type_d.h
+
+#define BOOLOID 16
+#define BYTEAOID 17
+#define CHAROID 18
+#define NAMEOID 19
+#define INT8OID 20
+#define INT2OID 21
+#define INT2VECTOROID 22
+#define INT4OID 23
+#define REGPROCOID 24
+#define TEXTOID 25
+
+//
+
+namespace NLocalPgWire {
+
+struct TParsedStatement {
+ NPG::TPGParse::TQueryData QueryData;
+ NPG::TPGBind::TBindData BindData;
+};
+
+inline TString ColumnPrimitiveValueToString(NYdb::TValueParser& valueParser) {
+ switch (valueParser.GetPrimitiveType()) {
+ case NYdb::EPrimitiveType::Bool:
+ return TStringBuilder() << valueParser.GetBool();
+ case NYdb::EPrimitiveType::Int8:
+ return TStringBuilder() << valueParser.GetInt8();
+ case NYdb::EPrimitiveType::Uint8:
+ return TStringBuilder() << valueParser.GetUint8();
+ case NYdb::EPrimitiveType::Int16:
+ return TStringBuilder() << valueParser.GetInt16();
+ case NYdb::EPrimitiveType::Uint16:
+ return TStringBuilder() << valueParser.GetUint16();
+ case NYdb::EPrimitiveType::Int32:
+ return TStringBuilder() << valueParser.GetInt32();
+ case NYdb::EPrimitiveType::Uint32:
+ return TStringBuilder() << valueParser.GetUint32();
+ case NYdb::EPrimitiveType::Int64:
+ return TStringBuilder() << valueParser.GetInt64();
+ case NYdb::EPrimitiveType::Uint64:
+ return TStringBuilder() << valueParser.GetUint64();
+ case NYdb::EPrimitiveType::Float:
+ return TStringBuilder() << valueParser.GetFloat();
+ case NYdb::EPrimitiveType::Double:
+ return TStringBuilder() << valueParser.GetDouble();
+ case NYdb::EPrimitiveType::Utf8:
+ return TStringBuilder() << valueParser.GetUtf8();
+ case NYdb::EPrimitiveType::Date:
+ return valueParser.GetDate().ToString();
+ case NYdb::EPrimitiveType::Datetime:
+ return valueParser.GetDatetime().ToString();
+ case NYdb::EPrimitiveType::Timestamp:
+ return valueParser.GetTimestamp().ToString();
+ case NYdb::EPrimitiveType::Interval:
+ return TStringBuilder() << valueParser.GetInterval();
+ case NYdb::EPrimitiveType::TzDate:
+ return valueParser.GetTzDate();
+ case NYdb::EPrimitiveType::TzDatetime:
+ return valueParser.GetTzDatetime();
+ case NYdb::EPrimitiveType::TzTimestamp:
+ return valueParser.GetTzTimestamp();
+ case NYdb::EPrimitiveType::String:
+ return Base64Encode(valueParser.GetString());
+ case NYdb::EPrimitiveType::Yson:
+ return valueParser.GetYson();
+ case NYdb::EPrimitiveType::Json:
+ return valueParser.GetJson();
+ case NYdb::EPrimitiveType::JsonDocument:
+ return valueParser.GetJsonDocument();
+ case NYdb::EPrimitiveType::DyNumber:
+ return valueParser.GetDyNumber();
+ case NYdb::EPrimitiveType::Uuid:
+ return {};
+ }
+ return {};
+}
+
+inline TString ColumnValueToString(NYdb::TValueParser& valueParser) {
+ switch (valueParser.GetKind()) {
+ case NYdb::TTypeParser::ETypeKind::Primitive:
+ return ColumnPrimitiveValueToString(valueParser);
+ case NYdb::TTypeParser::ETypeKind::Optional: {
+ TString value;
+ valueParser.OpenOptional();
+ if (valueParser.IsNull()) {
+ value = "NULL";
+ } else {
+ value = ColumnValueToString(valueParser);
+ }
+ valueParser.CloseOptional();
+ return value;
+ }
+ case NYdb::TTypeParser::ETypeKind::Tuple: {
+ TString value;
+ valueParser.OpenTuple();
+ while (valueParser.TryNextElement()) {
+ if (!value.empty()) {
+ value += ',';
+ }
+ value += ColumnValueToString(valueParser);
+ }
+ valueParser.CloseTuple();
+ return value;
+ }
+ case NYdb::TTypeParser::ETypeKind::Pg: {
+ return valueParser.GetPg().Content_;
+ }
+ default:
+ return {};
+ }
+}
+
+inline uint32_t GetPgOidFromYdbType(NYdb::TType type) {
+ NYdb::TTypeParser parser(type);
+ switch (parser.GetKind()) {
+ case NYdb::TTypeParser::ETypeKind::Pg: {
+ return parser.GetPg().Oid;
+ default:
+ return {};
+ }
+ }
+}
+
+inline TString ToPgSyntax(TStringBuf query, const std::unordered_map<TString, TString>& ConnectionParams) {
+ auto itOptions = ConnectionParams.find("options");
+ if (itOptions == ConnectionParams.end()) {
+ return TStringBuilder() << "--!syntax_pg\n" << query; // default
+ }
+ return TStringBuilder() << "--!" << itOptions->second << "\n" << query;
+}
+
+inline NYdb::NScripting::TExecuteYqlResult ConvertProtoResponseToSdkResult(Ydb::Scripting::ExecuteYqlResponse&& proto) {
+ TVector<NYdb::TResultSet> res;
+ TMaybe<NYdb::NTable::TQueryStats> queryStats;
+ {
+ Ydb::Scripting::ExecuteYqlResult result;
+ proto.mutable_operation()->mutable_result()->UnpackTo(&result);
+ for (int i = 0; i < result.result_sets_size(); i++) {
+ res.emplace_back(std::move(*result.mutable_result_sets(i)));
+ }
+ if (result.has_query_stats()) {
+ queryStats = NYdb::NTable::TQueryStats(std::move(*result.mutable_query_stats()));
+ }
+ }
+ NYdb::TPlainStatus alwaysSuccess;
+ return {NYdb::TStatus(std::move(alwaysSuccess)), std::move(res), queryStats};
+}
+
+struct TConvertedQuery {
+ TString Query;
+ NYdb::TParams Params;
+};
+
+inline TConvertedQuery ConvertQuery(const TParsedStatement& statement) {
+ auto& bindData = statement.BindData;
+ const auto& queryData = statement.QueryData;
+ NYdb::TParamsBuilder paramsBuilder;
+ TStringBuilder injectedQuery;
+
+ for (size_t idxParam = 0; idxParam < queryData.ParametersTypes.size(); ++idxParam) {
+ int32_t paramType = queryData.ParametersTypes[idxParam];
+ TString paramValue;
+ if (idxParam < bindData.ParametersValue.size()) {
+ std::vector<uint8_t> paramVal = bindData.ParametersValue[idxParam];
+ paramValue = TString(reinterpret_cast<char*>(paramVal.data()), paramVal.size());
+ }
+ switch (paramType) {
+ case INT2OID:
+ paramsBuilder.AddParam(TStringBuilder() << ":_" << idxParam + 1).Int16(atoi(paramValue.data())).Build();
+ break;
+
+ }
+ }
+ return {
+ .Query = injectedQuery + queryData.Query,
+ .Params = paramsBuilder.Build(),
+ };
+}
+
+inline bool IsWhitespaceASCII(char c)
+{
+ return c == ' ' || c == '\t' || c == '\n' || c == '\r' || c == '\f' || c == '\v';
+}
+
+inline bool IsWhitespace(TStringBuf query) {
+ for (char c : query) {
+ if (!IsWhitespaceASCII(c)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+inline bool IsQueryEmpty(TStringBuf query) {
+ return IsWhitespace(query);
+}
+
+} //namespace NLocalPgWire
diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
new file mode 100644
index 0000000000..bce7d9f044
--- /dev/null
+++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
@@ -0,0 +1,181 @@
+#include "log_impl.h"
+#include "local_pgwire_util.h"
+#include <ydb/core/kqp/common/kqp.h>
+#include <ydb/core/kqp/common/events/events.h>
+#include <ydb/core/kqp/common/simple/services.h>
+#include <ydb/core/kqp/executer_actor/kqp_executer.h>
+#include <ydb/core/pgproxy/pg_proxy_events.h>
+#include <ydb/core/pgproxy/pg_proxy_types.h>
+#define INCLUDE_YDB_INTERNAL_H
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+
+namespace NLocalPgWire {
+
+using namespace NActors;
+using namespace NKikimr;
+
+class TPgwireKqpProxy : public TActorBootstrapped<TPgwireKqpProxy> {
+ using TBase = TActorBootstrapped<TPgwireKqpProxy>;
+
+ NActors::TActorId RequestActorId_;
+ ui64 RequestCookie_;
+ bool InTransaction_;
+ std::unordered_map<TString, TString> ConnectionParams_;
+ TMap<ui32, NYdb::TResultSet> ResultSets_;
+public:
+ TPgwireKqpProxy(NActors::TActorId actorId, ui64 cookie, std::unordered_map<TString, TString> params)
+ : RequestActorId_(actorId)
+ , RequestCookie_(cookie)
+ , ConnectionParams_(std::move(params))
+ {}
+
+ void Bootstrap() {
+ Become(&TPgwireKqpProxy::StateWork);
+ }
+
+ void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
+ NYdb::TResultSet resultSet(std::move(*ev->Get()->Record.MutableResultSet()));
+ ResultSets_.emplace(ev->Get()->Record.GetQueryResultIndex(), resultSet);
+
+ BLOG_D(this->SelfId() << "Send stream data ack"
+ << ", to: " << ev->Sender);
+
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+ resp->Record.SetFreeSpace(std::numeric_limits<ui64>::max());
+ Send(ev->Sender, resp.Release());
+ }
+
+ void FillResultSet(const NYdb::TResultSet& resultSet, NPG::TEvPGEvents::TEvQueryResponse* response) {
+ {
+ for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) {
+ // TODO: fill data types and sizes
+ response->DataFields.push_back({
+ .Name = column.Name,
+ .DataType = GetPgOidFromYdbType(column.Type),
+ // .DataTypeSize = column.Type.GetProto().Getpg_type().Gettyplen()
+ });
+ }
+ }
+ {
+ NYdb::TResultSetParser parser(std::move(resultSet));
+ while (parser.TryNextRow()) {
+ response->DataRows.emplace_back();
+ auto& row = response->DataRows.back();
+ row.resize(parser.ColumnsCount());
+ for (size_t index = 0; index < parser.ColumnsCount(); ++index) {
+ row[index] = ColumnValueToString(parser.ColumnParser(index));
+ }
+ }
+ }
+ }
+
+ void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
+ BLOG_D("Handling TEvKqp::TEvQueryResponse");
+ NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef();
+
+ auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>();
+ // HACK
+ if (InTransaction_) {
+ response->Tag = "BEGIN";
+ response->TransactionStatus = 'T';
+ }
+ try {
+ if (record.HasYdbStatus()) {
+ if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
+ auto noScript = ConnectionParams_.find("no-script") != ConnectionParams_.end();
+ if (noScript) {
+ Y_ENSURE(record.GetResponse().GetYdbResults().empty());
+ if (!ResultSets_.empty()) {
+ FillResultSet(ResultSets_.begin()->second, response.get());
+ }
+ } else {
+ Y_ENSURE(ResultSets_.empty());
+ auto results = record.GetResponse().GetResults();
+ if (!results.empty()) {
+ auto ydbResult = record.MutableResponse()->MutableYdbResults()->Add();
+ NKqp::ConvertKqpQueryResultToDbResult(results.at(0), ydbResult);
+ FillResultSet(*ydbResult, response.get());
+ }
+ }
+
+ // HACK
+ response->Tag = TStringBuilder() << "SELECT " << response->DataRows.size();
+ // HACK
+ } else {
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
+ NYdb::TStatus status(NYdb::EStatus(record.GetYdbStatus()), std::move(issues));
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', TStringBuilder() << status});
+ }
+ } else {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', "No result received"});
+ }
+ } catch (const std::exception& e) {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', e.what()});
+ }
+ BLOG_D("Finally replying to " << RequestActorId_);
+ Send(RequestActorId_, response.release(), 0, RequestCookie_);
+ //PassAway();
+ }
+
+ void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) {
+ BLOG_D("TEvQuery, sender: " << ev->Sender << " , self: " << SelfId());
+ auto query(ev->Get()->Message->GetQuery());
+ if (IsQueryEmpty(query)) {
+ auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>();
+ response->EmptyQuery = true;
+ Send(ev->Sender, response.release(), 0, ev->Cookie);
+ return;
+ }
+ TString database;
+ if (ConnectionParams_.count("database")) {
+ database = ConnectionParams_["database"];
+ }
+ TString token;
+ if (ConnectionParams_.count("ydb-serialized-token")) {
+ token = ConnectionParams_["ydb-serialized-token"];
+ }
+
+ auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest();
+ request.SetQuery(ToPgSyntax(query, ConnectionParams_));
+ request.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ auto noScript = ConnectionParams_.find("no-script");
+ if (noScript == ConnectionParams_.end()) {
+ request.SetType(NKikimrKqp::QUERY_TYPE_SQL_SCRIPT);
+ } else {
+ request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY);
+ request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ request.MutableTxControl()->set_commit_tx(true);
+ }
+ request.SetKeepSession(false);
+ request.SetDatabase(database);
+ event->Record.SetUserToken(token);
+ InTransaction_ = ev->Get()->Message->GetQuery().starts_with("BEGIN");
+ ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
+ BLOG_D("Sent event to kqpProxy, RequestActorId = " << RequestActorId_ << ", self: " << SelfId());
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NPG::TEvPGEvents::TEvQuery, Handle);
+ hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
+ hFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
+ }
+ }
+};
+
+
+NActors::IActor* CreatePgwireKqpProxy(NActors::TActorId actorId, ui64 cookie, std::unordered_map<TString, TString> params) {
+ return new TPgwireKqpProxy(std::move(actorId), std::move(cookie), std::move(params));
+}
+
+} //namespace NLocalPgwire
diff --git a/ydb/tests/functional/postgresql/test_postgres.py b/ydb/tests/functional/postgresql/test_postgres.py
index 494fbee5ed..e95d39f556 100644
--- a/ydb/tests/functional/postgresql/test_postgres.py
+++ b/ydb/tests/functional/postgresql/test_postgres.py
@@ -76,7 +76,8 @@ class BasePostgresTest(object):
'LOCAL_PGWIRE': LogLevels.DEBUG,
'KQP_YQL': LogLevels.DEBUG,
'KQP_COMPILE_ACTOR': LogLevels.DEBUG,
- 'KQP_COMPILE_REQUEST': LogLevels.DEBUG
+ 'KQP_COMPILE_REQUEST': LogLevels.DEBUG,
+ 'KQP_PROXY': LogLevels.DEBUG
}
))
cls.cluster.start()