aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorxenoxeno <xeno@ydb.tech>2023-06-01 12:38:47 +0300
committerxenoxeno <xeno@ydb.tech>2023-06-01 12:38:47 +0300
commita3032a835ebe50ddfcc04988a0a204cab4c38df3 (patch)
tree496b122dd3b1f1f012945d9d12359a5b9ca322c4
parentf2ced50b473da6e6fe8c49c1fac64450b9c68755 (diff)
downloadydb-a3032a835ebe50ddfcc04988a0a204cab4c38df3.tar.gz
support describe using explain (parameter only)
-rw-r--r--ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/local_pgwire/local_pgwire_connection.cpp10
-rw-r--r--ydb/core/local_pgwire/pgwire_kqp_proxy.cpp93
-rw-r--r--ydb/core/pgproxy/pg_connection.cpp56
7 files changed, 139 insertions, 24 deletions
diff --git a/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt b/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt
index bf8e444adf0..a602e5c6b51 100644
--- a/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt
@@ -22,6 +22,7 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC
core-grpc_services-local_rpc
ydb-core-protos
ydb-core-pgproxy
+ ydb-core-ydb_convert
api-grpc
lib-operation_id-protos
)
diff --git a/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt b/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt
index 405972616ae..f1413b31aaf 100644
--- a/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt
@@ -23,6 +23,7 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC
core-grpc_services-local_rpc
ydb-core-protos
ydb-core-pgproxy
+ ydb-core-ydb_convert
api-grpc
lib-operation_id-protos
)
diff --git a/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt b/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt
index 405972616ae..f1413b31aaf 100644
--- a/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt
@@ -23,6 +23,7 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC
core-grpc_services-local_rpc
ydb-core-protos
ydb-core-pgproxy
+ ydb-core-ydb_convert
api-grpc
lib-operation_id-protos
)
diff --git a/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt b/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt
index bf8e444adf0..a602e5c6b51 100644
--- a/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt
@@ -22,6 +22,7 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC
core-grpc_services-local_rpc
ydb-core-protos
ydb-core-pgproxy
+ ydb-core-ydb_convert
api-grpc
lib-operation_id-protos
)
diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp
index d7f83922e84..8efa1efa069 100644
--- a/ydb/core/local_pgwire/local_pgwire_connection.cpp
+++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp
@@ -25,6 +25,7 @@ extern NActors::IActor* CreatePgwireKqpProxy(
);
NActors::IActor* CreatePgwireKqpProxyQuery(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery);
+NActors::IActor* CreatePgwireKqpProxyDescribe(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TString& script);
class TPgYdbConnection : public TActor<TPgYdbConnection> {
using TBase = TActor<TPgYdbConnection>;
@@ -101,6 +102,13 @@ public:
return;
}
+ auto query(ConvertQuery(it->second));
+ auto script(ToPgSyntax(query.Query, ConnectionParams));
+
+ TActorId actorId = Register(CreatePgwireKqpProxyDescribe(ConnectionParams, std::move(ev), script));
+ BLOG_D("Created pgwireKqpProxyQuery: " << actorId);
+ return;
+ /*
TActorSystem* actorSystem = TActivationContext::ActorSystem();
auto query = ConvertQuery(it->second);
Ydb::Scripting::ExecuteYqlRequest request;
@@ -164,7 +172,7 @@ public:
}
actorSystem->Send(ev->Sender, response.release(), 0, ev->Cookie);
- });
+ });*/
}
void Handle(NPG::TEvPGEvents::TEvExecute::TPtr& ev) {
diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
index a2bf13a243d..35808996a2f 100644
--- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
+++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
@@ -9,6 +9,7 @@
#define INCLUDE_YDB_INTERNAL_H
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
+#include <ydb/core/ydb_convert/ydb_convert.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -328,6 +329,94 @@ public:
}
};
+class TPgwireKqpProxyDescribe : public TActorBootstrapped<TPgwireKqpProxyDescribe> {
+ using TBase = TActorBootstrapped<TPgwireKqpProxyDescribe>;
+
+ std::unordered_map<TString, TString> ConnectionParams_;
+ NPG::TEvPGEvents::TEvDescribe::TPtr EventDescribe_;
+ TString Script_;
+
+public:
+ TPgwireKqpProxyDescribe(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TString& script)
+ : ConnectionParams_(std::move(params))
+ , EventDescribe_(std::move(evDescribe))
+ , Script_(script)
+ {}
+
+ void Bootstrap() {
+ auto query(Script_);
+ TString database;
+ if (ConnectionParams_.count("database")) {
+ database = ConnectionParams_["database"];
+ }
+ TString token;
+ if (ConnectionParams_.count("ydb-serialized-token")) {
+ token = ConnectionParams_["ydb-serialized-token"];
+ }
+ auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest();
+ request.SetQuery(ToPgSyntax(query, ConnectionParams_));
+ request.SetAction(NKikimrKqp::QUERY_ACTION_EXPLAIN);
+ auto noScript = ConnectionParams_.find("no-script");
+ if (noScript == ConnectionParams_.end()) {
+ request.SetType(NKikimrKqp::QUERY_TYPE_SQL_SCRIPT);
+ } else {
+ request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY);
+ request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ request.MutableTxControl()->set_commit_tx(true);
+ }
+ request.SetKeepSession(false);
+ request.SetDatabase(database);
+ event->Record.SetUserToken(token);
+
+ ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
+ BLOG_D("Sent event to kqpProxy, RequestActorId = " << EventDescribe_->Sender << ", self: " << SelfId());
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
+
+ // TODO(xenoxeno): timeout
+ Become(&TPgwireKqpProxyDescribe::StateWork);
+ }
+
+ void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
+ BLOG_D("Handling TEvKqp::TEvQueryResponse");
+ NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef();
+
+ auto response = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>();
+ try {
+ if (record.HasYdbStatus()) {
+ if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
+ for (const auto& param : record.GetResponse().GetQueryParameters()) {
+ Ydb::Type ydbType;
+ ConvertMiniKQLTypeToYdbType(param.GetType(), ydbType);
+ response->ParameterTypes.push_back(GetPgOidFromYdbType(ydbType));
+ }
+ } else {
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
+ NYdb::TStatus status(NYdb::EStatus(record.GetYdbStatus()), std::move(issues));
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', TStringBuilder() << status});
+ }
+ } else {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', "No result received"});
+ }
+ } catch (const std::exception& e) {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', e.what()});
+ }
+ BLOG_D("Finally replying to " << EventDescribe_->Sender);
+ Send(EventDescribe_->Sender, response.release(), 0, EventDescribe_->Cookie);
+ PassAway();
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
+ }
+ }
+};
+
NActors::IActor* CreatePgwireKqpProxy(std::unordered_map<TString, TString> params) {
return new TPgwireKqpProxy(std::move(params));
@@ -337,4 +426,8 @@ NActors::IActor* CreatePgwireKqpProxyQuery(std::unordered_map<TString, TString>
return new TPgwireKqpProxyQuery(std::move(params), std::move(evQuery));
}
+NActors::IActor* CreatePgwireKqpProxyDescribe(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TString& script) {
+ return new TPgwireKqpProxyDescribe(std::move(params), std::move(evDescribe), script);
+}
+
} //namespace NLocalPgwire
diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp
index 002d200a40e..d7faa6234d9 100644
--- a/ydb/core/pgproxy/pg_connection.cpp
+++ b/ydb/core/pgproxy/pg_connection.cpp
@@ -531,32 +531,42 @@ protected:
void HandleConnected(TEvPGEvents::TEvDescribeResponse::TPtr& ev) {
if (IsEventExpected(ev)) {
- { // parameterDescription
- TPGStreamOutput<TPGParameterDescription> parameterDescription;
- parameterDescription << uint16_t(ev->Get()->ParameterTypes.size()); // number of fields
- for (auto type : ev->Get()->ParameterTypes) {
- parameterDescription << type;
+ if (ev->Get()->ErrorFields.empty()) {
+ { // parameterDescription
+ TPGStreamOutput<TPGParameterDescription> parameterDescription;
+ parameterDescription << uint16_t(ev->Get()->ParameterTypes.size()); // number of fields
+ for (auto type : ev->Get()->ParameterTypes) {
+ parameterDescription << type;
+ }
+ SendStream(parameterDescription);
}
- SendStream(parameterDescription);
- }
- if (ev->Get()->DataFields.size() > 0) {
- // rowDescription
- TPGStreamOutput<TPGRowDescription> rowDescription;
- rowDescription << uint16_t(ev->Get()->DataFields.size()); // number of fields
- for (const auto& field : ev->Get()->DataFields) {
- rowDescription
- << TStringBuf(field.Name) << '\0'
- << uint32_t(field.TableId)
- << uint16_t(field.ColumnId)
- << uint32_t(field.DataType)
- << uint16_t(field.DataTypeSize)
- << uint32_t(0xffffffff) // type modifier
- << uint16_t(0) // format text
- ;
+ if (ev->Get()->DataFields.size() > 0) {
+ // rowDescription
+ TPGStreamOutput<TPGRowDescription> rowDescription;
+ rowDescription << uint16_t(ev->Get()->DataFields.size()); // number of fields
+ for (const auto& field : ev->Get()->DataFields) {
+ rowDescription
+ << TStringBuf(field.Name) << '\0'
+ << uint32_t(field.TableId)
+ << uint16_t(field.ColumnId)
+ << uint32_t(field.DataType)
+ << uint16_t(field.DataTypeSize)
+ << uint32_t(0xffffffff) // type modifier
+ << uint16_t(0) // format text
+ ;
+ }
+ SendStream(rowDescription);
+ } else {
+ SendMessage(TPGNoData());
}
- SendStream(rowDescription);
} else {
- SendMessage(TPGNoData());
+ // error response
+ TPGStreamOutput<TPGErrorResponse> errorResponse;
+ for (const auto& field : ev->Get()->ErrorFields) {
+ errorResponse << field.first << field.second << '\0';
+ }
+ errorResponse << '\0';
+ SendStream(errorResponse);
}
++OutgoingSequenceNumber;
BecomeReadyForQuery();