aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorxenoxeno <xeno@ydb.tech>2023-09-26 13:05:23 +0300
committerxenoxeno <xeno@ydb.tech>2023-09-26 13:48:11 +0300
commit0ea90c05c3f9d7b47f368fdc3af65acb8fec43ef (patch)
treea844c026ad04fbb748a05cc8bef824188f1181b2
parentdf1df29214d1b77e33a217256d8e11ae779ded4f (diff)
downloadydb-0ea90c05c3f9d7b47f368fdc3af65acb8fec43ef.tar.gz
rework pgwire to better distinguish between statements and portals KIKIMR-19435
-rw-r--r--ydb/apps/pgwire/pg_ydb_connection.cpp2
-rw-r--r--ydb/core/local_pgwire/local_pgwire_connection.cpp104
-rw-r--r--ydb/core/local_pgwire/local_pgwire_util.h10
-rw-r--r--ydb/core/local_pgwire/pgwire_kqp_proxy.cpp26
-rw-r--r--ydb/core/local_pgwire/pgwire_kqp_proxy.h2
-rw-r--r--ydb/core/pgproxy/pg_connection.cpp11
-rw-r--r--ydb/core/pgproxy/pg_proxy_events.h1
-rw-r--r--ydb/core/pgproxy/pg_proxy_types.cpp30
-rw-r--r--ydb/core/pgproxy/pg_proxy_types.h8
9 files changed, 110 insertions, 84 deletions
diff --git a/ydb/apps/pgwire/pg_ydb_connection.cpp b/ydb/apps/pgwire/pg_ydb_connection.cpp
index ce8ccf53108..35810a1d299 100644
--- a/ydb/apps/pgwire/pg_ydb_connection.cpp
+++ b/ydb/apps/pgwire/pg_ydb_connection.cpp
@@ -211,7 +211,7 @@ public:
void Handle(NPG::TEvPGEvents::TEvClose::TPtr& ev) {
auto closeData = ev->Get()->Message->GetCloseData();
- ParsedStatements.erase(closeData.StatementName);
+ ParsedStatements.erase(closeData.Name);
CurrentStatement.clear();
BLOG_D("TEvClose CurrentStatement changed to <empty>");
diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp
index 6a8c98f764c..83d735dd3e7 100644
--- a/ydb/core/local_pgwire/local_pgwire_connection.cpp
+++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp
@@ -27,7 +27,7 @@ class TPgYdbConnection : public TActor<TPgYdbConnection> {
std::unordered_map<TString, TString> ConnectionParams;
std::unordered_map<TString, TParsedStatement> ParsedStatements;
- TString CurrentStatement;
+ std::unordered_map<TString, TPortal> Portals;
TConnectionState Connection;
std::deque<TAutoPtr<IEventHandle>> Events;
ui32 Inflight = 0;
@@ -89,64 +89,86 @@ public:
}
void Handle(NPG::TEvPGEvents::TEvBind::TPtr& ev) {
+ BLOG_D("TEvBind " << ev->Sender);
auto bindData = ev->Get()->Message->GetBindData();
- ParsedStatements[bindData.StatementName].BindData = bindData;
- CurrentStatement = bindData.StatementName;
- BLOG_D("TEvBind CurrentStatement changed to " << CurrentStatement);
-
- auto bindComplete = ev->Get()->Reply();
- Send(ev->Sender, bindComplete.release());
+ auto statementName(bindData.StatementName);
+ auto itParsedStatement = ParsedStatements.find(statementName);
+ auto bindResponse = ev->Get()->Reply();
+ if (itParsedStatement == ParsedStatements.end()) {
+ bindResponse->ErrorFields.push_back({'E', "ERROR"});
+ bindResponse->ErrorFields.push_back({'M', TStringBuilder() << "Parsed statement \"" << statementName << "\" not found"});
+ } else {
+ auto portalName(bindData.PortalName);
+ // TODO(xenoxeno): performance hit
+ Portals[portalName].Construct(itParsedStatement->second, std::move(bindData));
+ BLOG_D("Created portal \"" << portalName << "\" from statement \"" << statementName <<"\"");
+ }
+ Send(ev->Sender, bindResponse.release(), 0, ev->Cookie);
}
void Handle(NPG::TEvPGEvents::TEvClose::TPtr& ev) {
auto closeData = ev->Get()->Message->GetCloseData();
- ParsedStatements.erase(closeData.StatementName);
- CurrentStatement.clear();
- BLOG_D("TEvClose CurrentStatement changed to <empty>");
-
+ switch (closeData.Type) {
+ case NPG::TPGClose::TCloseData::ECloseType::Statement:
+ ParsedStatements.erase(closeData.Name);
+ break;
+ case NPG::TPGClose::TCloseData::ECloseType::Portal:
+ Portals.erase(closeData.Name);
+ break;
+ default:
+ BLOG_ERROR("Unknown close type \"" << static_cast<char>(closeData.Type) << "\"");
+ break;
+ }
auto closeComplete = ev->Get()->Reply();
Send(ev->Sender, closeComplete.release());
}
void Handle(NPG::TEvPGEvents::TEvDescribe::TPtr& ev) {
BLOG_D("TEvDescribe " << ev->Sender);
-
- TString statementName = ev->Get()->Message->GetDescribeData().Name;
- if (statementName.empty()) {
- statementName = CurrentStatement;
- BLOG_W("TEvDescribe changed empty statement to " << CurrentStatement);
- }
- auto it = ParsedStatements.find(statementName);
- if (it == ParsedStatements.end()) {
- auto errorResponse = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>();
- errorResponse->ErrorFields.push_back({'E', "ERROR"});
- errorResponse->ErrorFields.push_back({'M', TStringBuilder() << "Parsed statement \"" << statementName << "\" not found"});
- Send(ev->Sender, errorResponse.release(), 0, ev->Cookie);
- return;
- }
-
auto response = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>();
- for (const auto& ydbType : it->second.ParameterTypes) {
- response->ParameterTypes.push_back(GetPgOidFromYdbType(ydbType));
+ auto describeData = ev->Get()->Message->GetDescribeData();
+ switch (describeData.Type) {
+ case NPG::TPGDescribe::TDescribeData::EDescribeType::Statement: {
+ auto it = ParsedStatements.find(describeData.Name);
+ if (it == ParsedStatements.end()) {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', TStringBuilder() << "Parsed statement \"" << describeData.Name << "\" not found"});
+ } else {
+ for (const auto& ydbType : it->second.ParameterTypes) {
+ response->ParameterTypes.push_back(GetPgOidFromYdbType(ydbType));
+ }
+ response->DataFields = it->second.DataFields;
+ }
+ }
+ break;
+ case NPG::TPGDescribe::TDescribeData::EDescribeType::Portal: {
+ auto it = Portals.find(describeData.Name);
+ if (it == Portals.end()) {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', TStringBuilder() << "Portal \"" << describeData.Name << "\" not found"});
+ } else {
+ response->DataFields = it->second.DataFields;
+ }
+ }
+ break;
+ default: {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', TStringBuilder() << "Unknown describe type \"" << static_cast<char>(describeData.Type) << "\""});
+ }
+ break;
}
- response->DataFields = it->second.DataFields;
-
- Send(ev->Sender, response.release());
+ Send(ev->Sender, response.release(), 0, ev->Cookie);
}
void Handle(NPG::TEvPGEvents::TEvExecute::TPtr& ev) {
BLOG_D("TEvExecute " << ev->Sender);
- TString statementName = ev->Get()->Message->GetExecuteData().PortalName;
- if (statementName.empty()) {
- statementName = CurrentStatement;
- BLOG_W("TEvExecute changed empty statement to " << CurrentStatement);
- }
- auto it = ParsedStatements.find(statementName);
- if (it == ParsedStatements.end()) {
+ TString portalName = ev->Get()->Message->GetExecuteData().PortalName;
+ auto it = Portals.find(portalName);
+ if (it == Portals.end()) {
auto errorResponse = std::make_unique<NPG::TEvPGEvents::TEvExecuteResponse>();
errorResponse->ErrorFields.push_back({'E', "ERROR"});
- errorResponse->ErrorFields.push_back({'M', TStringBuilder() << "Parsed statement \"" << statementName << "\" not found"});
+ errorResponse->ErrorFields.push_back({'M', TStringBuilder() << "Portal \"" << portalName << "\" not found"});
Send(ev->Sender, errorResponse.release(), 0, ev->Cookie);
return;
}
@@ -167,7 +189,9 @@ public:
--Inflight;
BLOG_D("Received TEvProxyCompleted");
if (ev->Get()->ParsedStatement) {
- ParsedStatements[ev->Get()->ParsedStatement.value().QueryData.Name] = ev->Get()->ParsedStatement.value();
+ auto name(ev->Get()->ParsedStatement.value().QueryData.Name);
+ BLOG_D("Updating ParsedStatement \"" << name << "\"");
+ ParsedStatements[name] = ev->Get()->ParsedStatement.value();
}
if (ev->Get()->Connection) {
auto& connection(ev->Get()->Connection.value());
diff --git a/ydb/core/local_pgwire/local_pgwire_util.h b/ydb/core/local_pgwire/local_pgwire_util.h
index 22ba3d3fd82..a4e4daba4f0 100644
--- a/ydb/core/local_pgwire/local_pgwire_util.h
+++ b/ydb/core/local_pgwire/local_pgwire_util.h
@@ -31,11 +31,19 @@ struct TConnectionState {
struct TParsedStatement {
NPG::TPGParse::TQueryData QueryData;
- NPG::TPGBind::TBindData BindData;
std::vector<Ydb::Type> ParameterTypes;
std::vector<NPG::TEvPGEvents::TRowDescriptionField> DataFields;
};
+struct TPortal : TParsedStatement {
+ NPG::TPGBind::TBindData BindData;
+
+ void Construct(const TParsedStatement& parsedStatement, NPG::TPGBind::TBindData&& bindData) {
+ (TParsedStatement&)(*this) = parsedStatement;
+ BindData = std::move(bindData);
+ }
+};
+
enum EFormatType : int16_t {
EFormatText = 0,
EFormatBinary = 1,
diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
index e3b6cc6435d..940fe5449e6 100644
--- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
+++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
@@ -388,14 +388,14 @@ class TPgwireKqpProxyExecute : public TPgwireKqpProxy<TPgwireKqpProxyExecute> {
using TBase = TPgwireKqpProxy<TPgwireKqpProxyExecute>;
NPG::TEvPGEvents::TEvExecute::TPtr EventExecute_;
- TParsedStatement Statement_;
+ TPortal Portal_;
std::size_t RowsSelected_ = 0;
public:
- TPgwireKqpProxyExecute(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute, const TParsedStatement& statement)
+ TPgwireKqpProxyExecute(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute, const TPortal& portal)
: TPgwireKqpProxy(owner, std::move(params), connection)
, EventExecute_(std::move(evExecute))
- , Statement_(statement)
+ , Portal_(portal)
{
}
@@ -404,20 +404,20 @@ public:
NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest();
// HACK
- ConvertQueryToRequest(Statement_.QueryData.Query, request);
+ ConvertQueryToRequest(Portal_.QueryData.Query, request);
request.SetUsePublicResponseDataFormat(true);
if (request.HasAction()) {
- for (unsigned int paramNum = 0; paramNum < Statement_.BindData.ParametersValue.size(); ++paramNum) {
- if (paramNum >= Statement_.ParameterTypes.size()) {
+ for (unsigned int paramNum = 0; paramNum < Portal_.BindData.ParametersValue.size(); ++paramNum) {
+ if (paramNum >= Portal_.ParameterTypes.size()) {
// TODO(xenoxeno): report error
break;
}
- Ydb::Type type = Statement_.ParameterTypes[paramNum];
+ Ydb::Type type = Portal_.ParameterTypes[paramNum];
int16_t format = 0; // text
- if (paramNum < Statement_.BindData.ParametersFormat.size()) {
- format = Statement_.BindData.ParametersFormat[paramNum];
+ if (paramNum < Portal_.BindData.ParametersFormat.size()) {
+ format = Portal_.BindData.ParametersFormat[paramNum];
}
- Ydb::TypedValue value = GetTypedValueFromParam(format, Statement_.BindData.ParametersValue[paramNum], type);
+ Ydb::TypedValue value = GetTypedValueFromParam(format, Portal_.BindData.ParametersValue[paramNum], type);
request.MutableYdbParameters()->insert({TStringBuilder() << "$p" << paramNum + 1, value});
}
ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
@@ -440,7 +440,7 @@ public:
void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
NYdb::TResultSet resultSet(std::move(*ev->Get()->Record.MutableResultSet()));
auto response = MakeResponse();
- FillResultSet(resultSet, response.get()->DataRows, Statement_.BindData.ResultsFormat);
+ FillResultSet(resultSet, response.get()->DataRows, Portal_.BindData.ResultsFormat);
response->CommandCompleted = false;
response->ReadyForQuery = false;
@@ -513,8 +513,8 @@ NActors::IActor* CreatePgwireKqpProxyExecute(const TActorId& owner,
std::unordered_map<TString, TString> params,
const TConnectionState& connection,
NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute,
- const TParsedStatement& statement) {
- return new TPgwireKqpProxyExecute(owner, std::move(params), connection, std::move(evExecute), statement);
+ const TPortal& portal) {
+ return new TPgwireKqpProxyExecute(owner, std::move(params), connection, std::move(evExecute), portal);
}
} //namespace NLocalPgwire
diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.h b/ydb/core/local_pgwire/pgwire_kqp_proxy.h
index 3dab049c177..074b6fcfb06 100644
--- a/ydb/core/local_pgwire/pgwire_kqp_proxy.h
+++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.h
@@ -19,6 +19,6 @@ NActors::IActor* CreatePgwireKqpProxyExecute(const NActors::TActorId& owner,
std::unordered_map<TString, TString> params,
const TConnectionState& connection,
NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute,
- const TParsedStatement& statement);
+ const TPortal& portal);
}
diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp
index 163c25bd548..d4326212742 100644
--- a/ydb/core/pgproxy/pg_connection.cpp
+++ b/ydb/core/pgproxy/pg_connection.cpp
@@ -579,7 +579,8 @@ protected:
void HandleConnected(TEvPGEvents::TEvDescribeResponse::TPtr& ev) {
if (IsEventExpected(ev)) {
if (ev->Get()->ErrorFields.empty()) {
- if (ev->Get()->ParameterTypes.size() > 0) { // parameterDescription
+ if (ev->Get()->ParameterTypes.size() > 0) {
+ // parameterDescription (statement only)
TPGStreamOutput<TPGParameterDescription> parameterDescription;
parameterDescription << uint16_t(ev->Get()->ParameterTypes.size()); // number of fields
for (auto type : ev->Get()->ParameterTypes) {
@@ -659,8 +660,12 @@ protected:
void HandleConnected(TEvPGEvents::TEvBindResponse::TPtr& ev) {
if (IsEventExpected(ev)) {
- TPGStreamOutput<TPGBindComplete> bindComplete;
- SendStream(bindComplete);
+ if (ev->Get()->ErrorFields.empty()) {
+ TPGStreamOutput<TPGBindComplete> bindComplete;
+ SendStream(bindComplete);
+ } else {
+ SendErrorResponse(ev->Get()->ErrorFields);
+ }
++OutgoingSequenceNumber;
BecomeReadyForQuery();
} else {
diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h
index e01795f716c..56c88b75ff2 100644
--- a/ydb/core/pgproxy/pg_proxy_events.h
+++ b/ydb/core/pgproxy/pg_proxy_events.h
@@ -168,6 +168,7 @@ struct TEvPGEvents {
struct TEvBindResponse : NActors::TEventLocal<TEvBindResponse, EvBindResponse> {
std::unique_ptr<TPGBind> OriginalMessage;
+ std::vector<std::pair<char, TString>> ErrorFields;
TEvBindResponse(std::unique_ptr<TPGBind> originalMessage)
: OriginalMessage(std::move(originalMessage))
diff --git a/ydb/core/pgproxy/pg_proxy_types.cpp b/ydb/core/pgproxy/pg_proxy_types.cpp
index 334add17349..98398fe6ee3 100644
--- a/ydb/core/pgproxy/pg_proxy_types.cpp
+++ b/ydb/core/pgproxy/pg_proxy_types.cpp
@@ -86,7 +86,7 @@ TString TPGParse::Dump() const {
TPGStreamInput stream(*this);
TStringBuf name;
stream >> name;
- return TStringBuilder() << "Name: " << name;
+ return TStringBuilder() << "Statement: \"" << name << "\"";
}
TPGParse::TQueryData TPGParse::GetQueryData() const {
@@ -146,11 +146,7 @@ TString TPGBind::Dump() const {
TStringBuf statementName;
stream >> portalName;
stream >> statementName;
- if (portalName) {
- text << "Portal: '" << portalName << "'";
- } else if (statementName) {
- text << "Statement: '" << statementName << "'";
- }
+ text << "Statement: \"" << statementName << "\"" << " Portal: \"" << portalName << "\"";
uint16_t numberOfParameterFormats = 0;
stream >> numberOfParameterFormats;
std::vector<uint16_t> parameterFormats;
@@ -230,7 +226,7 @@ TString TPGDescribe::Dump() const {
text << "Type: " << describeType;
TStringBuf name;
stream >> name;
- text << " Name: " << name;
+ text << " Name: \"" << name << "\"";
return text;
}
@@ -247,14 +243,9 @@ TString TPGExecute::Dump() const {
TStringBuf name;
uint32_t maxRows = 0;
stream >> name >> maxRows;
- if (name) {
- text << "Name: " << name;
- }
+ text << "Portal: \"" << name << "\"";
if (maxRows) {
- if (!text.empty()) {
- text << ' ';
- }
- text << "MaxRows: " << maxRows;
+ text << " MaxRows: " << maxRows;
}
return text;
}
@@ -265,15 +256,8 @@ TPGClose::TCloseData TPGClose::GetCloseData() const {
char type;
TString name;
stream >> type;
- stream >> name;
- switch (type) {
- case 'S':
- closeData.StatementName = name;
- break;
- case 'P':
- closeData.PortalName = name;
- break;
- }
+ closeData.Type = static_cast<TCloseData::ECloseType>(type);
+ stream >> closeData.Name;
return closeData;
}
diff --git a/ydb/core/pgproxy/pg_proxy_types.h b/ydb/core/pgproxy/pg_proxy_types.h
index 6b1482810bb..0825200ca0b 100644
--- a/ydb/core/pgproxy/pg_proxy_types.h
+++ b/ydb/core/pgproxy/pg_proxy_types.h
@@ -235,8 +235,12 @@ struct TPGExecute : TPGMessageType<'E'> {
struct TPGClose : TPGMessageType<'C'> {
struct TCloseData {
- TString PortalName;
- TString StatementName;
+ enum class ECloseType : char {
+ Portal = 'P',
+ Statement = 'S',
+ };
+ ECloseType Type;
+ TString Name;
};
TCloseData GetCloseData() const;