aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorxenoxeno <xeno@ydb.tech>2023-07-12 07:35:12 +0300
committerxenoxeno <xeno@ydb.tech>2023-07-12 07:35:12 +0300
commit4265abd14384a044d1fec53ae94fd91b18dbfa5e (patch)
tree275303869dc0957168a9ed001c77d3f9cf5ca4ec
parent8096026d597e38b356aa1c680d4ea44534ddec08 (diff)
downloadydb-4265abd14384a044d1fec53ae94fd91b18dbfa5e.tar.gz
rework parse/describe commands to better support parameters KIKIMR-18052
-rw-r--r--ydb/core/local_pgwire/local_pgwire_connection.cpp68
-rw-r--r--ydb/core/local_pgwire/local_pgwire_util.h54
-rw-r--r--ydb/core/local_pgwire/pgwire_kqp_proxy.cpp152
-rw-r--r--ydb/core/pgproxy/pg_connection.cpp45
-rw-r--r--ydb/core/pgproxy/pg_proxy_events.h1
5 files changed, 222 insertions, 98 deletions
diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp
index 9dcbaca5ac..dcec1fab9d 100644
--- a/ydb/core/local_pgwire/local_pgwire_connection.cpp
+++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp
@@ -25,6 +25,7 @@ extern NActors::IActor* CreatePgwireKqpProxy(
);
NActors::IActor* CreatePgwireKqpProxyQuery(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery);
+NActors::IActor* CreatePgwireKqpProxyParse(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvParse::TPtr&& evParse);
NActors::IActor* CreatePgwireKqpProxyDescribe(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TParsedStatement& statement);
NActors::IActor* CreatePgwireKqpProxyExecute(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute, const TParsedStatement& statement);
@@ -59,12 +60,10 @@ public:
void Handle(NPG::TEvPGEvents::TEvParse::TPtr& ev) {
BLOG_D("TEvParse " << ev->Sender);
-
- auto queryData = ev->Get()->Message->GetQueryData();
- ParsedStatements[queryData.Name].QueryData = queryData;
-
- auto parseComplete = ev->Get()->Reply();
- Send(ev->Sender, parseComplete.release());
+ ++Inflight;
+ TActorId actorId = Register(CreatePgwireKqpProxyParse(SelfId(), ConnectionParams, Connection, std::move(ev)));
+ BLOG_D("Created pgwireKqpProxyParse: " << actorId);
+ return;
}
void Handle(NPG::TEvPGEvents::TEvBind::TPtr& ev) {
@@ -93,7 +92,7 @@ public:
TString statementName = ev->Get()->Message->GetDescribeData().Name;
if (statementName.empty()) {
statementName = CurrentStatement;
- BLOG_W("TEvExecute changed empty statement to " << CurrentStatement);
+ BLOG_W("TEvDescribe changed empty statement to " << CurrentStatement);
}
auto it = ParsedStatements.find(statementName);
if (it == ParsedStatements.end()) {
@@ -104,10 +103,13 @@ public:
return;
}
- ++Inflight;
- TActorId actorId = Register(CreatePgwireKqpProxyDescribe(SelfId(), ConnectionParams, Connection, std::move(ev), it->second));
- BLOG_D("Created pgwireKqpProxyDescribe: " << actorId);
- return;
+ auto response = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>();
+ for (const auto& ydbType : it->second.ParameterTypes) {
+ response->ParameterTypes.push_back(GetPgOidFromYdbType(ydbType));
+ }
+ response->DataFields = it->second.DataFields;
+
+ Send(ev->Sender, response.release());
}
void Handle(NPG::TEvPGEvents::TEvExecute::TPtr& ev) {
@@ -142,26 +144,32 @@ public:
void Handle(TEvEvents::TEvProxyCompleted::TPtr& ev) {
--Inflight;
BLOG_D("Received TEvProxyCompleted");
- if (ev->Get()->Connection.Transaction.Status) {
- BLOG_D("Updating transaction state to " << ev->Get()->Connection.Transaction.Status);
- Connection.Transaction.Status = ev->Get()->Connection.Transaction.Status;
- switch (ev->Get()->Connection.Transaction.Status) {
- case 'I':
- case 'E':
- Connection.Transaction.Id.clear();
- BLOG_D("Transaction id cleared");
- break;
- case 'T':
- if (ev->Get()->Connection.Transaction.Id) {
- Connection.Transaction.Id = ev->Get()->Connection.Transaction.Id;
- BLOG_D("Transaction id is " << Connection.Transaction.Id);
- }
- break;
- }
+ if (ev->Get()->ParsedStatement) {
+ ParsedStatements[ev->Get()->ParsedStatement.value().QueryData.Name] = ev->Get()->ParsedStatement.value();
}
- if (ev->Get()->Connection.SessionId) {
- BLOG_D("Session id is " << ev->Get()->Connection.SessionId);
- Connection.SessionId = ev->Get()->Connection.SessionId;
+ if (ev->Get()->Connection) {
+ auto& connection(ev->Get()->Connection.value());
+ if (connection.Transaction.Status) {
+ BLOG_D("Updating transaction state to " << connection.Transaction.Status);
+ Connection.Transaction.Status = connection.Transaction.Status;
+ switch (connection.Transaction.Status) {
+ case 'I':
+ case 'E':
+ Connection.Transaction.Id.clear();
+ BLOG_D("Transaction id cleared");
+ break;
+ case 'T':
+ if (connection.Transaction.Id) {
+ Connection.Transaction.Id = connection.Transaction.Id;
+ BLOG_D("Transaction id is " << Connection.Transaction.Id);
+ }
+ break;
+ }
+ }
+ if (connection.SessionId) {
+ BLOG_D("Session id is " << connection.SessionId);
+ Connection.SessionId = connection.SessionId;
+ }
}
while (!Events.empty() && Inflight == 0) {
StateWork(Events.front());
diff --git a/ydb/core/local_pgwire/local_pgwire_util.h b/ydb/core/local_pgwire/local_pgwire_util.h
index fa0dddb2cb..9dedeb4fcc 100644
--- a/ydb/core/local_pgwire/local_pgwire_util.h
+++ b/ydb/core/local_pgwire/local_pgwire_util.h
@@ -44,6 +44,13 @@ struct TConnectionState {
TTransactionState Transaction;
};
+struct TParsedStatement {
+ NPG::TPGParse::TQueryData QueryData;
+ NPG::TPGBind::TBindData BindData;
+ std::vector<Ydb::Type> ParameterTypes;
+ std::vector<NPG::TEvPGEvents::TRowDescriptionField> DataFields;
+};
+
struct TEvEvents {
enum EEv {
EvProxyCompleted = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
@@ -53,17 +60,19 @@ struct TEvEvents {
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "ES_PRIVATE event space is too small.");
struct TEvProxyCompleted : NActors::TEventLocal<TEvProxyCompleted, EvProxyCompleted> {
- TConnectionState Connection;
+ std::optional<TConnectionState> Connection;
+ std::optional<TParsedStatement> ParsedStatement;
+
+ TEvProxyCompleted() = default;
- TEvProxyCompleted(const TConnectionState& connection = {})
+ TEvProxyCompleted(const TConnectionState& connection)
: Connection(connection)
{}
- };
-};
-struct TParsedStatement {
- NPG::TPGParse::TQueryData QueryData;
- NPG::TPGBind::TBindData BindData;
+ TEvProxyCompleted(const TParsedStatement& parsedStatement)
+ : ParsedStatement(parsedStatement)
+ {}
+ };
};
inline TString ColumnPrimitiveValueToString(NYdb::TValueParser& valueParser) {
@@ -234,37 +243,6 @@ inline NYdb::NScripting::TExecuteYqlResult ConvertProtoResponseToSdkResult(Ydb::
return {NYdb::TStatus(std::move(alwaysSuccess)), std::move(res), queryStats};
}
-struct TConvertedQuery {
- TString Query;
- NYdb::TParams Params;
-};
-
-inline TConvertedQuery ConvertQuery(const TParsedStatement& statement) {
- auto& bindData = statement.BindData;
- const auto& queryData = statement.QueryData;
- NYdb::TParamsBuilder paramsBuilder;
- TStringBuilder injectedQuery;
-
- for (size_t idxParam = 0; idxParam < queryData.ParametersTypes.size(); ++idxParam) {
- int32_t paramType = queryData.ParametersTypes[idxParam];
- TString paramValue;
- if (idxParam < bindData.ParametersValue.size()) {
- std::vector<uint8_t> paramVal = bindData.ParametersValue[idxParam];
- paramValue = TString(reinterpret_cast<char*>(paramVal.data()), paramVal.size());
- }
- switch (paramType) {
- case INT2OID:
- paramsBuilder.AddParam(TStringBuilder() << ":_" << idxParam + 1).Int16(atoi(paramValue.data())).Build();
- break;
-
- }
- }
- return {
- .Query = injectedQuery + queryData.Query,
- .Params = paramsBuilder.Build(),
- };
-}
-
inline bool IsQueryEmptyChar(char c) {
return c == ' ' || c == ';' || c == '\t' || c == '\n' || c == '\r' || c == '\f' || c == '\v';
}
diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
index b26c32e494..5914fe97f1 100644
--- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
+++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
@@ -273,6 +273,101 @@ public:
}
};
+class TPgwireKqpProxyParse : public TPgwireKqpProxy<TPgwireKqpProxyParse> {
+ using TBase = TPgwireKqpProxy<TPgwireKqpProxyParse>;
+
+ NPG::TEvPGEvents::TEvParse::TPtr EventParse_;
+ NPG::TPGParse::TQueryData QueryData_;
+
+public:
+ TPgwireKqpProxyParse(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvParse::TPtr&& evParse)
+ : TPgwireKqpProxy(owner, std::move(params), connection)
+ , EventParse_(std::move(evParse))
+ , QueryData_(EventParse_->Get()->Message->GetQueryData())
+ {}
+
+ void Bootstrap() {
+ auto event = MakeKqpRequest();
+ NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest();
+
+ // HACK
+ ConvertQueryToRequest(QueryData_.Query, request);
+ if (request.HasAction()) {
+ request.SetAction(NKikimrKqp::QUERY_ACTION_EXPLAIN);
+
+ request.SetUsePublicResponseDataFormat(true);
+
+ ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
+ BLOG_D("Sent event to kqpProxy " << request.ShortDebugString());
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
+ }
+ // TODO(xenoxeno): timeout
+ Become(&TPgwireKqpProxyParse::StateWork);
+ }
+
+ void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
+ BLOG_D("Handling TEvKqp::TEvQueryResponse " << ev->Get()->Record.ShortDebugString());
+ NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef();
+ auto response = EventParse_->Get()->Reply();
+ try {
+ if (record.HasYdbStatus()) {
+ if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
+ TParsedStatement statement;
+ statement.QueryData = std::move(QueryData_);
+ for (const auto& param : record.GetResponse().GetQueryParameters()) {
+ Ydb::Type ydbType;
+ ConvertMiniKQLTypeToYdbType(param.GetType(), ydbType);
+ statement.ParameterTypes.push_back(ydbType);
+ }
+ for (const auto& result : record.GetResponse().GetYdbResults()) {
+ for (const auto& column : result.columns()) {
+ std::optional<NYdb::TPgType> pgType = GetPgTypeFromYdbType(column.type());
+ if (pgType.has_value()) {
+ statement.DataFields.push_back({
+ .Name = column.name(),
+ .DataType = pgType->Oid,
+ .DataTypeSize = pgType->Typlen,
+ .DataTypeModifier = pgType->Typmod,
+ });
+ } else {
+ statement.DataFields.push_back({
+ .Name = column.name()
+ });
+ }
+ }
+ break; // only 1 result is accepted
+ }
+ Send(Owner_, new TEvEvents::TEvProxyCompleted(statement));
+ } else {
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
+ NYdb::TStatus status(NYdb::EStatus(record.GetYdbStatus()), std::move(issues));
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', TStringBuilder() << status});
+ }
+ } else {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', "No result received"});
+ }
+ } catch (const std::exception& e) {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', e.what()});
+ }
+ if (!response->ErrorFields.empty()) {
+ Send(Owner_, new TEvEvents::TEvProxyCompleted());
+ }
+ BLOG_D("Finally replying to " << EventParse_->Sender);
+ Send(EventParse_->Sender, response.release(), 0, EventParse_->Cookie);
+ PassAway();
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
+ }
+ }
+};
+
class TPgwireKqpProxyDescribe : public TPgwireKqpProxy<TPgwireKqpProxyDescribe> {
using TBase = TPgwireKqpProxy<TPgwireKqpProxyDescribe>;
@@ -287,12 +382,11 @@ public:
{}
void Bootstrap() {
- auto query(ConvertQuery(Statement_));
auto event = MakeKqpRequest();
NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest();
// HACK
- ConvertQueryToRequest(query.Query, request);
+ ConvertQueryToRequest(Statement_.QueryData.Query, request);
if (request.HasAction()) {
request.SetAction(NKikimrKqp::QUERY_ACTION_EXPLAIN);
@@ -305,7 +399,7 @@ public:
}
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
- BLOG_D("Handling TEvKqp::TEvQueryResponse");
+ BLOG_D("Handling TEvKqp::TEvQueryResponse " << ev->Get()->Record.ShortDebugString());
Send(Owner_, new TEvEvents::TEvProxyCompleted());
NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef();
auto response = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>();
@@ -359,20 +453,57 @@ public:
{
}
+ static Ydb::TypedValue GetTypedValueFromParam(int16_t format, const std::vector<uint8_t>& value, const Ydb::Type& type) {
+ enum EFormatType : int16_t {
+ EFormatText = 0,
+ EFormatBinary = 1,
+ };
+
+ Ydb::TypedValue typedValue;
+ typedValue.mutable_type()->CopyFrom(type);
+ NYdb::TTypeParser parser(type);
+ if (parser.GetKind() == NYdb::TTypeParser::ETypeKind::Pg) {
+ typedValue.mutable_type()->CopyFrom(type);
+ if (format == EFormatText) {
+ typedValue.mutable_value()->set_text_value(TString(reinterpret_cast<const char*>(&value.front()), value.size()));
+ } else if (format == EFormatBinary) {
+ typedValue.mutable_value()->set_bytes_value(TString(reinterpret_cast<const char*>(&value.front()), value.size()));
+ } else {
+ Y_VERIFY(false/*unknown format type*/);
+ }
+ } else {
+ // it's not supported yet
+ Y_VERIFY(false/*non-PG type*/);
+ }
+ return typedValue;
+ }
+
void Bootstrap() {
- auto query(ConvertQuery(Statement_));
auto event = MakeKqpRequest();
NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest();
// HACK
- ConvertQueryToRequest(query.Query, request);
+ ConvertQueryToRequest(Statement_.QueryData.Query, request);
if (request.HasAction()) {
+ for (unsigned int paramNum = 0; paramNum < Statement_.BindData.ParametersValue.size(); ++paramNum) {
+ if (paramNum >= Statement_.ParameterTypes.size()) {
+ // TODO(xenoxeno): report error
+ break;
+ }
+ Ydb::Type type = Statement_.ParameterTypes[paramNum];
+ int16_t format = 0; // text
+ if (paramNum < Statement_.BindData.ParametersFormat.size()) {
+ format = Statement_.BindData.ParametersFormat[paramNum];
+ }
+ Ydb::TypedValue value = GetTypedValueFromParam(format, Statement_.BindData.ParametersValue[paramNum], type);
+ request.MutableYdbParameters()->insert({TStringBuilder() << "$p" << paramNum + 1, value});
+ }
ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
BLOG_D("Sent event to kqpProxy " << request.ShortDebugString());
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
}
// TODO(xenoxeno): timeout
- Become(&TPgwireKqpProxyQuery::StateWork);
+ Become(&TPgwireKqpProxyExecute::StateWork);
}
void FillResultSet(const NYdb::TResultSet& resultSet, NPG::TEvPGEvents::TEvExecuteResponse* response) {
@@ -415,7 +546,7 @@ public:
}
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
- BLOG_D("Handling TEvKqp::TEvQueryResponse");
+ BLOG_D("Handling TEvKqp::TEvQueryResponse " << ev->Get()->Record.ShortDebugString());
NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef();
ProcessKqpResponseReleaseProxy(record);
auto response = MakeResponse();
@@ -464,6 +595,13 @@ NActors::IActor* CreatePgwireKqpProxyQuery(const TActorId& owner,
return new TPgwireKqpProxyQuery(owner, std::move(params), connection, std::move(evQuery));
}
+NActors::IActor* CreatePgwireKqpProxyParse(const TActorId& owner,
+ std::unordered_map<TString, TString> params,
+ const TConnectionState& connection,
+ NPG::TEvPGEvents::TEvParse::TPtr&& evParse) {
+ return new TPgwireKqpProxyParse(owner, std::move(params), connection, std::move(evParse));
+}
+
NActors::IActor* CreatePgwireKqpProxyDescribe(const TActorId& owner,
std::unordered_map<TString, TString> params,
const TConnectionState& connection,
diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp
index bf781d86e5..11014bccdf 100644
--- a/ydb/core/pgproxy/pg_connection.cpp
+++ b/ydb/core/pgproxy/pg_connection.cpp
@@ -457,6 +457,19 @@ protected:
}
}
+ static void FillErrorResponse(TPGStreamOutput<TPGErrorResponse>& dataOut, const std::vector<std::pair<char, TString>>& dataIn) {
+ for (const auto& field : dataIn) {
+ dataOut << field.first << field.second << '\0';
+ }
+ dataOut << '\0';
+ }
+
+ void SendErrorResponse(const std::vector<std::pair<char, TString>>& errorFields) {
+ TPGStreamOutput<TPGErrorResponse> errorResponse;
+ FillErrorResponse(errorResponse, errorFields);
+ SendStream(errorResponse);
+ }
+
bool FlushAndPoll() {
if (FlushOutput()) {
RequestPoller();
@@ -543,13 +556,7 @@ protected:
}
}
} else {
- // error response
- TPGStreamOutput<TPGErrorResponse> errorResponse;
- for (const auto& field : ev->Get()->ErrorFields) {
- errorResponse << field.first << field.second << '\0';
- }
- errorResponse << '\0';
- SendStream(errorResponse);
+ SendErrorResponse(ev->Get()->ErrorFields);
}
if (ev->Get()->CommandCompleted) {
BecomeReadyForQuery();
@@ -579,13 +586,7 @@ protected:
SendMessage(TPGNoData());
}
} else {
- // error response
- TPGStreamOutput<TPGErrorResponse> errorResponse;
- for (const auto& field : ev->Get()->ErrorFields) {
- errorResponse << field.first << field.second << '\0';
- }
- errorResponse << '\0';
- SendStream(errorResponse);
+ SendErrorResponse(ev->Get()->ErrorFields);
}
++OutgoingSequenceNumber;
BecomeReadyForQuery();
@@ -620,13 +621,7 @@ protected:
}
}
} else {
- // error response
- TPGStreamOutput<TPGErrorResponse> errorResponse;
- for (const auto& field : ev->Get()->ErrorFields) {
- errorResponse << field.first << field.second << '\0';
- }
- errorResponse << '\0';
- SendStream(errorResponse);
+ SendErrorResponse(ev->Get()->ErrorFields);
}
if (ev->Get()->CommandCompleted) {
++OutgoingSequenceNumber;
@@ -639,8 +634,12 @@ protected:
void HandleConnected(TEvPGEvents::TEvParseResponse::TPtr& ev) {
if (IsEventExpected(ev)) {
- TPGStreamOutput<TPGParseComplete> parseComplete;
- SendStream(parseComplete);
+ if (ev->Get()->ErrorFields.empty()) {
+ TPGStreamOutput<TPGParseComplete> parseComplete;
+ SendStream(parseComplete);
+ } else {
+ SendErrorResponse(ev->Get()->ErrorFields);
+ }
++OutgoingSequenceNumber;
BecomeReadyForQuery();
} else {
diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h
index f373264a74..27782930c5 100644
--- a/ydb/core/pgproxy/pg_proxy_events.h
+++ b/ydb/core/pgproxy/pg_proxy_events.h
@@ -146,6 +146,7 @@ struct TEvPGEvents {
struct TEvParseResponse : NActors::TEventLocal<TEvParseResponse, EvParseResponse> {
std::unique_ptr<TPGParse> OriginalMessage;
+ std::vector<std::pair<char, TString>> ErrorFields;
TEvParseResponse(std::unique_ptr<TPGParse> originalMessage)
: OriginalMessage(std::move(originalMessage))