diff options
author | xenoxeno <xeno@ydb.tech> | 2023-06-13 14:59:52 +0300 |
---|---|---|
committer | xenoxeno <xeno@ydb.tech> | 2023-06-13 14:59:52 +0300 |
commit | f93d754c16f6acf1898da5aa077ba730ebdab531 (patch) | |
tree | 218f9535afca1f149623ef2e1861a2d14f4c4f2d | |
parent | d2104865f13f6d5bc5f7817a4318288da51254c4 (diff) | |
download | ydb-f93d754c16f6acf1898da5aa077ba730ebdab531.tar.gz |
PR from branch users/xenoxeno/
rework transactions and sessions handling
add parsed execute actor
-rw-r--r-- | ydb/core/local_pgwire/local_pgwire_connection.cpp | 204 | ||||
-rw-r--r-- | ydb/core/local_pgwire/local_pgwire_util.h | 37 | ||||
-rw-r--r-- | ydb/core/local_pgwire/log_impl.h | 2 | ||||
-rw-r--r-- | ydb/core/local_pgwire/pgwire_kqp_proxy.cpp | 445 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_connection.cpp | 17 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_proxy_events.h | 10 |
6 files changed, 362 insertions, 353 deletions
diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp index 8efa1efa069..9dcbaca5ac1 100644 --- a/ydb/core/local_pgwire/local_pgwire_connection.cpp +++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp @@ -24,8 +24,9 @@ extern NActors::IActor* CreatePgwireKqpProxy( std::unordered_map<TString, TString> params ); -NActors::IActor* CreatePgwireKqpProxyQuery(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery); -NActors::IActor* CreatePgwireKqpProxyDescribe(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TString& script); +NActors::IActor* CreatePgwireKqpProxyQuery(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery); +NActors::IActor* CreatePgwireKqpProxyDescribe(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TParsedStatement& statement); +NActors::IActor* CreatePgwireKqpProxyExecute(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute, const TParsedStatement& statement); class TPgYdbConnection : public TActor<TPgYdbConnection> { using TBase = TActor<TPgYdbConnection>; @@ -33,12 +34,13 @@ class TPgYdbConnection : public TActor<TPgYdbConnection> { std::unordered_map<TString, TString> ConnectionParams; std::unordered_map<TString, TParsedStatement> ParsedStatements; TString CurrentStatement; - Ydb::StatusIds QueryStatus; - std::unordered_map<ui32, NYdb::TResultSet> ResultSets; + TConnectionState Connection; + std::deque<TAutoPtr<IEventHandle>> Events; + ui32 Inflight = 0; public: TPgYdbConnection(std::unordered_map<TString, TString> params) - : TActor<TPgYdbConnection>(&TPgYdbConnection::StateWork) + : TActor<TPgYdbConnection>(&TPgYdbConnection::StateSchedule) , ConnectionParams(std::move(params)) {} @@ -50,8 +52,8 @@ public: Send(ev->Sender, response.release(), 0, ev->Cookie); return; } - - TActorId actorId = Register(CreatePgwireKqpProxyQuery(ConnectionParams, std::move(ev))); + ++Inflight; + TActorId actorId = Register(CreatePgwireKqpProxyQuery(SelfId(), ConnectionParams, Connection, std::move(ev))); BLOG_D("Created pgwireKqpProxyQuery: " << actorId); } @@ -102,77 +104,10 @@ public: return; } - auto query(ConvertQuery(it->second)); - auto script(ToPgSyntax(query.Query, ConnectionParams)); - - TActorId actorId = Register(CreatePgwireKqpProxyDescribe(ConnectionParams, std::move(ev), script)); - BLOG_D("Created pgwireKqpProxyQuery: " << actorId); + ++Inflight; + TActorId actorId = Register(CreatePgwireKqpProxyDescribe(SelfId(), ConnectionParams, Connection, std::move(ev), it->second)); + BLOG_D("Created pgwireKqpProxyDescribe: " << actorId); return; - /* - TActorSystem* actorSystem = TActivationContext::ActorSystem(); - auto query = ConvertQuery(it->second); - Ydb::Scripting::ExecuteYqlRequest request; - request.set_script(ToPgSyntax(query.Query, ConnectionParams)); - // TODO: - //request.set_parameters(query.Params); - TString database; - if (ConnectionParams.count("database")) { - database = ConnectionParams["database"]; - } - TString token; - if (ConnectionParams.count("ydb-serialized-token")) { - token = ConnectionParams["ydb-serialized-token"]; - } - using TRpcEv = NGRpcService::TGrpcRequestOperationCall<Ydb::Scripting::ExecuteYqlRequest, Ydb::Scripting::ExecuteYqlResponse>; - auto rpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(request), database, token, actorSystem); - // TODO: it's wrong. it should be done using explain to get all result meta. but it's not ready yet. - rpcFuture.Subscribe([actorSystem, ev](NThreading::TFuture<Ydb::Scripting::ExecuteYqlResponse> future) { - auto response = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>(); - try { - Ydb::Scripting::ExecuteYqlResponse yqlResponse(future.ExtractValueSync()); - if (yqlResponse.has_operation()) { - if (yqlResponse.operation().status() == Ydb::StatusIds::SUCCESS) { - if (yqlResponse.operation().has_result()) { - NYdb::NScripting::TExecuteYqlResult result(ConvertProtoResponseToSdkResult(std::move(yqlResponse))); - if (result.IsSuccess()) { - const TVector<NYdb::TResultSet>& resultSets = result.GetResultSets(); - if (!resultSets.empty()) { - NYdb::TResultSet resultSet = resultSets[0]; - - { - for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) { - // TODO: fill data types and sizes - response->DataFields.push_back({ - .Name = column.Name, - .DataType = GetPgOidFromYdbType(column.Type), - }); - } - } - } - } else { - response->ErrorFields.push_back({'E', "ERROR"}); - response->ErrorFields.push_back({'M', TStringBuilder() << (NYdb::TStatus&)result}); - } - } - } else { - NYql::TIssues issues; - NYql::IssuesFromMessage(yqlResponse.operation().issues(), issues); - NYdb::TStatus status(NYdb::EStatus(yqlResponse.operation().status()), std::move(issues)); - response->ErrorFields.push_back({'E', "ERROR"}); - response->ErrorFields.push_back({'M', TStringBuilder() << status}); - } - } else { - response->ErrorFields.push_back({'E', "ERROR"}); - response->ErrorFields.push_back({'M', "No result received"}); - } - } - catch (const std::exception& e) { - response->ErrorFields.push_back({'E', "ERROR"}); - response->ErrorFields.push_back({'M', e.what()}); - } - - actorSystem->Send(ev->Sender, response.release(), 0, ev->Cookie); - });*/ } void Handle(NPG::TEvPGEvents::TEvExecute::TPtr& ev) { @@ -199,78 +134,63 @@ public: return; } - TActorSystem* actorSystem = TActivationContext::ActorSystem(); - auto query = ConvertQuery(it->second); - Ydb::Scripting::ExecuteYqlRequest request; - request.set_script(ToPgSyntax(query.Query, ConnectionParams)); - // TODO: - //request.set_parameters(query.Params); - TString database; - if (ConnectionParams.count("database")) { - database = ConnectionParams["database"]; + ++Inflight; + TActorId actorId = Register(CreatePgwireKqpProxyExecute(SelfId(), ConnectionParams, Connection, std::move(ev), it->second)); + BLOG_D("Created pgwireKqpProxyExecute: " << actorId); + } + + void Handle(TEvEvents::TEvProxyCompleted::TPtr& ev) { + --Inflight; + BLOG_D("Received TEvProxyCompleted"); + if (ev->Get()->Connection.Transaction.Status) { + BLOG_D("Updating transaction state to " << ev->Get()->Connection.Transaction.Status); + Connection.Transaction.Status = ev->Get()->Connection.Transaction.Status; + switch (ev->Get()->Connection.Transaction.Status) { + case 'I': + case 'E': + Connection.Transaction.Id.clear(); + BLOG_D("Transaction id cleared"); + break; + case 'T': + if (ev->Get()->Connection.Transaction.Id) { + Connection.Transaction.Id = ev->Get()->Connection.Transaction.Id; + BLOG_D("Transaction id is " << Connection.Transaction.Id); + } + break; + } } - TString token; - if (ConnectionParams.count("ydb-serialized-token")) { - token = ConnectionParams["ydb-serialized-token"]; + if (ev->Get()->Connection.SessionId) { + BLOG_D("Session id is " << ev->Get()->Connection.SessionId); + Connection.SessionId = ev->Get()->Connection.SessionId; } - using TRpcEv = NGRpcService::TGrpcRequestOperationCall<Ydb::Scripting::ExecuteYqlRequest, Ydb::Scripting::ExecuteYqlResponse>; - auto rpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(request), database, token, actorSystem); + while (!Events.empty() && Inflight == 0) { + StateWork(Events.front()); + Events.pop_front(); + } + } - rpcFuture.Subscribe([actorSystem, ev](NThreading::TFuture<Ydb::Scripting::ExecuteYqlResponse> future) { - auto response = std::make_unique<NPG::TEvPGEvents::TEvExecuteResponse>(); - try { - Ydb::Scripting::ExecuteYqlResponse yqlResponse(future.ExtractValueSync()); - if (yqlResponse.has_operation()) { - if (yqlResponse.operation().status() == Ydb::StatusIds::SUCCESS) { - if (yqlResponse.operation().has_result()) { - NYdb::NScripting::TExecuteYqlResult result(ConvertProtoResponseToSdkResult(std::move(yqlResponse))); - if (result.IsSuccess()) { - const TVector<NYdb::TResultSet>& resultSets = result.GetResultSets(); - if (!resultSets.empty()) { - NYdb::TResultSet resultSet = resultSets[0]; + void PassAway() override { + if (Connection.SessionId) { + BLOG_D("Closing session " << Connection.SessionId); + auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>(); + ev->Record.MutableRequest()->SetSessionId(Connection.SessionId); + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), ev.Release()); + } + TBase::PassAway(); + } - { - auto maxRows = ev->Get()->Message->GetExecuteData().MaxRows; - NYdb::TResultSetParser parser(std::move(resultSet)); - while (parser.TryNextRow()) { - response->DataRows.emplace_back(); - auto& row = response->DataRows.back(); - row.resize(parser.ColumnsCount()); - for (size_t index = 0; index < parser.ColumnsCount(); ++index) { - row[index] = ColumnValueToString(parser.ColumnParser(index)); - } - if (maxRows != 0) { - if (--maxRows == 0) { - break; - } - } - } - } - } - } else { - response->ErrorFields.push_back({'E', "ERROR"}); - response->ErrorFields.push_back({'M', TStringBuilder() << (NYdb::TStatus&)result}); - } - } - } else { - NYql::TIssues issues; - NYql::IssuesFromMessage(yqlResponse.operation().issues(), issues); - NYdb::TStatus status(NYdb::EStatus(yqlResponse.operation().status()), std::move(issues)); - response->ErrorFields.push_back({'E', "ERROR"}); - response->ErrorFields.push_back({'M', TStringBuilder() << status}); - } + STATEFN(StateSchedule) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvEvents::TEvProxyCompleted, Handle); + cFunc(TEvents::TEvPoisonPill::EventType, PassAway); + default: { + if (Inflight == 0) { + return StateWork(ev); } else { - response->ErrorFields.push_back({'E', "ERROR"}); - response->ErrorFields.push_back({'M', "No result received"}); + Events.push_back(ev); } } - catch (const std::exception& e) { - response->ErrorFields.push_back({'E', "ERROR"}); - response->ErrorFields.push_back({'M', e.what()}); - } - - actorSystem->Send(ev->Sender, response.release(), 0, ev->Cookie); - }); + } } STATEFN(StateWork) { diff --git a/ydb/core/local_pgwire/local_pgwire_util.h b/ydb/core/local_pgwire/local_pgwire_util.h index 110996c1810..c5a1120d7d1 100644 --- a/ydb/core/local_pgwire/local_pgwire_util.h +++ b/ydb/core/local_pgwire/local_pgwire_util.h @@ -33,6 +33,33 @@ namespace NLocalPgWire { +struct TTransactionState { + char Status = 0; + TString Id; +}; + +struct TConnectionState { + TString SessionId; + TTransactionState Transaction; +}; + +struct TEvEvents { + enum EEv { + EvProxyCompleted = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "ES_PRIVATE event space is too small."); + + struct TEvProxyCompleted : NActors::TEventLocal<TEvProxyCompleted, EvProxyCompleted> { + TConnectionState Connection; + + TEvProxyCompleted(const TConnectionState& connection = {}) + : Connection(connection) + {} + }; +}; + struct TParsedStatement { NPG::TPGParse::TQueryData QueryData; NPG::TPGBind::TBindData BindData; @@ -140,9 +167,9 @@ inline uint32_t GetPgOidFromYdbType(NYdb::TType type) { } } -inline TString ToPgSyntax(TStringBuf query, const std::unordered_map<TString, TString>& ConnectionParams) { - auto itOptions = ConnectionParams.find("options"); - if (itOptions == ConnectionParams.end()) { +inline TString ToPgSyntax(TStringBuf query, const std::unordered_map<TString, TString>& connectionParams) { + auto itOptions = connectionParams.find("options"); + if (itOptions == connectionParams.end()) { return TStringBuilder() << "--!syntax_pg\n" << query; // default } return TStringBuilder() << "--!" << itOptions->second << "\n" << query; @@ -170,6 +197,10 @@ struct TConvertedQuery { NYdb::TParams Params; }; +inline TString ToPgSyntax(TConvertedQuery query, const std::unordered_map<TString, TString>& connectionParams) { + return ToPgSyntax(query.Query, connectionParams); +} + inline TConvertedQuery ConvertQuery(const TParsedStatement& statement) { auto& bindData = statement.BindData; const auto& queryData = statement.QueryData; diff --git a/ydb/core/local_pgwire/log_impl.h b/ydb/core/local_pgwire/log_impl.h index 2017cd5752f..a754936b3f8 100644 --- a/ydb/core/local_pgwire/log_impl.h +++ b/ydb/core/local_pgwire/log_impl.h @@ -9,3 +9,5 @@ #define BLOG_W(stream) LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::LOCAL_PGWIRE, stream) #define BLOG_NOTICE(stream) LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::LOCAL_PGWIRE, stream) #define BLOG_ERROR(stream) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::LOCAL_PGWIRE, stream) + +#define BLOG_ENSURE(condition) do{if(!condition)BLOG_ERROR(#condition);}while(false) diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp index 4533954dfbe..a774e22f34c 100644 --- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp +++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp @@ -17,103 +17,27 @@ namespace NLocalPgWire { using namespace NActors; using namespace NKikimr; -class TPgwireKqpProxy : public TActor<TPgwireKqpProxy> { - using TBase = TActor<TPgwireKqpProxy>; +template<typename Base> +class TPgwireKqpProxy : public TActorBootstrapped<Base> { +protected: + using TBase = TActorBootstrapped<Base>; - NActors::TActorId RequestActorId_; - ui64 RequestCookie_; - bool InTransaction_; + TActorId Owner_; std::unordered_map<TString, TString> ConnectionParams_; - TMap<ui32, NYdb::TResultSet> ResultSets_; -public: - TPgwireKqpProxy(std::unordered_map<TString, TString> params) - : TActor<TPgwireKqpProxy>(&TPgwireKqpProxy::StateWork) - , ConnectionParams_(std::move(params)) - {} - - void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) { - NYdb::TResultSet resultSet(std::move(*ev->Get()->Record.MutableResultSet())); - ResultSets_.emplace(ev->Get()->Record.GetQueryResultIndex(), resultSet); - - BLOG_D(this->SelfId() << "Send stream data ack" - << ", to: " << ev->Sender); + TConnectionState Connection_; + TString Tag_; - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); - resp->Record.SetFreeSpace(std::numeric_limits<ui64>::max()); - Send(ev->Sender, resp.Release()); - } - - void FillResultSet(const NYdb::TResultSet& resultSet, NPG::TEvPGEvents::TEvQueryResponse* response) { - { - for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) { - // TODO: fill data types and sizes - response->DataFields.push_back({ - .Name = column.Name, - .DataType = GetPgOidFromYdbType(column.Type), - // .DataTypeSize = column.Type.GetProto().Getpg_type().Gettyplen() - }); - } - } - { - NYdb::TResultSetParser parser(std::move(resultSet)); - while (parser.TryNextRow()) { - response->DataRows.emplace_back(); - auto& row = response->DataRows.back(); - row.resize(parser.ColumnsCount()); - for (size_t index = 0; index < parser.ColumnsCount(); ++index) { - row[index] = ColumnValueToString(parser.ColumnParser(index)); - } - } - } - } - - void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { - BLOG_D("Handling TEvKqp::TEvQueryResponse"); - NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef(); - - auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>(); - // HACK - if (InTransaction_) { - response->Tag = "BEGIN"; - response->TransactionStatus = 'T'; - } - try { - if (record.HasYdbStatus()) { - if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { - Y_ENSURE(record.GetResponse().GetYdbResults().empty()); - if (!ResultSets_.empty()) { - FillResultSet(ResultSets_.begin()->second, response.get()); - } - - // HACK - response->Tag = TStringBuilder() << "SELECT " << response->DataRows.size(); - // HACK - } else { - NYql::TIssues issues; - NYql::IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues); - NYdb::TStatus status(NYdb::EStatus(record.GetYdbStatus()), std::move(issues)); - response->ErrorFields.push_back({'E', "ERROR"}); - response->ErrorFields.push_back({'M', TStringBuilder() << status}); - } - } else { - response->ErrorFields.push_back({'E', "ERROR"}); - response->ErrorFields.push_back({'M', "No result received"}); - } - } catch (const std::exception& e) { - response->ErrorFields.push_back({'E', "ERROR"}); - response->ErrorFields.push_back({'M', e.what()}); + TPgwireKqpProxy(const TActorId owner, std::unordered_map<TString, TString> params, const TConnectionState& connection) + : Owner_(owner) + , ConnectionParams_(std::move(params)) + , Connection_(connection) + { + if (!Connection_.Transaction.Status) { + Connection_.Transaction.Status = 'I'; } - BLOG_D("Finally replying to " << RequestActorId_); - Send(RequestActorId_, response.release(), 0, RequestCookie_); - PassAway(); } - void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) { - BLOG_D("TEvQuery, sender: " << ev->Sender << " , self: " << SelfId()); - RequestActorId_ = ev->Sender; - RequestCookie_ = ev->Cookie; - auto query(ev->Get()->Message->GetQuery()); + THolder<NKqp::TEvKqp::TEvQueryRequest> MakeKqpRequest() { TString database; if (ConnectionParams_.count("database")) { database = ConnectionParams_["database"]; @@ -122,88 +46,104 @@ public: if (ConnectionParams_.count("ydb-serialized-token")) { token = ConnectionParams_["ydb-serialized-token"]; } - auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest(); - request.SetQuery(ToPgSyntax(query, ConnectionParams_)); - request.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY); - request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - request.MutableTxControl()->set_commit_tx(true); - request.SetKeepSession(false); request.SetDatabase(database); event->Record.SetUserToken(token); - InTransaction_ = query.starts_with("BEGIN"); - ActorIdToProto(SelfId(), event->Record.MutableRequestActorId()); - BLOG_D("Sent event to kqpProxy, RequestActorId = " << RequestActorId_ << ", self: " << SelfId()); - Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release()); + return event; } - STATEFN(StateWork) { - switch (ev->GetTypeRewrite()) { - hFunc(NPG::TEvPGEvents::TEvQuery, Handle); - hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); - hFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); + void ConvertQueryToRequest(TStringBuf query, NKikimrKqp::TQueryRequest& request) { + if (Connection_.SessionId) { + request.SetSessionId(Connection_.SessionId); + } + request.SetKeepSession(true); + // HACK + if (query.starts_with("BEGIN")) { + Tag_ = "BEGIN"; + request.SetAction(NKikimrKqp::QUERY_ACTION_BEGIN_TX); + request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + } else if (query.starts_with("COMMIT")) { + Tag_ = "COMMIT"; + request.SetAction(NKikimrKqp::QUERY_ACTION_COMMIT_TX); + request.MutableTxControl()->set_tx_id(Connection_.Transaction.Id); + } else if (query.starts_with("ROLLBACK")) { + Tag_ = "ROLLBACK"; + if (Connection_.Transaction.Status == 'T') { + request.SetAction(NKikimrKqp::QUERY_ACTION_ROLLBACK_TX); + request.MutableTxControl()->set_tx_id(Connection_.Transaction.Id); + } else if (Connection_.Transaction.Status == 'E') { + // ignore, reset to I + auto evQueryResponse = MakeHolder<NKqp::TEvKqp::TEvQueryResponse>(); + evQueryResponse->Record.GetRef().SetYdbStatus(Ydb::StatusIds::SUCCESS); + evQueryResponse->Record.GetRef().MutableResponse()->SetSessionId(request.GetSessionId()); + TBase::Send(TBase::SelfId(), evQueryResponse.Release()); + } + } else { + if (query.starts_with("SELECT")) { + Tag_ = "SELECT"; + } + request.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY); + request.SetQuery(ToPgSyntax(query, ConnectionParams_)); + if (Connection_.Transaction.Status == 'I') { + request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + request.MutableTxControl()->set_commit_tx(true); + } else if (Connection_.Transaction.Status == 'T') { + request.MutableTxControl()->set_tx_id(Connection_.Transaction.Id); + } + } + } + + void ProcessKqpResponseReleaseProxy(const NKikimrKqp::TEvQueryResponse& record) { + Connection_.SessionId = record.GetResponse().GetSessionId(); + + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + Connection_.Transaction.Id = record.GetResponse().GetTxMeta().id(); + if (Connection_.Transaction.Id) { + Connection_.Transaction.Status = 'T'; + } else { + Connection_.Transaction.Status = 'I'; + } + } else { + if (Connection_.Transaction.Id) { + Connection_.Transaction.Id.clear(); + Connection_.Transaction.Status = 'E'; + } else { + Connection_.Transaction.Status = 'I'; + } } + + TBase::Send(Owner_, new TEvEvents::TEvProxyCompleted(Connection_)); } }; -class TPgwireKqpProxyQuery : public TActorBootstrapped<TPgwireKqpProxyQuery> { - using TBase = TActorBootstrapped<TPgwireKqpProxyQuery>; +class TPgwireKqpProxyQuery : public TPgwireKqpProxy<TPgwireKqpProxyQuery> { + using TBase = TPgwireKqpProxy<TPgwireKqpProxyQuery>; - std::unordered_map<TString, TString> ConnectionParams_; NPG::TEvPGEvents::TEvQuery::TPtr EventQuery_; bool WasMeta_ = false; - TString Tag; - char TransactionStatus = 0; + std::size_t RowsSelected_ = 0; public: - TPgwireKqpProxyQuery(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery) - : ConnectionParams_(std::move(params)) + TPgwireKqpProxyQuery(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery) + : TPgwireKqpProxy(owner, std::move(params), connection) , EventQuery_(std::move(evQuery)) - {} + { + } void Bootstrap() { auto query(EventQuery_->Get()->Message->GetQuery()); - TString database; - if (ConnectionParams_.count("database")) { - database = ConnectionParams_["database"]; - } - TString token; - if (ConnectionParams_.count("ydb-serialized-token")) { - token = ConnectionParams_["ydb-serialized-token"]; - } - auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + auto event = MakeKqpRequest(); NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest(); - request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - request.MutableTxControl()->set_commit_tx(true); - request.SetKeepSession(false); - request.SetDatabase(database); - event->Record.SetUserToken(token); // HACK - if (query.starts_with("BEGIN")) { - Tag = "BEGIN"; - TransactionStatus = 'T'; - request.SetAction(NKikimrKqp::QUERY_ACTION_BEGIN_TX); - } else if (query.starts_with("COMMIT")) { - Tag = "COMMIT"; - TransactionStatus = 'I'; - request.SetAction(NKikimrKqp::QUERY_ACTION_COMMIT_TX); - } else if (query.starts_with("ROLLBACK")) { - Tag = "ROLLBACK"; - TransactionStatus = 'I'; - request.SetAction(NKikimrKqp::QUERY_ACTION_ROLLBACK_TX); - } else { - request.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); - request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY); - request.SetQuery(ToPgSyntax(query, ConnectionParams_)); + ConvertQueryToRequest(query, request); + if (request.HasAction()) { + ActorIdToProto(SelfId(), event->Record.MutableRequestActorId()); + BLOG_D("Sent event to kqpProxy " << request.ShortDebugString()); + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release()); } - - ActorIdToProto(SelfId(), event->Record.MutableRequestActorId()); - BLOG_D("Sent event to kqpProxy, RequestActorId = " << EventQuery_->Sender << ", self: " << SelfId()); - Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release()); - // TODO(xenoxeno): timeout Become(&TPgwireKqpProxyQuery::StateWork); } @@ -234,8 +174,8 @@ public: std::unique_ptr<NPG::TEvPGEvents::TEvQueryResponse> MakeResponse() { auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>(); - response->Tag = Tag; - response->TransactionStatus = TransactionStatus; + response->Tag = Tag_; + response->TransactionStatus = Connection_.Transaction.Status; return response; } @@ -250,10 +190,7 @@ public: FillResultSet(resultSet, response.get()); response->CommandCompleted = false; - // HACK - if (response->DataRows.size() > 0) { - response->Tag = TStringBuilder() << "SELECT " << response->DataRows.size(); - } + RowsSelected_ += response->DataRows.size(); BLOG_D(this->SelfId() << "Send rowset data (" << ev->Get()->Record.GetSeqNo() << ") to: " << EventQuery_->Sender); Send(EventQuery_->Sender, response.release(), 0, EventQuery_->Cookie); @@ -266,18 +203,18 @@ public: } void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { - BLOG_D("Handling TEvKqp::TEvQueryResponse"); + BLOG_D("Handling TEvKqp::TEvQueryResponse " << ev->Get()->Record.ShortDebugString()); NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef(); - + ProcessKqpResponseReleaseProxy(record); auto response = MakeResponse(); try { if (record.HasYdbStatus()) { if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { - Y_ENSURE(record.GetResponse().GetResults().empty()); + BLOG_ENSURE(record.GetResponse().GetResults().empty()); // HACK - if (response->DataRows.size() > 0) { - response->Tag = TStringBuilder() << "SELECT " << response->DataRows.size(); + if (response->Tag == "SELECT") { + response->Tag = TStringBuilder() << response->Tag << " " << RowsSelected_; } } else { NYql::TIssues issues; @@ -308,58 +245,41 @@ public: } }; -class TPgwireKqpProxyDescribe : public TActorBootstrapped<TPgwireKqpProxyDescribe> { - using TBase = TActorBootstrapped<TPgwireKqpProxyDescribe>; +class TPgwireKqpProxyDescribe : public TPgwireKqpProxy<TPgwireKqpProxyDescribe> { + using TBase = TPgwireKqpProxy<TPgwireKqpProxyDescribe>; - std::unordered_map<TString, TString> ConnectionParams_; NPG::TEvPGEvents::TEvDescribe::TPtr EventDescribe_; - TString Script_; + TParsedStatement Statement_; public: - TPgwireKqpProxyDescribe(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TString& script) - : ConnectionParams_(std::move(params)) + TPgwireKqpProxyDescribe(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TParsedStatement& statement) + : TPgwireKqpProxy(owner, std::move(params), connection) , EventDescribe_(std::move(evDescribe)) - , Script_(script) + , Statement_(statement) {} void Bootstrap() { - auto query(Script_); - TString database; - if (ConnectionParams_.count("database")) { - database = ConnectionParams_["database"]; - } - TString token; - if (ConnectionParams_.count("ydb-serialized-token")) { - token = ConnectionParams_["ydb-serialized-token"]; - } - auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + auto query(ConvertQuery(Statement_)); + auto event = MakeKqpRequest(); NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest(); - request.SetQuery(ToPgSyntax(query, ConnectionParams_)); - request.SetAction(NKikimrKqp::QUERY_ACTION_EXPLAIN); - auto noScript = ConnectionParams_.find("no-script"); - if (noScript == ConnectionParams_.end()) { - request.SetType(NKikimrKqp::QUERY_TYPE_SQL_SCRIPT); - } else { - request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY); - request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); - request.MutableTxControl()->set_commit_tx(true); - } - request.SetKeepSession(false); - request.SetDatabase(database); - event->Record.SetUserToken(token); - ActorIdToProto(SelfId(), event->Record.MutableRequestActorId()); - BLOG_D("Sent event to kqpProxy, RequestActorId = " << EventDescribe_->Sender << ", self: " << SelfId()); - Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release()); + // HACK + ConvertQueryToRequest(query.Query, request); + if (request.HasAction()) { + request.SetAction(NKikimrKqp::QUERY_ACTION_EXPLAIN); + ActorIdToProto(SelfId(), event->Record.MutableRequestActorId()); + BLOG_D("Sent event to kqpProxy " << request.ShortDebugString()); + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release()); + } // TODO(xenoxeno): timeout Become(&TPgwireKqpProxyDescribe::StateWork); } void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { BLOG_D("Handling TEvKqp::TEvQueryResponse"); + Send(Owner_, new TEvEvents::TEvProxyCompleted()); NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef(); - auto response = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>(); try { if (record.HasYdbStatus()) { @@ -396,17 +316,140 @@ public: } }; +class TPgwireKqpProxyExecute : public TPgwireKqpProxy<TPgwireKqpProxyExecute> { + using TBase = TPgwireKqpProxy<TPgwireKqpProxyExecute>; + + NPG::TEvPGEvents::TEvExecute::TPtr EventExecute_; + TParsedStatement Statement_; + std::size_t RowsSelected_ = 0; + +public: + TPgwireKqpProxyExecute(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute, const TParsedStatement& statement) + : TPgwireKqpProxy(owner, std::move(params), connection) + , EventExecute_(std::move(evExecute)) + , Statement_(statement) + { + } + + void Bootstrap() { + auto query(ConvertQuery(Statement_)); + auto event = MakeKqpRequest(); + NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest(); + + // HACK + ConvertQueryToRequest(query.Query, request); + if (request.HasAction()) { + ActorIdToProto(SelfId(), event->Record.MutableRequestActorId()); + BLOG_D("Sent event to kqpProxy " << request.ShortDebugString()); + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release()); + } + // TODO(xenoxeno): timeout + Become(&TPgwireKqpProxyQuery::StateWork); + } + + void FillResultSet(const NYdb::TResultSet& resultSet, NPG::TEvPGEvents::TEvExecuteResponse* response) { + NYdb::TResultSetParser parser(std::move(resultSet)); + while (parser.TryNextRow()) { + response->DataRows.emplace_back(); + auto& row = response->DataRows.back(); + row.resize(parser.ColumnsCount()); + for (size_t index = 0; index < parser.ColumnsCount(); ++index) { + row[index] = ColumnValueToString(parser.ColumnParser(index)); + } + } + } + + std::unique_ptr<NPG::TEvPGEvents::TEvExecuteResponse> MakeResponse() { + auto response = std::make_unique<NPG::TEvPGEvents::TEvExecuteResponse>(); + + response->Tag = Tag_; + response->TransactionStatus = Connection_.Transaction.Status; + + return response; + } + + void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) { + NYdb::TResultSet resultSet(std::move(*ev->Get()->Record.MutableResultSet())); + auto response = MakeResponse(); + FillResultSet(resultSet, response.get()); + response->CommandCompleted = false; + + RowsSelected_ += response->DataRows.size(); + + BLOG_D(this->SelfId() << "Send rowset data (" << ev->Get()->Record.GetSeqNo() << ") to: " << EventExecute_->Sender); + Send(EventExecute_->Sender, response.release(), 0, EventExecute_->Cookie); + + BLOG_D(this->SelfId() << "Send stream data ack to: " << ev->Sender); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + resp->Record.SetFreeSpace(std::numeric_limits<ui64>::max()); + Send(ev->Sender, resp.Release()); + } + + void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { + BLOG_D("Handling TEvKqp::TEvQueryResponse"); + NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef(); + ProcessKqpResponseReleaseProxy(record); + auto response = MakeResponse(); + try { + if (record.HasYdbStatus()) { + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + BLOG_ENSURE(record.GetResponse().GetResults().empty()); + + // HACK + if (response->Tag == "SELECT") { + response->Tag = TStringBuilder() << response->Tag << " " << RowsSelected_; + } + } else { + NYql::TIssues issues; + NYql::IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues); + NYdb::TStatus status(NYdb::EStatus(record.GetYdbStatus()), std::move(issues)); + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', TStringBuilder() << status}); + } + } else { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', "No result received"}); + } + } catch (const std::exception& e) { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', e.what()}); + } + response->CommandCompleted = true; + BLOG_D("Finally replying to " << EventExecute_->Sender); + Send(EventExecute_->Sender, response.release(), 0, EventExecute_->Cookie); + PassAway(); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); + hFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); + } + } +}; -NActors::IActor* CreatePgwireKqpProxy(std::unordered_map<TString, TString> params) { - return new TPgwireKqpProxy(std::move(params)); +NActors::IActor* CreatePgwireKqpProxyQuery(const TActorId& owner, + std::unordered_map<TString, TString> params, + const TConnectionState& connection, + NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery) { + return new TPgwireKqpProxyQuery(owner, std::move(params), connection, std::move(evQuery)); } -NActors::IActor* CreatePgwireKqpProxyQuery(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery) { - return new TPgwireKqpProxyQuery(std::move(params), std::move(evQuery)); +NActors::IActor* CreatePgwireKqpProxyDescribe(const TActorId& owner, + std::unordered_map<TString, TString> params, + const TConnectionState& connection, + NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, + const TParsedStatement& statement) { + return new TPgwireKqpProxyDescribe(owner, std::move(params), connection, std::move(evDescribe), statement); } -NActors::IActor* CreatePgwireKqpProxyDescribe(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TString& script) { - return new TPgwireKqpProxyDescribe(std::move(params), std::move(evDescribe), script); +NActors::IActor* CreatePgwireKqpProxyExecute(const TActorId& owner, + std::unordered_map<TString, TString> params, + const TConnectionState& connection, + NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute, + const TParsedStatement& statement) { + return new TPgwireKqpProxyExecute(owner, std::move(params), connection, std::move(evExecute), statement); } } //namespace NLocalPgwire diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp index d7faa6234d9..fe31a8e6501 100644 --- a/ydb/core/pgproxy/pg_connection.cpp +++ b/ydb/core/pgproxy/pg_connection.cpp @@ -375,7 +375,7 @@ protected: void HandleMessage(const TPGQuery* message) { SyncSequenceNumber = IncomingSequenceNumber; - Send(DatabaseProxy, new TEvPGEvents::TEvQuery(MakePGMessageCopy(message)), 0, IncomingSequenceNumber++); + Send(DatabaseProxy, new TEvPGEvents::TEvQuery(MakePGMessageCopy(message), TransactionStatus), 0, IncomingSequenceNumber++); } void HandleMessage(const TPGParse* message) { @@ -399,7 +399,7 @@ protected: } void HandleMessage(const TPGExecute* message) { - Send(DatabaseProxy, new TEvPGEvents::TEvExecute(MakePGMessageCopy(message)), 0, IncomingSequenceNumber++); + Send(DatabaseProxy, new TEvPGEvents::TEvExecute(MakePGMessageCopy(message), TransactionStatus), 0, IncomingSequenceNumber++); } void HandleMessage(const TPGClose* message) { @@ -577,6 +577,9 @@ protected: void HandleConnected(TEvPGEvents::TEvExecuteResponse::TPtr& ev) { if (IsEventExpected(ev)) { + if (ev->Get()->TransactionStatus) { + TransactionStatus = ev->Get()->TransactionStatus; + } if (ev->Get()->ErrorFields.empty()) { if (ev->Get()->EmptyQuery) { SendMessage(TPGEmptyQueryResponse()); @@ -592,7 +595,9 @@ protected: SendStream(dataRow); } } - { // commandComplete + if (ev->Get()->CommandCompleted) { + // commandComplete + TString tag = ev->Get()->Tag ? ev->Get()->Tag : "OK"; TPGStreamOutput<TPGCommandComplete> commandComplete; commandComplete << tag << '\0'; SendStream(commandComplete); @@ -607,8 +612,10 @@ protected: errorResponse << '\0'; SendStream(errorResponse); } - ++OutgoingSequenceNumber; - BecomeReadyForQuery(); + if (ev->Get()->CommandCompleted) { + ++OutgoingSequenceNumber; + BecomeReadyForQuery(); + } } else { PostponeEvent(ev); } diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h index 0a777849d06..32ad9662dbb 100644 --- a/ydb/core/pgproxy/pg_proxy_events.h +++ b/ydb/core/pgproxy/pg_proxy_events.h @@ -91,9 +91,11 @@ struct TEvPGEvents { struct TEvQuery : NActors::TEventLocal<TEvQuery, EvQuery> { std::unique_ptr<TPGQuery> Message; + char TransactionStatus; - TEvQuery(std::unique_ptr<TPGQuery> message) + TEvQuery(std::unique_ptr<TPGQuery> message, char transactionStatus) : Message(std::move(message)) + , TransactionStatus(transactionStatus) {} }; @@ -192,9 +194,11 @@ struct TEvPGEvents { struct TEvExecute : NActors::TEventLocal<TEvExecute, EvExecute> { std::unique_ptr<TPGExecute> Message; + char TransactionStatus; - TEvExecute(std::unique_ptr<TPGExecute> message) + TEvExecute(std::unique_ptr<TPGExecute> message, char transactionStatus) : Message(std::move(message)) + , TransactionStatus(transactionStatus) {} }; @@ -203,6 +207,8 @@ struct TEvPGEvents { std::vector<std::pair<char, TString>> ErrorFields; TString Tag; bool EmptyQuery = false; + bool CommandCompleted = true; + char TransactionStatus = 0; }; struct TEvCloseResponse : NActors::TEventLocal<TEvCloseResponse, EvCloseResponse> { |