diff options
author | xenoxeno <xeno@ydb.tech> | 2023-06-01 12:38:47 +0300 |
---|---|---|
committer | xenoxeno <xeno@ydb.tech> | 2023-06-01 12:38:47 +0300 |
commit | a3032a835ebe50ddfcc04988a0a204cab4c38df3 (patch) | |
tree | 496b122dd3b1f1f012945d9d12359a5b9ca322c4 | |
parent | f2ced50b473da6e6fe8c49c1fac64450b9c68755 (diff) | |
download | ydb-a3032a835ebe50ddfcc04988a0a204cab4c38df3.tar.gz |
support describe using explain (parameter only)
-rw-r--r-- | ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/local_pgwire/local_pgwire_connection.cpp | 10 | ||||
-rw-r--r-- | ydb/core/local_pgwire/pgwire_kqp_proxy.cpp | 93 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_connection.cpp | 56 |
7 files changed, 139 insertions, 24 deletions
diff --git a/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt b/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt index bf8e444adf0..a602e5c6b51 100644 --- a/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/local_pgwire/CMakeLists.darwin-x86_64.txt @@ -22,6 +22,7 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC core-grpc_services-local_rpc ydb-core-protos ydb-core-pgproxy + ydb-core-ydb_convert api-grpc lib-operation_id-protos ) diff --git a/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt b/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt index 405972616ae..f1413b31aaf 100644 --- a/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt +++ b/ydb/core/local_pgwire/CMakeLists.linux-aarch64.txt @@ -23,6 +23,7 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC core-grpc_services-local_rpc ydb-core-protos ydb-core-pgproxy + ydb-core-ydb_convert api-grpc lib-operation_id-protos ) diff --git a/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt b/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt index 405972616ae..f1413b31aaf 100644 --- a/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt +++ b/ydb/core/local_pgwire/CMakeLists.linux-x86_64.txt @@ -23,6 +23,7 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC core-grpc_services-local_rpc ydb-core-protos ydb-core-pgproxy + ydb-core-ydb_convert api-grpc lib-operation_id-protos ) diff --git a/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt b/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt index bf8e444adf0..a602e5c6b51 100644 --- a/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt +++ b/ydb/core/local_pgwire/CMakeLists.windows-x86_64.txt @@ -22,6 +22,7 @@ target_link_libraries(ydb-core-local_pgwire PUBLIC core-grpc_services-local_rpc ydb-core-protos ydb-core-pgproxy + ydb-core-ydb_convert api-grpc lib-operation_id-protos ) diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp index d7f83922e84..8efa1efa069 100644 --- a/ydb/core/local_pgwire/local_pgwire_connection.cpp +++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp @@ -25,6 +25,7 @@ extern NActors::IActor* CreatePgwireKqpProxy( ); NActors::IActor* CreatePgwireKqpProxyQuery(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery); +NActors::IActor* CreatePgwireKqpProxyDescribe(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TString& script); class TPgYdbConnection : public TActor<TPgYdbConnection> { using TBase = TActor<TPgYdbConnection>; @@ -101,6 +102,13 @@ public: return; } + auto query(ConvertQuery(it->second)); + auto script(ToPgSyntax(query.Query, ConnectionParams)); + + TActorId actorId = Register(CreatePgwireKqpProxyDescribe(ConnectionParams, std::move(ev), script)); + BLOG_D("Created pgwireKqpProxyQuery: " << actorId); + return; + /* TActorSystem* actorSystem = TActivationContext::ActorSystem(); auto query = ConvertQuery(it->second); Ydb::Scripting::ExecuteYqlRequest request; @@ -164,7 +172,7 @@ public: } actorSystem->Send(ev->Sender, response.release(), 0, ev->Cookie); - }); + });*/ } void Handle(NPG::TEvPGEvents::TEvExecute::TPtr& ev) { diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp index a2bf13a243d..35808996a2f 100644 --- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp +++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp @@ -9,6 +9,7 @@ #define INCLUDE_YDB_INTERNAL_H #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/public/sdk/cpp/client/ydb_result/result.h> +#include <ydb/core/ydb_convert/ydb_convert.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -328,6 +329,94 @@ public: } }; +class TPgwireKqpProxyDescribe : public TActorBootstrapped<TPgwireKqpProxyDescribe> { + using TBase = TActorBootstrapped<TPgwireKqpProxyDescribe>; + + std::unordered_map<TString, TString> ConnectionParams_; + NPG::TEvPGEvents::TEvDescribe::TPtr EventDescribe_; + TString Script_; + +public: + TPgwireKqpProxyDescribe(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TString& script) + : ConnectionParams_(std::move(params)) + , EventDescribe_(std::move(evDescribe)) + , Script_(script) + {} + + void Bootstrap() { + auto query(Script_); + TString database; + if (ConnectionParams_.count("database")) { + database = ConnectionParams_["database"]; + } + TString token; + if (ConnectionParams_.count("ydb-serialized-token")) { + token = ConnectionParams_["ydb-serialized-token"]; + } + auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest(); + request.SetQuery(ToPgSyntax(query, ConnectionParams_)); + request.SetAction(NKikimrKqp::QUERY_ACTION_EXPLAIN); + auto noScript = ConnectionParams_.find("no-script"); + if (noScript == ConnectionParams_.end()) { + request.SetType(NKikimrKqp::QUERY_TYPE_SQL_SCRIPT); + } else { + request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY); + request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + request.MutableTxControl()->set_commit_tx(true); + } + request.SetKeepSession(false); + request.SetDatabase(database); + event->Record.SetUserToken(token); + + ActorIdToProto(SelfId(), event->Record.MutableRequestActorId()); + BLOG_D("Sent event to kqpProxy, RequestActorId = " << EventDescribe_->Sender << ", self: " << SelfId()); + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release()); + + // TODO(xenoxeno): timeout + Become(&TPgwireKqpProxyDescribe::StateWork); + } + + void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { + BLOG_D("Handling TEvKqp::TEvQueryResponse"); + NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef(); + + auto response = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>(); + try { + if (record.HasYdbStatus()) { + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + for (const auto& param : record.GetResponse().GetQueryParameters()) { + Ydb::Type ydbType; + ConvertMiniKQLTypeToYdbType(param.GetType(), ydbType); + response->ParameterTypes.push_back(GetPgOidFromYdbType(ydbType)); + } + } else { + NYql::TIssues issues; + NYql::IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues); + NYdb::TStatus status(NYdb::EStatus(record.GetYdbStatus()), std::move(issues)); + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', TStringBuilder() << status}); + } + } else { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', "No result received"}); + } + } catch (const std::exception& e) { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', e.what()}); + } + BLOG_D("Finally replying to " << EventDescribe_->Sender); + Send(EventDescribe_->Sender, response.release(), 0, EventDescribe_->Cookie); + PassAway(); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); + } + } +}; + NActors::IActor* CreatePgwireKqpProxy(std::unordered_map<TString, TString> params) { return new TPgwireKqpProxy(std::move(params)); @@ -337,4 +426,8 @@ NActors::IActor* CreatePgwireKqpProxyQuery(std::unordered_map<TString, TString> return new TPgwireKqpProxyQuery(std::move(params), std::move(evQuery)); } +NActors::IActor* CreatePgwireKqpProxyDescribe(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TString& script) { + return new TPgwireKqpProxyDescribe(std::move(params), std::move(evDescribe), script); +} + } //namespace NLocalPgwire diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp index 002d200a40e..d7faa6234d9 100644 --- a/ydb/core/pgproxy/pg_connection.cpp +++ b/ydb/core/pgproxy/pg_connection.cpp @@ -531,32 +531,42 @@ protected: void HandleConnected(TEvPGEvents::TEvDescribeResponse::TPtr& ev) { if (IsEventExpected(ev)) { - { // parameterDescription - TPGStreamOutput<TPGParameterDescription> parameterDescription; - parameterDescription << uint16_t(ev->Get()->ParameterTypes.size()); // number of fields - for (auto type : ev->Get()->ParameterTypes) { - parameterDescription << type; + if (ev->Get()->ErrorFields.empty()) { + { // parameterDescription + TPGStreamOutput<TPGParameterDescription> parameterDescription; + parameterDescription << uint16_t(ev->Get()->ParameterTypes.size()); // number of fields + for (auto type : ev->Get()->ParameterTypes) { + parameterDescription << type; + } + SendStream(parameterDescription); } - SendStream(parameterDescription); - } - if (ev->Get()->DataFields.size() > 0) { - // rowDescription - TPGStreamOutput<TPGRowDescription> rowDescription; - rowDescription << uint16_t(ev->Get()->DataFields.size()); // number of fields - for (const auto& field : ev->Get()->DataFields) { - rowDescription - << TStringBuf(field.Name) << '\0' - << uint32_t(field.TableId) - << uint16_t(field.ColumnId) - << uint32_t(field.DataType) - << uint16_t(field.DataTypeSize) - << uint32_t(0xffffffff) // type modifier - << uint16_t(0) // format text - ; + if (ev->Get()->DataFields.size() > 0) { + // rowDescription + TPGStreamOutput<TPGRowDescription> rowDescription; + rowDescription << uint16_t(ev->Get()->DataFields.size()); // number of fields + for (const auto& field : ev->Get()->DataFields) { + rowDescription + << TStringBuf(field.Name) << '\0' + << uint32_t(field.TableId) + << uint16_t(field.ColumnId) + << uint32_t(field.DataType) + << uint16_t(field.DataTypeSize) + << uint32_t(0xffffffff) // type modifier + << uint16_t(0) // format text + ; + } + SendStream(rowDescription); + } else { + SendMessage(TPGNoData()); } - SendStream(rowDescription); } else { - SendMessage(TPGNoData()); + // error response + TPGStreamOutput<TPGErrorResponse> errorResponse; + for (const auto& field : ev->Get()->ErrorFields) { + errorResponse << field.first << field.second << '\0'; + } + errorResponse << '\0'; + SendStream(errorResponse); } ++OutgoingSequenceNumber; BecomeReadyForQuery(); |