diff options
author | xenoxeno <xeno@ydb.tech> | 2023-07-12 07:35:12 +0300 |
---|---|---|
committer | xenoxeno <xeno@ydb.tech> | 2023-07-12 07:35:12 +0300 |
commit | 4265abd14384a044d1fec53ae94fd91b18dbfa5e (patch) | |
tree | 275303869dc0957168a9ed001c77d3f9cf5ca4ec | |
parent | 8096026d597e38b356aa1c680d4ea44534ddec08 (diff) | |
download | ydb-4265abd14384a044d1fec53ae94fd91b18dbfa5e.tar.gz |
rework parse/describe commands to better support parameters KIKIMR-18052
-rw-r--r-- | ydb/core/local_pgwire/local_pgwire_connection.cpp | 68 | ||||
-rw-r--r-- | ydb/core/local_pgwire/local_pgwire_util.h | 54 | ||||
-rw-r--r-- | ydb/core/local_pgwire/pgwire_kqp_proxy.cpp | 152 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_connection.cpp | 45 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_proxy_events.h | 1 |
5 files changed, 222 insertions, 98 deletions
diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp index 9dcbaca5ac..dcec1fab9d 100644 --- a/ydb/core/local_pgwire/local_pgwire_connection.cpp +++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp @@ -25,6 +25,7 @@ extern NActors::IActor* CreatePgwireKqpProxy( ); NActors::IActor* CreatePgwireKqpProxyQuery(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery); +NActors::IActor* CreatePgwireKqpProxyParse(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvParse::TPtr&& evParse); NActors::IActor* CreatePgwireKqpProxyDescribe(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TParsedStatement& statement); NActors::IActor* CreatePgwireKqpProxyExecute(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute, const TParsedStatement& statement); @@ -59,12 +60,10 @@ public: void Handle(NPG::TEvPGEvents::TEvParse::TPtr& ev) { BLOG_D("TEvParse " << ev->Sender); - - auto queryData = ev->Get()->Message->GetQueryData(); - ParsedStatements[queryData.Name].QueryData = queryData; - - auto parseComplete = ev->Get()->Reply(); - Send(ev->Sender, parseComplete.release()); + ++Inflight; + TActorId actorId = Register(CreatePgwireKqpProxyParse(SelfId(), ConnectionParams, Connection, std::move(ev))); + BLOG_D("Created pgwireKqpProxyParse: " << actorId); + return; } void Handle(NPG::TEvPGEvents::TEvBind::TPtr& ev) { @@ -93,7 +92,7 @@ public: TString statementName = ev->Get()->Message->GetDescribeData().Name; if (statementName.empty()) { statementName = CurrentStatement; - BLOG_W("TEvExecute changed empty statement to " << CurrentStatement); + BLOG_W("TEvDescribe changed empty statement to " << CurrentStatement); } auto it = ParsedStatements.find(statementName); if (it == ParsedStatements.end()) { @@ -104,10 +103,13 @@ public: return; } - ++Inflight; - TActorId actorId = Register(CreatePgwireKqpProxyDescribe(SelfId(), ConnectionParams, Connection, std::move(ev), it->second)); - BLOG_D("Created pgwireKqpProxyDescribe: " << actorId); - return; + auto response = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>(); + for (const auto& ydbType : it->second.ParameterTypes) { + response->ParameterTypes.push_back(GetPgOidFromYdbType(ydbType)); + } + response->DataFields = it->second.DataFields; + + Send(ev->Sender, response.release()); } void Handle(NPG::TEvPGEvents::TEvExecute::TPtr& ev) { @@ -142,26 +144,32 @@ public: void Handle(TEvEvents::TEvProxyCompleted::TPtr& ev) { --Inflight; BLOG_D("Received TEvProxyCompleted"); - if (ev->Get()->Connection.Transaction.Status) { - BLOG_D("Updating transaction state to " << ev->Get()->Connection.Transaction.Status); - Connection.Transaction.Status = ev->Get()->Connection.Transaction.Status; - switch (ev->Get()->Connection.Transaction.Status) { - case 'I': - case 'E': - Connection.Transaction.Id.clear(); - BLOG_D("Transaction id cleared"); - break; - case 'T': - if (ev->Get()->Connection.Transaction.Id) { - Connection.Transaction.Id = ev->Get()->Connection.Transaction.Id; - BLOG_D("Transaction id is " << Connection.Transaction.Id); - } - break; - } + if (ev->Get()->ParsedStatement) { + ParsedStatements[ev->Get()->ParsedStatement.value().QueryData.Name] = ev->Get()->ParsedStatement.value(); } - if (ev->Get()->Connection.SessionId) { - BLOG_D("Session id is " << ev->Get()->Connection.SessionId); - Connection.SessionId = ev->Get()->Connection.SessionId; + if (ev->Get()->Connection) { + auto& connection(ev->Get()->Connection.value()); + if (connection.Transaction.Status) { + BLOG_D("Updating transaction state to " << connection.Transaction.Status); + Connection.Transaction.Status = connection.Transaction.Status; + switch (connection.Transaction.Status) { + case 'I': + case 'E': + Connection.Transaction.Id.clear(); + BLOG_D("Transaction id cleared"); + break; + case 'T': + if (connection.Transaction.Id) { + Connection.Transaction.Id = connection.Transaction.Id; + BLOG_D("Transaction id is " << Connection.Transaction.Id); + } + break; + } + } + if (connection.SessionId) { + BLOG_D("Session id is " << connection.SessionId); + Connection.SessionId = connection.SessionId; + } } while (!Events.empty() && Inflight == 0) { StateWork(Events.front()); diff --git a/ydb/core/local_pgwire/local_pgwire_util.h b/ydb/core/local_pgwire/local_pgwire_util.h index fa0dddb2cb..9dedeb4fcc 100644 --- a/ydb/core/local_pgwire/local_pgwire_util.h +++ b/ydb/core/local_pgwire/local_pgwire_util.h @@ -44,6 +44,13 @@ struct TConnectionState { TTransactionState Transaction; }; +struct TParsedStatement { + NPG::TPGParse::TQueryData QueryData; + NPG::TPGBind::TBindData BindData; + std::vector<Ydb::Type> ParameterTypes; + std::vector<NPG::TEvPGEvents::TRowDescriptionField> DataFields; +}; + struct TEvEvents { enum EEv { EvProxyCompleted = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), @@ -53,17 +60,19 @@ struct TEvEvents { static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "ES_PRIVATE event space is too small."); struct TEvProxyCompleted : NActors::TEventLocal<TEvProxyCompleted, EvProxyCompleted> { - TConnectionState Connection; + std::optional<TConnectionState> Connection; + std::optional<TParsedStatement> ParsedStatement; + + TEvProxyCompleted() = default; - TEvProxyCompleted(const TConnectionState& connection = {}) + TEvProxyCompleted(const TConnectionState& connection) : Connection(connection) {} - }; -}; -struct TParsedStatement { - NPG::TPGParse::TQueryData QueryData; - NPG::TPGBind::TBindData BindData; + TEvProxyCompleted(const TParsedStatement& parsedStatement) + : ParsedStatement(parsedStatement) + {} + }; }; inline TString ColumnPrimitiveValueToString(NYdb::TValueParser& valueParser) { @@ -234,37 +243,6 @@ inline NYdb::NScripting::TExecuteYqlResult ConvertProtoResponseToSdkResult(Ydb:: return {NYdb::TStatus(std::move(alwaysSuccess)), std::move(res), queryStats}; } -struct TConvertedQuery { - TString Query; - NYdb::TParams Params; -}; - -inline TConvertedQuery ConvertQuery(const TParsedStatement& statement) { - auto& bindData = statement.BindData; - const auto& queryData = statement.QueryData; - NYdb::TParamsBuilder paramsBuilder; - TStringBuilder injectedQuery; - - for (size_t idxParam = 0; idxParam < queryData.ParametersTypes.size(); ++idxParam) { - int32_t paramType = queryData.ParametersTypes[idxParam]; - TString paramValue; - if (idxParam < bindData.ParametersValue.size()) { - std::vector<uint8_t> paramVal = bindData.ParametersValue[idxParam]; - paramValue = TString(reinterpret_cast<char*>(paramVal.data()), paramVal.size()); - } - switch (paramType) { - case INT2OID: - paramsBuilder.AddParam(TStringBuilder() << ":_" << idxParam + 1).Int16(atoi(paramValue.data())).Build(); - break; - - } - } - return { - .Query = injectedQuery + queryData.Query, - .Params = paramsBuilder.Build(), - }; -} - inline bool IsQueryEmptyChar(char c) { return c == ' ' || c == ';' || c == '\t' || c == '\n' || c == '\r' || c == '\f' || c == '\v'; } diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp index b26c32e494..5914fe97f1 100644 --- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp +++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp @@ -273,6 +273,101 @@ public: } }; +class TPgwireKqpProxyParse : public TPgwireKqpProxy<TPgwireKqpProxyParse> { + using TBase = TPgwireKqpProxy<TPgwireKqpProxyParse>; + + NPG::TEvPGEvents::TEvParse::TPtr EventParse_; + NPG::TPGParse::TQueryData QueryData_; + +public: + TPgwireKqpProxyParse(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvParse::TPtr&& evParse) + : TPgwireKqpProxy(owner, std::move(params), connection) + , EventParse_(std::move(evParse)) + , QueryData_(EventParse_->Get()->Message->GetQueryData()) + {} + + void Bootstrap() { + auto event = MakeKqpRequest(); + NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest(); + + // HACK + ConvertQueryToRequest(QueryData_.Query, request); + if (request.HasAction()) { + request.SetAction(NKikimrKqp::QUERY_ACTION_EXPLAIN); + + request.SetUsePublicResponseDataFormat(true); + + ActorIdToProto(SelfId(), event->Record.MutableRequestActorId()); + BLOG_D("Sent event to kqpProxy " << request.ShortDebugString()); + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release()); + } + // TODO(xenoxeno): timeout + Become(&TPgwireKqpProxyParse::StateWork); + } + + void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { + BLOG_D("Handling TEvKqp::TEvQueryResponse " << ev->Get()->Record.ShortDebugString()); + NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef(); + auto response = EventParse_->Get()->Reply(); + try { + if (record.HasYdbStatus()) { + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + TParsedStatement statement; + statement.QueryData = std::move(QueryData_); + for (const auto& param : record.GetResponse().GetQueryParameters()) { + Ydb::Type ydbType; + ConvertMiniKQLTypeToYdbType(param.GetType(), ydbType); + statement.ParameterTypes.push_back(ydbType); + } + for (const auto& result : record.GetResponse().GetYdbResults()) { + for (const auto& column : result.columns()) { + std::optional<NYdb::TPgType> pgType = GetPgTypeFromYdbType(column.type()); + if (pgType.has_value()) { + statement.DataFields.push_back({ + .Name = column.name(), + .DataType = pgType->Oid, + .DataTypeSize = pgType->Typlen, + .DataTypeModifier = pgType->Typmod, + }); + } else { + statement.DataFields.push_back({ + .Name = column.name() + }); + } + } + break; // only 1 result is accepted + } + Send(Owner_, new TEvEvents::TEvProxyCompleted(statement)); + } else { + NYql::TIssues issues; + NYql::IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues); + NYdb::TStatus status(NYdb::EStatus(record.GetYdbStatus()), std::move(issues)); + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', TStringBuilder() << status}); + } + } else { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', "No result received"}); + } + } catch (const std::exception& e) { + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', e.what()}); + } + if (!response->ErrorFields.empty()) { + Send(Owner_, new TEvEvents::TEvProxyCompleted()); + } + BLOG_D("Finally replying to " << EventParse_->Sender); + Send(EventParse_->Sender, response.release(), 0, EventParse_->Cookie); + PassAway(); + } + + STATEFN(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); + } + } +}; + class TPgwireKqpProxyDescribe : public TPgwireKqpProxy<TPgwireKqpProxyDescribe> { using TBase = TPgwireKqpProxy<TPgwireKqpProxyDescribe>; @@ -287,12 +382,11 @@ public: {} void Bootstrap() { - auto query(ConvertQuery(Statement_)); auto event = MakeKqpRequest(); NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest(); // HACK - ConvertQueryToRequest(query.Query, request); + ConvertQueryToRequest(Statement_.QueryData.Query, request); if (request.HasAction()) { request.SetAction(NKikimrKqp::QUERY_ACTION_EXPLAIN); @@ -305,7 +399,7 @@ public: } void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { - BLOG_D("Handling TEvKqp::TEvQueryResponse"); + BLOG_D("Handling TEvKqp::TEvQueryResponse " << ev->Get()->Record.ShortDebugString()); Send(Owner_, new TEvEvents::TEvProxyCompleted()); NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef(); auto response = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>(); @@ -359,20 +453,57 @@ public: { } + static Ydb::TypedValue GetTypedValueFromParam(int16_t format, const std::vector<uint8_t>& value, const Ydb::Type& type) { + enum EFormatType : int16_t { + EFormatText = 0, + EFormatBinary = 1, + }; + + Ydb::TypedValue typedValue; + typedValue.mutable_type()->CopyFrom(type); + NYdb::TTypeParser parser(type); + if (parser.GetKind() == NYdb::TTypeParser::ETypeKind::Pg) { + typedValue.mutable_type()->CopyFrom(type); + if (format == EFormatText) { + typedValue.mutable_value()->set_text_value(TString(reinterpret_cast<const char*>(&value.front()), value.size())); + } else if (format == EFormatBinary) { + typedValue.mutable_value()->set_bytes_value(TString(reinterpret_cast<const char*>(&value.front()), value.size())); + } else { + Y_VERIFY(false/*unknown format type*/); + } + } else { + // it's not supported yet + Y_VERIFY(false/*non-PG type*/); + } + return typedValue; + } + void Bootstrap() { - auto query(ConvertQuery(Statement_)); auto event = MakeKqpRequest(); NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest(); // HACK - ConvertQueryToRequest(query.Query, request); + ConvertQueryToRequest(Statement_.QueryData.Query, request); if (request.HasAction()) { + for (unsigned int paramNum = 0; paramNum < Statement_.BindData.ParametersValue.size(); ++paramNum) { + if (paramNum >= Statement_.ParameterTypes.size()) { + // TODO(xenoxeno): report error + break; + } + Ydb::Type type = Statement_.ParameterTypes[paramNum]; + int16_t format = 0; // text + if (paramNum < Statement_.BindData.ParametersFormat.size()) { + format = Statement_.BindData.ParametersFormat[paramNum]; + } + Ydb::TypedValue value = GetTypedValueFromParam(format, Statement_.BindData.ParametersValue[paramNum], type); + request.MutableYdbParameters()->insert({TStringBuilder() << "$p" << paramNum + 1, value}); + } ActorIdToProto(SelfId(), event->Record.MutableRequestActorId()); BLOG_D("Sent event to kqpProxy " << request.ShortDebugString()); Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release()); } // TODO(xenoxeno): timeout - Become(&TPgwireKqpProxyQuery::StateWork); + Become(&TPgwireKqpProxyExecute::StateWork); } void FillResultSet(const NYdb::TResultSet& resultSet, NPG::TEvPGEvents::TEvExecuteResponse* response) { @@ -415,7 +546,7 @@ public: } void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) { - BLOG_D("Handling TEvKqp::TEvQueryResponse"); + BLOG_D("Handling TEvKqp::TEvQueryResponse " << ev->Get()->Record.ShortDebugString()); NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef(); ProcessKqpResponseReleaseProxy(record); auto response = MakeResponse(); @@ -464,6 +595,13 @@ NActors::IActor* CreatePgwireKqpProxyQuery(const TActorId& owner, return new TPgwireKqpProxyQuery(owner, std::move(params), connection, std::move(evQuery)); } +NActors::IActor* CreatePgwireKqpProxyParse(const TActorId& owner, + std::unordered_map<TString, TString> params, + const TConnectionState& connection, + NPG::TEvPGEvents::TEvParse::TPtr&& evParse) { + return new TPgwireKqpProxyParse(owner, std::move(params), connection, std::move(evParse)); +} + NActors::IActor* CreatePgwireKqpProxyDescribe(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp index bf781d86e5..11014bccdf 100644 --- a/ydb/core/pgproxy/pg_connection.cpp +++ b/ydb/core/pgproxy/pg_connection.cpp @@ -457,6 +457,19 @@ protected: } } + static void FillErrorResponse(TPGStreamOutput<TPGErrorResponse>& dataOut, const std::vector<std::pair<char, TString>>& dataIn) { + for (const auto& field : dataIn) { + dataOut << field.first << field.second << '\0'; + } + dataOut << '\0'; + } + + void SendErrorResponse(const std::vector<std::pair<char, TString>>& errorFields) { + TPGStreamOutput<TPGErrorResponse> errorResponse; + FillErrorResponse(errorResponse, errorFields); + SendStream(errorResponse); + } + bool FlushAndPoll() { if (FlushOutput()) { RequestPoller(); @@ -543,13 +556,7 @@ protected: } } } else { - // error response - TPGStreamOutput<TPGErrorResponse> errorResponse; - for (const auto& field : ev->Get()->ErrorFields) { - errorResponse << field.first << field.second << '\0'; - } - errorResponse << '\0'; - SendStream(errorResponse); + SendErrorResponse(ev->Get()->ErrorFields); } if (ev->Get()->CommandCompleted) { BecomeReadyForQuery(); @@ -579,13 +586,7 @@ protected: SendMessage(TPGNoData()); } } else { - // error response - TPGStreamOutput<TPGErrorResponse> errorResponse; - for (const auto& field : ev->Get()->ErrorFields) { - errorResponse << field.first << field.second << '\0'; - } - errorResponse << '\0'; - SendStream(errorResponse); + SendErrorResponse(ev->Get()->ErrorFields); } ++OutgoingSequenceNumber; BecomeReadyForQuery(); @@ -620,13 +621,7 @@ protected: } } } else { - // error response - TPGStreamOutput<TPGErrorResponse> errorResponse; - for (const auto& field : ev->Get()->ErrorFields) { - errorResponse << field.first << field.second << '\0'; - } - errorResponse << '\0'; - SendStream(errorResponse); + SendErrorResponse(ev->Get()->ErrorFields); } if (ev->Get()->CommandCompleted) { ++OutgoingSequenceNumber; @@ -639,8 +634,12 @@ protected: void HandleConnected(TEvPGEvents::TEvParseResponse::TPtr& ev) { if (IsEventExpected(ev)) { - TPGStreamOutput<TPGParseComplete> parseComplete; - SendStream(parseComplete); + if (ev->Get()->ErrorFields.empty()) { + TPGStreamOutput<TPGParseComplete> parseComplete; + SendStream(parseComplete); + } else { + SendErrorResponse(ev->Get()->ErrorFields); + } ++OutgoingSequenceNumber; BecomeReadyForQuery(); } else { diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h index f373264a74..27782930c5 100644 --- a/ydb/core/pgproxy/pg_proxy_events.h +++ b/ydb/core/pgproxy/pg_proxy_events.h @@ -146,6 +146,7 @@ struct TEvPGEvents { struct TEvParseResponse : NActors::TEventLocal<TEvParseResponse, EvParseResponse> { std::unique_ptr<TPGParse> OriginalMessage; + std::vector<std::pair<char, TString>> ErrorFields; TEvParseResponse(std::unique_ptr<TPGParse> originalMessage) : OriginalMessage(std::move(originalMessage)) |