diff options
author | xenoxeno <xeno@ydb.tech> | 2023-09-26 13:05:23 +0300 |
---|---|---|
committer | xenoxeno <xeno@ydb.tech> | 2023-09-26 13:48:11 +0300 |
commit | 0ea90c05c3f9d7b47f368fdc3af65acb8fec43ef (patch) | |
tree | a844c026ad04fbb748a05cc8bef824188f1181b2 | |
parent | df1df29214d1b77e33a217256d8e11ae779ded4f (diff) | |
download | ydb-0ea90c05c3f9d7b47f368fdc3af65acb8fec43ef.tar.gz |
rework pgwire to better distinguish between statements and portals KIKIMR-19435
-rw-r--r-- | ydb/apps/pgwire/pg_ydb_connection.cpp | 2 | ||||
-rw-r--r-- | ydb/core/local_pgwire/local_pgwire_connection.cpp | 104 | ||||
-rw-r--r-- | ydb/core/local_pgwire/local_pgwire_util.h | 10 | ||||
-rw-r--r-- | ydb/core/local_pgwire/pgwire_kqp_proxy.cpp | 26 | ||||
-rw-r--r-- | ydb/core/local_pgwire/pgwire_kqp_proxy.h | 2 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_connection.cpp | 11 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_proxy_events.h | 1 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_proxy_types.cpp | 30 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_proxy_types.h | 8 |
9 files changed, 110 insertions, 84 deletions
diff --git a/ydb/apps/pgwire/pg_ydb_connection.cpp b/ydb/apps/pgwire/pg_ydb_connection.cpp index ce8ccf53108..35810a1d299 100644 --- a/ydb/apps/pgwire/pg_ydb_connection.cpp +++ b/ydb/apps/pgwire/pg_ydb_connection.cpp @@ -211,7 +211,7 @@ public: void Handle(NPG::TEvPGEvents::TEvClose::TPtr& ev) { auto closeData = ev->Get()->Message->GetCloseData(); - ParsedStatements.erase(closeData.StatementName); + ParsedStatements.erase(closeData.Name); CurrentStatement.clear(); BLOG_D("TEvClose CurrentStatement changed to <empty>"); diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp index 6a8c98f764c..83d735dd3e7 100644 --- a/ydb/core/local_pgwire/local_pgwire_connection.cpp +++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp @@ -27,7 +27,7 @@ class TPgYdbConnection : public TActor<TPgYdbConnection> { std::unordered_map<TString, TString> ConnectionParams; std::unordered_map<TString, TParsedStatement> ParsedStatements; - TString CurrentStatement; + std::unordered_map<TString, TPortal> Portals; TConnectionState Connection; std::deque<TAutoPtr<IEventHandle>> Events; ui32 Inflight = 0; @@ -89,64 +89,86 @@ public: } void Handle(NPG::TEvPGEvents::TEvBind::TPtr& ev) { + BLOG_D("TEvBind " << ev->Sender); auto bindData = ev->Get()->Message->GetBindData(); - ParsedStatements[bindData.StatementName].BindData = bindData; - CurrentStatement = bindData.StatementName; - BLOG_D("TEvBind CurrentStatement changed to " << CurrentStatement); - - auto bindComplete = ev->Get()->Reply(); - Send(ev->Sender, bindComplete.release()); + auto statementName(bindData.StatementName); + auto itParsedStatement = ParsedStatements.find(statementName); + auto bindResponse = ev->Get()->Reply(); + if (itParsedStatement == ParsedStatements.end()) { + bindResponse->ErrorFields.push_back({'E', "ERROR"}); + bindResponse->ErrorFields.push_back({'M', TStringBuilder() << "Parsed statement \"" << statementName << "\" not found"}); + } else { + auto portalName(bindData.PortalName); + // TODO(xenoxeno): performance hit + Portals[portalName].Construct(itParsedStatement->second, std::move(bindData)); + BLOG_D("Created portal \"" << portalName << "\" from statement \"" << statementName <<"\""); + } + Send(ev->Sender, bindResponse.release(), 0, ev->Cookie); } void Handle(NPG::TEvPGEvents::TEvClose::TPtr& ev) { auto closeData = ev->Get()->Message->GetCloseData(); - ParsedStatements.erase(closeData.StatementName); - CurrentStatement.clear(); - BLOG_D("TEvClose CurrentStatement changed to <empty>"); - + switch (closeData.Type) { + case NPG::TPGClose::TCloseData::ECloseType::Statement: + ParsedStatements.erase(closeData.Name); + break; + case NPG::TPGClose::TCloseData::ECloseType::Portal: + Portals.erase(closeData.Name); + break; + default: + BLOG_ERROR("Unknown close type \"" << static_cast<char>(closeData.Type) << "\""); + break; + } auto closeComplete = ev->Get()->Reply(); Send(ev->Sender, closeComplete.release()); } void Handle(NPG::TEvPGEvents::TEvDescribe::TPtr& ev) { BLOG_D("TEvDescribe " << ev->Sender); - - TString statementName = ev->Get()->Message->GetDescribeData().Name; - if (statementName.empty()) { - statementName = CurrentStatement; - BLOG_W("TEvDescribe changed empty statement to " << CurrentStatement); - } - auto it = ParsedStatements.find(statementName); - if (it == ParsedStatements.end()) { - auto errorResponse = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>(); - errorResponse->ErrorFields.push_back({'E', "ERROR"}); - errorResponse->ErrorFields.push_back({'M', TStringBuilder() << "Parsed statement \"" << statementName << "\" not found"}); - Send(ev->Sender, errorResponse.release(), 0, ev->Cookie); - return; - } - auto response = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>(); - for (const auto& ydbType : it->second.ParameterTypes) { - response->ParameterTypes.push_back(GetPgOidFromYdbType(ydbType)); + auto describeData = ev->Get()->Message->GetDescribeData(); + switch (describeData.Type) { + case NPG::TPGDescribe::TDescribeData::EDescribeType::Statement: { + auto it = ParsedStatements.find(describeData.Name); + if (it == ParsedStatements.end()) { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', TStringBuilder() << "Parsed statement \"" << describeData.Name << "\" not found"}); + } else { + for (const auto& ydbType : it->second.ParameterTypes) { + response->ParameterTypes.push_back(GetPgOidFromYdbType(ydbType)); + } + response->DataFields = it->second.DataFields; + } + } + break; + case NPG::TPGDescribe::TDescribeData::EDescribeType::Portal: { + auto it = Portals.find(describeData.Name); + if (it == Portals.end()) { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', TStringBuilder() << "Portal \"" << describeData.Name << "\" not found"}); + } else { + response->DataFields = it->second.DataFields; + } + } + break; + default: { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', TStringBuilder() << "Unknown describe type \"" << static_cast<char>(describeData.Type) << "\""}); + } + break; } - response->DataFields = it->second.DataFields; - - Send(ev->Sender, response.release()); + Send(ev->Sender, response.release(), 0, ev->Cookie); } void Handle(NPG::TEvPGEvents::TEvExecute::TPtr& ev) { BLOG_D("TEvExecute " << ev->Sender); - TString statementName = ev->Get()->Message->GetExecuteData().PortalName; - if (statementName.empty()) { - statementName = CurrentStatement; - BLOG_W("TEvExecute changed empty statement to " << CurrentStatement); - } - auto it = ParsedStatements.find(statementName); - if (it == ParsedStatements.end()) { + TString portalName = ev->Get()->Message->GetExecuteData().PortalName; + auto it = Portals.find(portalName); + if (it == Portals.end()) { auto errorResponse = std::make_unique<NPG::TEvPGEvents::TEvExecuteResponse>(); errorResponse->ErrorFields.push_back({'E', "ERROR"}); - errorResponse->ErrorFields.push_back({'M', TStringBuilder() << "Parsed statement \"" << statementName << "\" not found"}); + errorResponse->ErrorFields.push_back({'M', TStringBuilder() << "Portal \"" << portalName << "\" not found"}); Send(ev->Sender, errorResponse.release(), 0, ev->Cookie); return; } @@ -167,7 +189,9 @@ public: --Inflight; BLOG_D("Received TEvProxyCompleted"); if (ev->Get()->ParsedStatement) { - ParsedStatements[ev->Get()->ParsedStatement.value().QueryData.Name] = ev->Get()->ParsedStatement.value(); + auto name(ev->Get()->ParsedStatement.value().QueryData.Name); + BLOG_D("Updating ParsedStatement \"" << name << "\""); + ParsedStatements[name] = ev->Get()->ParsedStatement.value(); } if (ev->Get()->Connection) { auto& connection(ev->Get()->Connection.value()); diff --git a/ydb/core/local_pgwire/local_pgwire_util.h b/ydb/core/local_pgwire/local_pgwire_util.h index 22ba3d3fd82..a4e4daba4f0 100644 --- a/ydb/core/local_pgwire/local_pgwire_util.h +++ b/ydb/core/local_pgwire/local_pgwire_util.h @@ -31,11 +31,19 @@ struct TConnectionState { struct TParsedStatement { NPG::TPGParse::TQueryData QueryData; - NPG::TPGBind::TBindData BindData; std::vector<Ydb::Type> ParameterTypes; std::vector<NPG::TEvPGEvents::TRowDescriptionField> DataFields; }; +struct TPortal : TParsedStatement { + NPG::TPGBind::TBindData BindData; + + void Construct(const TParsedStatement& parsedStatement, NPG::TPGBind::TBindData&& bindData) { + (TParsedStatement&)(*this) = parsedStatement; + BindData = std::move(bindData); + } +}; + enum EFormatType : int16_t { EFormatText = 0, EFormatBinary = 1, diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp index e3b6cc6435d..940fe5449e6 100644 --- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp +++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp @@ -388,14 +388,14 @@ class TPgwireKqpProxyExecute : public TPgwireKqpProxy<TPgwireKqpProxyExecute> { using TBase = TPgwireKqpProxy<TPgwireKqpProxyExecute>; NPG::TEvPGEvents::TEvExecute::TPtr EventExecute_; - TParsedStatement Statement_; + TPortal Portal_; std::size_t RowsSelected_ = 0; public: - TPgwireKqpProxyExecute(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute, const TParsedStatement& statement) + TPgwireKqpProxyExecute(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute, const TPortal& portal) : TPgwireKqpProxy(owner, std::move(params), connection) , EventExecute_(std::move(evExecute)) - , Statement_(statement) + , Portal_(portal) { } @@ -404,20 +404,20 @@ public: NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest(); // HACK - ConvertQueryToRequest(Statement_.QueryData.Query, request); + ConvertQueryToRequest(Portal_.QueryData.Query, request); request.SetUsePublicResponseDataFormat(true); if (request.HasAction()) { - for (unsigned int paramNum = 0; paramNum < Statement_.BindData.ParametersValue.size(); ++paramNum) { - if (paramNum >= Statement_.ParameterTypes.size()) { + for (unsigned int paramNum = 0; paramNum < Portal_.BindData.ParametersValue.size(); ++paramNum) { + if (paramNum >= Portal_.ParameterTypes.size()) { // TODO(xenoxeno): report error break; } - Ydb::Type type = Statement_.ParameterTypes[paramNum]; + Ydb::Type type = Portal_.ParameterTypes[paramNum]; int16_t format = 0; // text - if (paramNum < Statement_.BindData.ParametersFormat.size()) { - format = Statement_.BindData.ParametersFormat[paramNum]; + if (paramNum < Portal_.BindData.ParametersFormat.size()) { + format = Portal_.BindData.ParametersFormat[paramNum]; } - Ydb::TypedValue value = GetTypedValueFromParam(format, Statement_.BindData.ParametersValue[paramNum], type); + Ydb::TypedValue value = GetTypedValueFromParam(format, Portal_.BindData.ParametersValue[paramNum], type); request.MutableYdbParameters()->insert({TStringBuilder() << "$p" << paramNum + 1, value}); } ActorIdToProto(SelfId(), event->Record.MutableRequestActorId()); @@ -440,7 +440,7 @@ public: void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) { NYdb::TResultSet resultSet(std::move(*ev->Get()->Record.MutableResultSet())); auto response = MakeResponse(); - FillResultSet(resultSet, response.get()->DataRows, Statement_.BindData.ResultsFormat); + FillResultSet(resultSet, response.get()->DataRows, Portal_.BindData.ResultsFormat); response->CommandCompleted = false; response->ReadyForQuery = false; @@ -513,8 +513,8 @@ NActors::IActor* CreatePgwireKqpProxyExecute(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute, - const TParsedStatement& statement) { - return new TPgwireKqpProxyExecute(owner, std::move(params), connection, std::move(evExecute), statement); + const TPortal& portal) { + return new TPgwireKqpProxyExecute(owner, std::move(params), connection, std::move(evExecute), portal); } } //namespace NLocalPgwire diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.h b/ydb/core/local_pgwire/pgwire_kqp_proxy.h index 3dab049c177..074b6fcfb06 100644 --- a/ydb/core/local_pgwire/pgwire_kqp_proxy.h +++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.h @@ -19,6 +19,6 @@ NActors::IActor* CreatePgwireKqpProxyExecute(const NActors::TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute, - const TParsedStatement& statement); + const TPortal& portal); } diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp index 163c25bd548..d4326212742 100644 --- a/ydb/core/pgproxy/pg_connection.cpp +++ b/ydb/core/pgproxy/pg_connection.cpp @@ -579,7 +579,8 @@ protected: void HandleConnected(TEvPGEvents::TEvDescribeResponse::TPtr& ev) { if (IsEventExpected(ev)) { if (ev->Get()->ErrorFields.empty()) { - if (ev->Get()->ParameterTypes.size() > 0) { // parameterDescription + if (ev->Get()->ParameterTypes.size() > 0) { + // parameterDescription (statement only) TPGStreamOutput<TPGParameterDescription> parameterDescription; parameterDescription << uint16_t(ev->Get()->ParameterTypes.size()); // number of fields for (auto type : ev->Get()->ParameterTypes) { @@ -659,8 +660,12 @@ protected: void HandleConnected(TEvPGEvents::TEvBindResponse::TPtr& ev) { if (IsEventExpected(ev)) { - TPGStreamOutput<TPGBindComplete> bindComplete; - SendStream(bindComplete); + if (ev->Get()->ErrorFields.empty()) { + TPGStreamOutput<TPGBindComplete> bindComplete; + SendStream(bindComplete); + } else { + SendErrorResponse(ev->Get()->ErrorFields); + } ++OutgoingSequenceNumber; BecomeReadyForQuery(); } else { diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h index e01795f716c..56c88b75ff2 100644 --- a/ydb/core/pgproxy/pg_proxy_events.h +++ b/ydb/core/pgproxy/pg_proxy_events.h @@ -168,6 +168,7 @@ struct TEvPGEvents { struct TEvBindResponse : NActors::TEventLocal<TEvBindResponse, EvBindResponse> { std::unique_ptr<TPGBind> OriginalMessage; + std::vector<std::pair<char, TString>> ErrorFields; TEvBindResponse(std::unique_ptr<TPGBind> originalMessage) : OriginalMessage(std::move(originalMessage)) diff --git a/ydb/core/pgproxy/pg_proxy_types.cpp b/ydb/core/pgproxy/pg_proxy_types.cpp index 334add17349..98398fe6ee3 100644 --- a/ydb/core/pgproxy/pg_proxy_types.cpp +++ b/ydb/core/pgproxy/pg_proxy_types.cpp @@ -86,7 +86,7 @@ TString TPGParse::Dump() const { TPGStreamInput stream(*this); TStringBuf name; stream >> name; - return TStringBuilder() << "Name: " << name; + return TStringBuilder() << "Statement: \"" << name << "\""; } TPGParse::TQueryData TPGParse::GetQueryData() const { @@ -146,11 +146,7 @@ TString TPGBind::Dump() const { TStringBuf statementName; stream >> portalName; stream >> statementName; - if (portalName) { - text << "Portal: '" << portalName << "'"; - } else if (statementName) { - text << "Statement: '" << statementName << "'"; - } + text << "Statement: \"" << statementName << "\"" << " Portal: \"" << portalName << "\""; uint16_t numberOfParameterFormats = 0; stream >> numberOfParameterFormats; std::vector<uint16_t> parameterFormats; @@ -230,7 +226,7 @@ TString TPGDescribe::Dump() const { text << "Type: " << describeType; TStringBuf name; stream >> name; - text << " Name: " << name; + text << " Name: \"" << name << "\""; return text; } @@ -247,14 +243,9 @@ TString TPGExecute::Dump() const { TStringBuf name; uint32_t maxRows = 0; stream >> name >> maxRows; - if (name) { - text << "Name: " << name; - } + text << "Portal: \"" << name << "\""; if (maxRows) { - if (!text.empty()) { - text << ' '; - } - text << "MaxRows: " << maxRows; + text << " MaxRows: " << maxRows; } return text; } @@ -265,15 +256,8 @@ TPGClose::TCloseData TPGClose::GetCloseData() const { char type; TString name; stream >> type; - stream >> name; - switch (type) { - case 'S': - closeData.StatementName = name; - break; - case 'P': - closeData.PortalName = name; - break; - } + closeData.Type = static_cast<TCloseData::ECloseType>(type); + stream >> closeData.Name; return closeData; } diff --git a/ydb/core/pgproxy/pg_proxy_types.h b/ydb/core/pgproxy/pg_proxy_types.h index 6b1482810bb..0825200ca0b 100644 --- a/ydb/core/pgproxy/pg_proxy_types.h +++ b/ydb/core/pgproxy/pg_proxy_types.h @@ -235,8 +235,12 @@ struct TPGExecute : TPGMessageType<'E'> { struct TPGClose : TPGMessageType<'C'> { struct TCloseData { - TString PortalName; - TString StatementName; + enum class ECloseType : char { + Portal = 'P', + Statement = 'S', + }; + ECloseType Type; + TString Name; }; TCloseData GetCloseData() const; |