diff options
author | xenoxeno <xeno@ydb.tech> | 2023-05-30 11:53:41 +0300 |
---|---|---|
committer | xenoxeno <xeno@ydb.tech> | 2023-05-30 11:53:41 +0300 |
commit | df23a796955e5802312e57e12cc4be84358df127 (patch) | |
tree | 0b11d315ce400c3b144a34b5f613959c9705c732 | |
parent | 2cc99c035541be550db2d59a0465c05d374b03d0 (diff) | |
download | ydb-df23a796955e5802312e57e12cc4be84358df127.tar.gz |
cleanup a little and add a streaming behavior to query
-rw-r--r-- | ydb/core/local_pgwire/local_pgwire_connection.cpp | 29 | ||||
-rw-r--r-- | ydb/core/local_pgwire/pgwire_kqp_proxy.cpp | 197 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_connection.cpp | 9 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_proxy_events.h | 1 |
4 files changed, 201 insertions, 35 deletions
diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp index 5c72000bf6..d7f83922e8 100644 --- a/ydb/core/local_pgwire/local_pgwire_connection.cpp +++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp @@ -21,34 +21,37 @@ using namespace NActors; using namespace NKikimr; extern NActors::IActor* CreatePgwireKqpProxy( - NActors::TActorId actorId, - ui64 cookie, std::unordered_map<TString, TString> params ); -class TPgYdbConnection : public TActorBootstrapped<TPgYdbConnection> { - using TBase = TActorBootstrapped<TPgYdbConnection>; +NActors::IActor* CreatePgwireKqpProxyQuery(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery); + +class TPgYdbConnection : public TActor<TPgYdbConnection> { + using TBase = TActor<TPgYdbConnection>; std::unordered_map<TString, TString> ConnectionParams; std::unordered_map<TString, TParsedStatement> ParsedStatements; TString CurrentStatement; Ydb::StatusIds QueryStatus; std::unordered_map<ui32, NYdb::TResultSet> ResultSets; + public: TPgYdbConnection(std::unordered_map<TString, TString> params) - : ConnectionParams(std::move(params)) + : TActor<TPgYdbConnection>(&TPgYdbConnection::StateWork) + , ConnectionParams(std::move(params)) {} - void Bootstrap() { - Become(&TPgYdbConnection::StateWork); - } - void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) { BLOG_D("TEvQuery " << ev->Sender); - IActor* actor = CreatePgwireKqpProxy(ev->Sender, ev->Cookie, ConnectionParams); - TActorId actorId = Register(actor); - BLOG_D("Created pgwireKqpProxy: " << actorId); - Forward(ev, actorId); + if (IsQueryEmpty(ev->Get()->Message->GetQuery())) { + auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>(); + response->EmptyQuery = true; + Send(ev->Sender, response.release(), 0, ev->Cookie); + return; + } + + TActorId actorId = Register(CreatePgwireKqpProxyQuery(ConnectionParams, std::move(ev))); + BLOG_D("Created pgwireKqpProxyQuery: " << actorId); } void Handle(NPG::TEvPGEvents::TEvParse::TPtr& ev) { diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp index bce7d9f044..a2bf13a243 100644 --- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp +++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp @@ -17,8 +17,8 @@ namespace NLocalPgWire { using namespace NActors; using namespace NKikimr; -class TPgwireKqpProxy : public TActorBootstrapped<TPgwireKqpProxy> { - using TBase = TActorBootstrapped<TPgwireKqpProxy>; +class TPgwireKqpProxy : public TActor<TPgwireKqpProxy> { + using TBase = TActor<TPgwireKqpProxy>; NActors::TActorId RequestActorId_; ui64 RequestCookie_; @@ -26,16 +26,11 @@ class TPgwireKqpProxy : public TActorBootstrapped<TPgwireKqpProxy> { std::unordered_map<TString, TString> ConnectionParams_; TMap<ui32, NYdb::TResultSet> ResultSets_; public: - TPgwireKqpProxy(NActors::TActorId actorId, ui64 cookie, std::unordered_map<TString, TString> params) - : RequestActorId_(actorId) - , RequestCookie_(cookie) + TPgwireKqpProxy(std::unordered_map<TString, TString> params) + : TActor<TPgwireKqpProxy>(&TPgwireKqpProxy::StateWork) , ConnectionParams_(std::move(params)) {} - void Bootstrap() { - Become(&TPgwireKqpProxy::StateWork); - } - void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) { NYdb::TResultSet resultSet(std::move(*ev->Get()->Record.MutableResultSet())); ResultSets_.emplace(ev->Get()->Record.GetQueryResultIndex(), resultSet); @@ -122,18 +117,14 @@ public: } BLOG_D("Finally replying to " << RequestActorId_); Send(RequestActorId_, response.release(), 0, RequestCookie_); - //PassAway(); + PassAway(); } void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) { BLOG_D("TEvQuery, sender: " << ev->Sender << " , self: " << SelfId()); + RequestActorId_ = ev->Sender; + RequestCookie_ = ev->Cookie; auto query(ev->Get()->Message->GetQuery()); - if (IsQueryEmpty(query)) { - auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>(); - response->EmptyQuery = true; - Send(ev->Sender, response.release(), 0, ev->Cookie); - return; - } TString database; if (ConnectionParams_.count("database")) { database = ConnectionParams_["database"]; @@ -158,7 +149,7 @@ public: request.SetKeepSession(false); request.SetDatabase(database); event->Record.SetUserToken(token); - InTransaction_ = ev->Get()->Message->GetQuery().starts_with("BEGIN"); + InTransaction_ = query.starts_with("BEGIN"); ActorIdToProto(SelfId(), event->Record.MutableRequestActorId()); BLOG_D("Sent event to kqpProxy, RequestActorId = " << RequestActorId_ << ", self: " << SelfId()); Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release()); @@ -173,9 +164,177 @@ public: } }; +class TPgwireKqpProxyQuery : public TActorBootstrapped<TPgwireKqpProxyQuery> { + using TBase = TActorBootstrapped<TPgwireKqpProxyQuery>; + + std::unordered_map<TString, TString> ConnectionParams_; + NPG::TEvPGEvents::TEvQuery::TPtr EventQuery_; + bool InTransaction_ = false; + bool WasMeta_ = false; + +public: + TPgwireKqpProxyQuery(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery) + : ConnectionParams_(std::move(params)) + , EventQuery_(std::move(evQuery)) + {} + + void Bootstrap() { + auto query(EventQuery_->Get()->Message->GetQuery()); + TString database; + if (ConnectionParams_.count("database")) { + database = ConnectionParams_["database"]; + } + TString token; + if (ConnectionParams_.count("ydb-serialized-token")) { + token = ConnectionParams_["ydb-serialized-token"]; + } + auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest(); + request.SetQuery(ToPgSyntax(query, ConnectionParams_)); + request.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + auto noScript = ConnectionParams_.find("no-script"); + if (noScript == ConnectionParams_.end()) { + request.SetType(NKikimrKqp::QUERY_TYPE_SQL_SCRIPT); + } else { + request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY); + request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + request.MutableTxControl()->set_commit_tx(true); + } + request.SetKeepSession(false); + request.SetDatabase(database); + event->Record.SetUserToken(token); + + // HACK + InTransaction_ = query.starts_with("BEGIN"); + + ActorIdToProto(SelfId(), event->Record.MutableRequestActorId()); + BLOG_D("Sent event to kqpProxy, RequestActorId = " << EventQuery_->Sender << ", self: " << SelfId()); + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release()); + + // TODO(xenoxeno): timeout + Become(&TPgwireKqpProxyQuery::StateWork); + } + + void FillMeta(const NYdb::TResultSet& resultSet, NPG::TEvPGEvents::TEvQueryResponse* response) { + for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) { + // TODO: fill data sizes + response->DataFields.push_back({ + .Name = column.Name, + .DataType = GetPgOidFromYdbType(column.Type), + // .DataTypeSize = column.Type.GetProto().Getpg_type().Gettyplen() + }); + } + } + + void FillResultSet(const NYdb::TResultSet& resultSet, NPG::TEvPGEvents::TEvQueryResponse* response) { + NYdb::TResultSetParser parser(std::move(resultSet)); + while (parser.TryNextRow()) { + response->DataRows.emplace_back(); + auto& row = response->DataRows.back(); + row.resize(parser.ColumnsCount()); + for (size_t index = 0; index < parser.ColumnsCount(); ++index) { + row[index] = ColumnValueToString(parser.ColumnParser(index)); + } + } + } + + std::unique_ptr<NPG::TEvPGEvents::TEvQueryResponse> MakeResponse() { + auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>(); + + // HACK + if (InTransaction_) { + response->Tag = "BEGIN"; + response->TransactionStatus = 'T'; + } + + return response; + } + + void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) { + NYdb::TResultSet resultSet(std::move(*ev->Get()->Record.MutableResultSet())); + auto response = MakeResponse(); + if (!WasMeta_) { + FillMeta(resultSet, response.get()); + WasMeta_ = true; + } + FillResultSet(resultSet, response.get()); + response->CommandCompleted = false; + + // HACK + if (response->DataRows.size() > 0) { + response->Tag = TStringBuilder() << "SELECT " << response->DataRows.size(); + } + + BLOG_D(this->SelfId() << "Send rowset data (" << ev->Get()->Record.GetSeqNo() << ") to: " << EventQuery_->Sender); + Send(EventQuery_->Sender, response.release(), 0, EventQuery_->Cookie); + + BLOG_D(this->SelfId() << "Send stream data ack to: " << ev->Sender); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + resp->Record.SetFreeSpace(std::numeric_limits<ui64>::max()); + Send(ev->Sender, resp.Release()); + } + + void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { + BLOG_D("Handling TEvKqp::TEvQueryResponse"); + NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef(); + + auto response = MakeResponse(); + try { + if (record.HasYdbStatus()) { + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + auto results = record.GetResponse().GetResults(); + if (!results.empty()) { + auto resultSet = record.MutableResponse()->MutableYdbResults()->Add(); + NKqp::ConvertKqpQueryResultToDbResult(results.at(0), resultSet); + if (!WasMeta_) { + FillMeta(*resultSet, response.get()); + WasMeta_ = true; + } + FillResultSet(*resultSet, response.get()); + } + + // HACK + if (response->DataRows.size() > 0) { + response->Tag = TStringBuilder() << "SELECT " << response->DataRows.size(); + } + + } else { + NYql::TIssues issues; + NYql::IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues); + NYdb::TStatus status(NYdb::EStatus(record.GetYdbStatus()), std::move(issues)); + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', TStringBuilder() << status}); + } + } else { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', "No result received"}); + } + } catch (const std::exception& e) { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', e.what()}); + } + response->CommandCompleted = true; + BLOG_D("Finally replying to " << EventQuery_->Sender); + Send(EventQuery_->Sender, response.release(), 0, EventQuery_->Cookie); + PassAway(); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); + hFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); + } + } +}; + + +NActors::IActor* CreatePgwireKqpProxy(std::unordered_map<TString, TString> params) { + return new TPgwireKqpProxy(std::move(params)); +} -NActors::IActor* CreatePgwireKqpProxy(NActors::TActorId actorId, ui64 cookie, std::unordered_map<TString, TString> params) { - return new TPgwireKqpProxy(std::move(actorId), std::move(cookie), std::move(params)); +NActors::IActor* CreatePgwireKqpProxyQuery(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery) { + return new TPgwireKqpProxyQuery(std::move(params), std::move(evQuery)); } } //namespace NLocalPgwire diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp index c895aa866d..002d200a40 100644 --- a/ydb/core/pgproxy/pg_connection.cpp +++ b/ydb/core/pgproxy/pg_connection.cpp @@ -478,7 +478,6 @@ protected: if (ev->Get()->EmptyQuery) { SendMessage(TPGEmptyQueryResponse()); } else { - TString tag = ev->Get()->Tag ? ev->Get()->Tag : "OK"; if (!ev->Get()->DataFields.empty()) { // rowDescription TPGStreamOutput<TPGRowDescription> rowDescription; rowDescription << uint16_t(ev->Get()->DataFields.size()); // number of fields @@ -505,7 +504,9 @@ protected: SendStream(dataRow); } } - { // commandComplete + if (ev->Get()->CommandCompleted) { + // commandComplete + TString tag = ev->Get()->Tag ? ev->Get()->Tag : "OK"; TPGStreamOutput<TPGCommandComplete> commandComplete; commandComplete << tag << '\0'; SendStream(commandComplete); @@ -520,7 +521,9 @@ protected: errorResponse << '\0'; SendStream(errorResponse); } - BecomeReadyForQuery(); + if (ev->Get()->CommandCompleted) { + BecomeReadyForQuery(); + } } else { PostponeEvent(ev); } diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h index 6aaeea3045..0a777849d0 100644 --- a/ydb/core/pgproxy/pg_proxy_events.h +++ b/ydb/core/pgproxy/pg_proxy_events.h @@ -103,6 +103,7 @@ struct TEvPGEvents { std::vector<std::pair<char, TString>> ErrorFields; TString Tag; bool EmptyQuery = false; + bool CommandCompleted = true; char TransactionStatus = 0; }; |