aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorxenoxeno <xeno@ydb.tech>2023-06-13 14:59:52 +0300
committerxenoxeno <xeno@ydb.tech>2023-06-13 14:59:52 +0300
commitf93d754c16f6acf1898da5aa077ba730ebdab531 (patch)
tree218f9535afca1f149623ef2e1861a2d14f4c4f2d
parentd2104865f13f6d5bc5f7817a4318288da51254c4 (diff)
downloadydb-f93d754c16f6acf1898da5aa077ba730ebdab531.tar.gz
PR from branch users/xenoxeno/
rework transactions and sessions handling add parsed execute actor
-rw-r--r--ydb/core/local_pgwire/local_pgwire_connection.cpp204
-rw-r--r--ydb/core/local_pgwire/local_pgwire_util.h37
-rw-r--r--ydb/core/local_pgwire/log_impl.h2
-rw-r--r--ydb/core/local_pgwire/pgwire_kqp_proxy.cpp445
-rw-r--r--ydb/core/pgproxy/pg_connection.cpp17
-rw-r--r--ydb/core/pgproxy/pg_proxy_events.h10
6 files changed, 362 insertions, 353 deletions
diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp
index 8efa1efa069..9dcbaca5ac1 100644
--- a/ydb/core/local_pgwire/local_pgwire_connection.cpp
+++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp
@@ -24,8 +24,9 @@ extern NActors::IActor* CreatePgwireKqpProxy(
std::unordered_map<TString, TString> params
);
-NActors::IActor* CreatePgwireKqpProxyQuery(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery);
-NActors::IActor* CreatePgwireKqpProxyDescribe(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TString& script);
+NActors::IActor* CreatePgwireKqpProxyQuery(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery);
+NActors::IActor* CreatePgwireKqpProxyDescribe(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TParsedStatement& statement);
+NActors::IActor* CreatePgwireKqpProxyExecute(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute, const TParsedStatement& statement);
class TPgYdbConnection : public TActor<TPgYdbConnection> {
using TBase = TActor<TPgYdbConnection>;
@@ -33,12 +34,13 @@ class TPgYdbConnection : public TActor<TPgYdbConnection> {
std::unordered_map<TString, TString> ConnectionParams;
std::unordered_map<TString, TParsedStatement> ParsedStatements;
TString CurrentStatement;
- Ydb::StatusIds QueryStatus;
- std::unordered_map<ui32, NYdb::TResultSet> ResultSets;
+ TConnectionState Connection;
+ std::deque<TAutoPtr<IEventHandle>> Events;
+ ui32 Inflight = 0;
public:
TPgYdbConnection(std::unordered_map<TString, TString> params)
- : TActor<TPgYdbConnection>(&TPgYdbConnection::StateWork)
+ : TActor<TPgYdbConnection>(&TPgYdbConnection::StateSchedule)
, ConnectionParams(std::move(params))
{}
@@ -50,8 +52,8 @@ public:
Send(ev->Sender, response.release(), 0, ev->Cookie);
return;
}
-
- TActorId actorId = Register(CreatePgwireKqpProxyQuery(ConnectionParams, std::move(ev)));
+ ++Inflight;
+ TActorId actorId = Register(CreatePgwireKqpProxyQuery(SelfId(), ConnectionParams, Connection, std::move(ev)));
BLOG_D("Created pgwireKqpProxyQuery: " << actorId);
}
@@ -102,77 +104,10 @@ public:
return;
}
- auto query(ConvertQuery(it->second));
- auto script(ToPgSyntax(query.Query, ConnectionParams));
-
- TActorId actorId = Register(CreatePgwireKqpProxyDescribe(ConnectionParams, std::move(ev), script));
- BLOG_D("Created pgwireKqpProxyQuery: " << actorId);
+ ++Inflight;
+ TActorId actorId = Register(CreatePgwireKqpProxyDescribe(SelfId(), ConnectionParams, Connection, std::move(ev), it->second));
+ BLOG_D("Created pgwireKqpProxyDescribe: " << actorId);
return;
- /*
- TActorSystem* actorSystem = TActivationContext::ActorSystem();
- auto query = ConvertQuery(it->second);
- Ydb::Scripting::ExecuteYqlRequest request;
- request.set_script(ToPgSyntax(query.Query, ConnectionParams));
- // TODO:
- //request.set_parameters(query.Params);
- TString database;
- if (ConnectionParams.count("database")) {
- database = ConnectionParams["database"];
- }
- TString token;
- if (ConnectionParams.count("ydb-serialized-token")) {
- token = ConnectionParams["ydb-serialized-token"];
- }
- using TRpcEv = NGRpcService::TGrpcRequestOperationCall<Ydb::Scripting::ExecuteYqlRequest, Ydb::Scripting::ExecuteYqlResponse>;
- auto rpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(request), database, token, actorSystem);
- // TODO: it's wrong. it should be done using explain to get all result meta. but it's not ready yet.
- rpcFuture.Subscribe([actorSystem, ev](NThreading::TFuture<Ydb::Scripting::ExecuteYqlResponse> future) {
- auto response = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>();
- try {
- Ydb::Scripting::ExecuteYqlResponse yqlResponse(future.ExtractValueSync());
- if (yqlResponse.has_operation()) {
- if (yqlResponse.operation().status() == Ydb::StatusIds::SUCCESS) {
- if (yqlResponse.operation().has_result()) {
- NYdb::NScripting::TExecuteYqlResult result(ConvertProtoResponseToSdkResult(std::move(yqlResponse)));
- if (result.IsSuccess()) {
- const TVector<NYdb::TResultSet>& resultSets = result.GetResultSets();
- if (!resultSets.empty()) {
- NYdb::TResultSet resultSet = resultSets[0];
-
- {
- for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) {
- // TODO: fill data types and sizes
- response->DataFields.push_back({
- .Name = column.Name,
- .DataType = GetPgOidFromYdbType(column.Type),
- });
- }
- }
- }
- } else {
- response->ErrorFields.push_back({'E', "ERROR"});
- response->ErrorFields.push_back({'M', TStringBuilder() << (NYdb::TStatus&)result});
- }
- }
- } else {
- NYql::TIssues issues;
- NYql::IssuesFromMessage(yqlResponse.operation().issues(), issues);
- NYdb::TStatus status(NYdb::EStatus(yqlResponse.operation().status()), std::move(issues));
- response->ErrorFields.push_back({'E', "ERROR"});
- response->ErrorFields.push_back({'M', TStringBuilder() << status});
- }
- } else {
- response->ErrorFields.push_back({'E', "ERROR"});
- response->ErrorFields.push_back({'M', "No result received"});
- }
- }
- catch (const std::exception& e) {
- response->ErrorFields.push_back({'E', "ERROR"});
- response->ErrorFields.push_back({'M', e.what()});
- }
-
- actorSystem->Send(ev->Sender, response.release(), 0, ev->Cookie);
- });*/
}
void Handle(NPG::TEvPGEvents::TEvExecute::TPtr& ev) {
@@ -199,78 +134,63 @@ public:
return;
}
- TActorSystem* actorSystem = TActivationContext::ActorSystem();
- auto query = ConvertQuery(it->second);
- Ydb::Scripting::ExecuteYqlRequest request;
- request.set_script(ToPgSyntax(query.Query, ConnectionParams));
- // TODO:
- //request.set_parameters(query.Params);
- TString database;
- if (ConnectionParams.count("database")) {
- database = ConnectionParams["database"];
+ ++Inflight;
+ TActorId actorId = Register(CreatePgwireKqpProxyExecute(SelfId(), ConnectionParams, Connection, std::move(ev), it->second));
+ BLOG_D("Created pgwireKqpProxyExecute: " << actorId);
+ }
+
+ void Handle(TEvEvents::TEvProxyCompleted::TPtr& ev) {
+ --Inflight;
+ BLOG_D("Received TEvProxyCompleted");
+ if (ev->Get()->Connection.Transaction.Status) {
+ BLOG_D("Updating transaction state to " << ev->Get()->Connection.Transaction.Status);
+ Connection.Transaction.Status = ev->Get()->Connection.Transaction.Status;
+ switch (ev->Get()->Connection.Transaction.Status) {
+ case 'I':
+ case 'E':
+ Connection.Transaction.Id.clear();
+ BLOG_D("Transaction id cleared");
+ break;
+ case 'T':
+ if (ev->Get()->Connection.Transaction.Id) {
+ Connection.Transaction.Id = ev->Get()->Connection.Transaction.Id;
+ BLOG_D("Transaction id is " << Connection.Transaction.Id);
+ }
+ break;
+ }
}
- TString token;
- if (ConnectionParams.count("ydb-serialized-token")) {
- token = ConnectionParams["ydb-serialized-token"];
+ if (ev->Get()->Connection.SessionId) {
+ BLOG_D("Session id is " << ev->Get()->Connection.SessionId);
+ Connection.SessionId = ev->Get()->Connection.SessionId;
}
- using TRpcEv = NGRpcService::TGrpcRequestOperationCall<Ydb::Scripting::ExecuteYqlRequest, Ydb::Scripting::ExecuteYqlResponse>;
- auto rpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(request), database, token, actorSystem);
+ while (!Events.empty() && Inflight == 0) {
+ StateWork(Events.front());
+ Events.pop_front();
+ }
+ }
- rpcFuture.Subscribe([actorSystem, ev](NThreading::TFuture<Ydb::Scripting::ExecuteYqlResponse> future) {
- auto response = std::make_unique<NPG::TEvPGEvents::TEvExecuteResponse>();
- try {
- Ydb::Scripting::ExecuteYqlResponse yqlResponse(future.ExtractValueSync());
- if (yqlResponse.has_operation()) {
- if (yqlResponse.operation().status() == Ydb::StatusIds::SUCCESS) {
- if (yqlResponse.operation().has_result()) {
- NYdb::NScripting::TExecuteYqlResult result(ConvertProtoResponseToSdkResult(std::move(yqlResponse)));
- if (result.IsSuccess()) {
- const TVector<NYdb::TResultSet>& resultSets = result.GetResultSets();
- if (!resultSets.empty()) {
- NYdb::TResultSet resultSet = resultSets[0];
+ void PassAway() override {
+ if (Connection.SessionId) {
+ BLOG_D("Closing session " << Connection.SessionId);
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
+ ev->Record.MutableRequest()->SetSessionId(Connection.SessionId);
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), ev.Release());
+ }
+ TBase::PassAway();
+ }
- {
- auto maxRows = ev->Get()->Message->GetExecuteData().MaxRows;
- NYdb::TResultSetParser parser(std::move(resultSet));
- while (parser.TryNextRow()) {
- response->DataRows.emplace_back();
- auto& row = response->DataRows.back();
- row.resize(parser.ColumnsCount());
- for (size_t index = 0; index < parser.ColumnsCount(); ++index) {
- row[index] = ColumnValueToString(parser.ColumnParser(index));
- }
- if (maxRows != 0) {
- if (--maxRows == 0) {
- break;
- }
- }
- }
- }
- }
- } else {
- response->ErrorFields.push_back({'E', "ERROR"});
- response->ErrorFields.push_back({'M', TStringBuilder() << (NYdb::TStatus&)result});
- }
- }
- } else {
- NYql::TIssues issues;
- NYql::IssuesFromMessage(yqlResponse.operation().issues(), issues);
- NYdb::TStatus status(NYdb::EStatus(yqlResponse.operation().status()), std::move(issues));
- response->ErrorFields.push_back({'E', "ERROR"});
- response->ErrorFields.push_back({'M', TStringBuilder() << status});
- }
+ STATEFN(StateSchedule) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvEvents::TEvProxyCompleted, Handle);
+ cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
+ default: {
+ if (Inflight == 0) {
+ return StateWork(ev);
} else {
- response->ErrorFields.push_back({'E', "ERROR"});
- response->ErrorFields.push_back({'M', "No result received"});
+ Events.push_back(ev);
}
}
- catch (const std::exception& e) {
- response->ErrorFields.push_back({'E', "ERROR"});
- response->ErrorFields.push_back({'M', e.what()});
- }
-
- actorSystem->Send(ev->Sender, response.release(), 0, ev->Cookie);
- });
+ }
}
STATEFN(StateWork) {
diff --git a/ydb/core/local_pgwire/local_pgwire_util.h b/ydb/core/local_pgwire/local_pgwire_util.h
index 110996c1810..c5a1120d7d1 100644
--- a/ydb/core/local_pgwire/local_pgwire_util.h
+++ b/ydb/core/local_pgwire/local_pgwire_util.h
@@ -33,6 +33,33 @@
namespace NLocalPgWire {
+struct TTransactionState {
+ char Status = 0;
+ TString Id;
+};
+
+struct TConnectionState {
+ TString SessionId;
+ TTransactionState Transaction;
+};
+
+struct TEvEvents {
+ enum EEv {
+ EvProxyCompleted = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "ES_PRIVATE event space is too small.");
+
+ struct TEvProxyCompleted : NActors::TEventLocal<TEvProxyCompleted, EvProxyCompleted> {
+ TConnectionState Connection;
+
+ TEvProxyCompleted(const TConnectionState& connection = {})
+ : Connection(connection)
+ {}
+ };
+};
+
struct TParsedStatement {
NPG::TPGParse::TQueryData QueryData;
NPG::TPGBind::TBindData BindData;
@@ -140,9 +167,9 @@ inline uint32_t GetPgOidFromYdbType(NYdb::TType type) {
}
}
-inline TString ToPgSyntax(TStringBuf query, const std::unordered_map<TString, TString>& ConnectionParams) {
- auto itOptions = ConnectionParams.find("options");
- if (itOptions == ConnectionParams.end()) {
+inline TString ToPgSyntax(TStringBuf query, const std::unordered_map<TString, TString>& connectionParams) {
+ auto itOptions = connectionParams.find("options");
+ if (itOptions == connectionParams.end()) {
return TStringBuilder() << "--!syntax_pg\n" << query; // default
}
return TStringBuilder() << "--!" << itOptions->second << "\n" << query;
@@ -170,6 +197,10 @@ struct TConvertedQuery {
NYdb::TParams Params;
};
+inline TString ToPgSyntax(TConvertedQuery query, const std::unordered_map<TString, TString>& connectionParams) {
+ return ToPgSyntax(query.Query, connectionParams);
+}
+
inline TConvertedQuery ConvertQuery(const TParsedStatement& statement) {
auto& bindData = statement.BindData;
const auto& queryData = statement.QueryData;
diff --git a/ydb/core/local_pgwire/log_impl.h b/ydb/core/local_pgwire/log_impl.h
index 2017cd5752f..a754936b3f8 100644
--- a/ydb/core/local_pgwire/log_impl.h
+++ b/ydb/core/local_pgwire/log_impl.h
@@ -9,3 +9,5 @@
#define BLOG_W(stream) LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::LOCAL_PGWIRE, stream)
#define BLOG_NOTICE(stream) LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::LOCAL_PGWIRE, stream)
#define BLOG_ERROR(stream) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::LOCAL_PGWIRE, stream)
+
+#define BLOG_ENSURE(condition) do{if(!condition)BLOG_ERROR(#condition);}while(false)
diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
index 4533954dfbe..a774e22f34c 100644
--- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
+++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
@@ -17,103 +17,27 @@ namespace NLocalPgWire {
using namespace NActors;
using namespace NKikimr;
-class TPgwireKqpProxy : public TActor<TPgwireKqpProxy> {
- using TBase = TActor<TPgwireKqpProxy>;
+template<typename Base>
+class TPgwireKqpProxy : public TActorBootstrapped<Base> {
+protected:
+ using TBase = TActorBootstrapped<Base>;
- NActors::TActorId RequestActorId_;
- ui64 RequestCookie_;
- bool InTransaction_;
+ TActorId Owner_;
std::unordered_map<TString, TString> ConnectionParams_;
- TMap<ui32, NYdb::TResultSet> ResultSets_;
-public:
- TPgwireKqpProxy(std::unordered_map<TString, TString> params)
- : TActor<TPgwireKqpProxy>(&TPgwireKqpProxy::StateWork)
- , ConnectionParams_(std::move(params))
- {}
-
- void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
- NYdb::TResultSet resultSet(std::move(*ev->Get()->Record.MutableResultSet()));
- ResultSets_.emplace(ev->Get()->Record.GetQueryResultIndex(), resultSet);
-
- BLOG_D(this->SelfId() << "Send stream data ack"
- << ", to: " << ev->Sender);
+ TConnectionState Connection_;
+ TString Tag_;
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
- resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
- resp->Record.SetFreeSpace(std::numeric_limits<ui64>::max());
- Send(ev->Sender, resp.Release());
- }
-
- void FillResultSet(const NYdb::TResultSet& resultSet, NPG::TEvPGEvents::TEvQueryResponse* response) {
- {
- for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) {
- // TODO: fill data types and sizes
- response->DataFields.push_back({
- .Name = column.Name,
- .DataType = GetPgOidFromYdbType(column.Type),
- // .DataTypeSize = column.Type.GetProto().Getpg_type().Gettyplen()
- });
- }
- }
- {
- NYdb::TResultSetParser parser(std::move(resultSet));
- while (parser.TryNextRow()) {
- response->DataRows.emplace_back();
- auto& row = response->DataRows.back();
- row.resize(parser.ColumnsCount());
- for (size_t index = 0; index < parser.ColumnsCount(); ++index) {
- row[index] = ColumnValueToString(parser.ColumnParser(index));
- }
- }
- }
- }
-
- void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
- BLOG_D("Handling TEvKqp::TEvQueryResponse");
- NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef();
-
- auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>();
- // HACK
- if (InTransaction_) {
- response->Tag = "BEGIN";
- response->TransactionStatus = 'T';
- }
- try {
- if (record.HasYdbStatus()) {
- if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
- Y_ENSURE(record.GetResponse().GetYdbResults().empty());
- if (!ResultSets_.empty()) {
- FillResultSet(ResultSets_.begin()->second, response.get());
- }
-
- // HACK
- response->Tag = TStringBuilder() << "SELECT " << response->DataRows.size();
- // HACK
- } else {
- NYql::TIssues issues;
- NYql::IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
- NYdb::TStatus status(NYdb::EStatus(record.GetYdbStatus()), std::move(issues));
- response->ErrorFields.push_back({'E', "ERROR"});
- response->ErrorFields.push_back({'M', TStringBuilder() << status});
- }
- } else {
- response->ErrorFields.push_back({'E', "ERROR"});
- response->ErrorFields.push_back({'M', "No result received"});
- }
- } catch (const std::exception& e) {
- response->ErrorFields.push_back({'E', "ERROR"});
- response->ErrorFields.push_back({'M', e.what()});
+ TPgwireKqpProxy(const TActorId owner, std::unordered_map<TString, TString> params, const TConnectionState& connection)
+ : Owner_(owner)
+ , ConnectionParams_(std::move(params))
+ , Connection_(connection)
+ {
+ if (!Connection_.Transaction.Status) {
+ Connection_.Transaction.Status = 'I';
}
- BLOG_D("Finally replying to " << RequestActorId_);
- Send(RequestActorId_, response.release(), 0, RequestCookie_);
- PassAway();
}
- void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) {
- BLOG_D("TEvQuery, sender: " << ev->Sender << " , self: " << SelfId());
- RequestActorId_ = ev->Sender;
- RequestCookie_ = ev->Cookie;
- auto query(ev->Get()->Message->GetQuery());
+ THolder<NKqp::TEvKqp::TEvQueryRequest> MakeKqpRequest() {
TString database;
if (ConnectionParams_.count("database")) {
database = ConnectionParams_["database"];
@@ -122,88 +46,104 @@ public:
if (ConnectionParams_.count("ydb-serialized-token")) {
token = ConnectionParams_["ydb-serialized-token"];
}
-
auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest();
- request.SetQuery(ToPgSyntax(query, ConnectionParams_));
- request.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
- request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY);
- request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
- request.MutableTxControl()->set_commit_tx(true);
- request.SetKeepSession(false);
request.SetDatabase(database);
event->Record.SetUserToken(token);
- InTransaction_ = query.starts_with("BEGIN");
- ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
- BLOG_D("Sent event to kqpProxy, RequestActorId = " << RequestActorId_ << ", self: " << SelfId());
- Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
+ return event;
}
- STATEFN(StateWork) {
- switch (ev->GetTypeRewrite()) {
- hFunc(NPG::TEvPGEvents::TEvQuery, Handle);
- hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
- hFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
+ void ConvertQueryToRequest(TStringBuf query, NKikimrKqp::TQueryRequest& request) {
+ if (Connection_.SessionId) {
+ request.SetSessionId(Connection_.SessionId);
+ }
+ request.SetKeepSession(true);
+ // HACK
+ if (query.starts_with("BEGIN")) {
+ Tag_ = "BEGIN";
+ request.SetAction(NKikimrKqp::QUERY_ACTION_BEGIN_TX);
+ request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ } else if (query.starts_with("COMMIT")) {
+ Tag_ = "COMMIT";
+ request.SetAction(NKikimrKqp::QUERY_ACTION_COMMIT_TX);
+ request.MutableTxControl()->set_tx_id(Connection_.Transaction.Id);
+ } else if (query.starts_with("ROLLBACK")) {
+ Tag_ = "ROLLBACK";
+ if (Connection_.Transaction.Status == 'T') {
+ request.SetAction(NKikimrKqp::QUERY_ACTION_ROLLBACK_TX);
+ request.MutableTxControl()->set_tx_id(Connection_.Transaction.Id);
+ } else if (Connection_.Transaction.Status == 'E') {
+ // ignore, reset to I
+ auto evQueryResponse = MakeHolder<NKqp::TEvKqp::TEvQueryResponse>();
+ evQueryResponse->Record.GetRef().SetYdbStatus(Ydb::StatusIds::SUCCESS);
+ evQueryResponse->Record.GetRef().MutableResponse()->SetSessionId(request.GetSessionId());
+ TBase::Send(TBase::SelfId(), evQueryResponse.Release());
+ }
+ } else {
+ if (query.starts_with("SELECT")) {
+ Tag_ = "SELECT";
+ }
+ request.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY);
+ request.SetQuery(ToPgSyntax(query, ConnectionParams_));
+ if (Connection_.Transaction.Status == 'I') {
+ request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ request.MutableTxControl()->set_commit_tx(true);
+ } else if (Connection_.Transaction.Status == 'T') {
+ request.MutableTxControl()->set_tx_id(Connection_.Transaction.Id);
+ }
+ }
+ }
+
+ void ProcessKqpResponseReleaseProxy(const NKikimrKqp::TEvQueryResponse& record) {
+ Connection_.SessionId = record.GetResponse().GetSessionId();
+
+ if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
+ Connection_.Transaction.Id = record.GetResponse().GetTxMeta().id();
+ if (Connection_.Transaction.Id) {
+ Connection_.Transaction.Status = 'T';
+ } else {
+ Connection_.Transaction.Status = 'I';
+ }
+ } else {
+ if (Connection_.Transaction.Id) {
+ Connection_.Transaction.Id.clear();
+ Connection_.Transaction.Status = 'E';
+ } else {
+ Connection_.Transaction.Status = 'I';
+ }
}
+
+ TBase::Send(Owner_, new TEvEvents::TEvProxyCompleted(Connection_));
}
};
-class TPgwireKqpProxyQuery : public TActorBootstrapped<TPgwireKqpProxyQuery> {
- using TBase = TActorBootstrapped<TPgwireKqpProxyQuery>;
+class TPgwireKqpProxyQuery : public TPgwireKqpProxy<TPgwireKqpProxyQuery> {
+ using TBase = TPgwireKqpProxy<TPgwireKqpProxyQuery>;
- std::unordered_map<TString, TString> ConnectionParams_;
NPG::TEvPGEvents::TEvQuery::TPtr EventQuery_;
bool WasMeta_ = false;
- TString Tag;
- char TransactionStatus = 0;
+ std::size_t RowsSelected_ = 0;
public:
- TPgwireKqpProxyQuery(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery)
- : ConnectionParams_(std::move(params))
+ TPgwireKqpProxyQuery(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery)
+ : TPgwireKqpProxy(owner, std::move(params), connection)
, EventQuery_(std::move(evQuery))
- {}
+ {
+ }
void Bootstrap() {
auto query(EventQuery_->Get()->Message->GetQuery());
- TString database;
- if (ConnectionParams_.count("database")) {
- database = ConnectionParams_["database"];
- }
- TString token;
- if (ConnectionParams_.count("ydb-serialized-token")) {
- token = ConnectionParams_["ydb-serialized-token"];
- }
- auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ auto event = MakeKqpRequest();
NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest();
- request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
- request.MutableTxControl()->set_commit_tx(true);
- request.SetKeepSession(false);
- request.SetDatabase(database);
- event->Record.SetUserToken(token);
// HACK
- if (query.starts_with("BEGIN")) {
- Tag = "BEGIN";
- TransactionStatus = 'T';
- request.SetAction(NKikimrKqp::QUERY_ACTION_BEGIN_TX);
- } else if (query.starts_with("COMMIT")) {
- Tag = "COMMIT";
- TransactionStatus = 'I';
- request.SetAction(NKikimrKqp::QUERY_ACTION_COMMIT_TX);
- } else if (query.starts_with("ROLLBACK")) {
- Tag = "ROLLBACK";
- TransactionStatus = 'I';
- request.SetAction(NKikimrKqp::QUERY_ACTION_ROLLBACK_TX);
- } else {
- request.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
- request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY);
- request.SetQuery(ToPgSyntax(query, ConnectionParams_));
+ ConvertQueryToRequest(query, request);
+ if (request.HasAction()) {
+ ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
+ BLOG_D("Sent event to kqpProxy " << request.ShortDebugString());
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
}
-
- ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
- BLOG_D("Sent event to kqpProxy, RequestActorId = " << EventQuery_->Sender << ", self: " << SelfId());
- Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
-
// TODO(xenoxeno): timeout
Become(&TPgwireKqpProxyQuery::StateWork);
}
@@ -234,8 +174,8 @@ public:
std::unique_ptr<NPG::TEvPGEvents::TEvQueryResponse> MakeResponse() {
auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>();
- response->Tag = Tag;
- response->TransactionStatus = TransactionStatus;
+ response->Tag = Tag_;
+ response->TransactionStatus = Connection_.Transaction.Status;
return response;
}
@@ -250,10 +190,7 @@ public:
FillResultSet(resultSet, response.get());
response->CommandCompleted = false;
- // HACK
- if (response->DataRows.size() > 0) {
- response->Tag = TStringBuilder() << "SELECT " << response->DataRows.size();
- }
+ RowsSelected_ += response->DataRows.size();
BLOG_D(this->SelfId() << "Send rowset data (" << ev->Get()->Record.GetSeqNo() << ") to: " << EventQuery_->Sender);
Send(EventQuery_->Sender, response.release(), 0, EventQuery_->Cookie);
@@ -266,18 +203,18 @@ public:
}
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
- BLOG_D("Handling TEvKqp::TEvQueryResponse");
+ BLOG_D("Handling TEvKqp::TEvQueryResponse " << ev->Get()->Record.ShortDebugString());
NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef();
-
+ ProcessKqpResponseReleaseProxy(record);
auto response = MakeResponse();
try {
if (record.HasYdbStatus()) {
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
- Y_ENSURE(record.GetResponse().GetResults().empty());
+ BLOG_ENSURE(record.GetResponse().GetResults().empty());
// HACK
- if (response->DataRows.size() > 0) {
- response->Tag = TStringBuilder() << "SELECT " << response->DataRows.size();
+ if (response->Tag == "SELECT") {
+ response->Tag = TStringBuilder() << response->Tag << " " << RowsSelected_;
}
} else {
NYql::TIssues issues;
@@ -308,58 +245,41 @@ public:
}
};
-class TPgwireKqpProxyDescribe : public TActorBootstrapped<TPgwireKqpProxyDescribe> {
- using TBase = TActorBootstrapped<TPgwireKqpProxyDescribe>;
+class TPgwireKqpProxyDescribe : public TPgwireKqpProxy<TPgwireKqpProxyDescribe> {
+ using TBase = TPgwireKqpProxy<TPgwireKqpProxyDescribe>;
- std::unordered_map<TString, TString> ConnectionParams_;
NPG::TEvPGEvents::TEvDescribe::TPtr EventDescribe_;
- TString Script_;
+ TParsedStatement Statement_;
public:
- TPgwireKqpProxyDescribe(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TString& script)
- : ConnectionParams_(std::move(params))
+ TPgwireKqpProxyDescribe(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TParsedStatement& statement)
+ : TPgwireKqpProxy(owner, std::move(params), connection)
, EventDescribe_(std::move(evDescribe))
- , Script_(script)
+ , Statement_(statement)
{}
void Bootstrap() {
- auto query(Script_);
- TString database;
- if (ConnectionParams_.count("database")) {
- database = ConnectionParams_["database"];
- }
- TString token;
- if (ConnectionParams_.count("ydb-serialized-token")) {
- token = ConnectionParams_["ydb-serialized-token"];
- }
- auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ auto query(ConvertQuery(Statement_));
+ auto event = MakeKqpRequest();
NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest();
- request.SetQuery(ToPgSyntax(query, ConnectionParams_));
- request.SetAction(NKikimrKqp::QUERY_ACTION_EXPLAIN);
- auto noScript = ConnectionParams_.find("no-script");
- if (noScript == ConnectionParams_.end()) {
- request.SetType(NKikimrKqp::QUERY_TYPE_SQL_SCRIPT);
- } else {
- request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY);
- request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
- request.MutableTxControl()->set_commit_tx(true);
- }
- request.SetKeepSession(false);
- request.SetDatabase(database);
- event->Record.SetUserToken(token);
- ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
- BLOG_D("Sent event to kqpProxy, RequestActorId = " << EventDescribe_->Sender << ", self: " << SelfId());
- Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
+ // HACK
+ ConvertQueryToRequest(query.Query, request);
+ if (request.HasAction()) {
+ request.SetAction(NKikimrKqp::QUERY_ACTION_EXPLAIN);
+ ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
+ BLOG_D("Sent event to kqpProxy " << request.ShortDebugString());
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
+ }
// TODO(xenoxeno): timeout
Become(&TPgwireKqpProxyDescribe::StateWork);
}
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
BLOG_D("Handling TEvKqp::TEvQueryResponse");
+ Send(Owner_, new TEvEvents::TEvProxyCompleted());
NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef();
-
auto response = std::make_unique<NPG::TEvPGEvents::TEvDescribeResponse>();
try {
if (record.HasYdbStatus()) {
@@ -396,17 +316,140 @@ public:
}
};
+class TPgwireKqpProxyExecute : public TPgwireKqpProxy<TPgwireKqpProxyExecute> {
+ using TBase = TPgwireKqpProxy<TPgwireKqpProxyExecute>;
+
+ NPG::TEvPGEvents::TEvExecute::TPtr EventExecute_;
+ TParsedStatement Statement_;
+ std::size_t RowsSelected_ = 0;
+
+public:
+ TPgwireKqpProxyExecute(const TActorId& owner, std::unordered_map<TString, TString> params, const TConnectionState& connection, NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute, const TParsedStatement& statement)
+ : TPgwireKqpProxy(owner, std::move(params), connection)
+ , EventExecute_(std::move(evExecute))
+ , Statement_(statement)
+ {
+ }
+
+ void Bootstrap() {
+ auto query(ConvertQuery(Statement_));
+ auto event = MakeKqpRequest();
+ NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest();
+
+ // HACK
+ ConvertQueryToRequest(query.Query, request);
+ if (request.HasAction()) {
+ ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
+ BLOG_D("Sent event to kqpProxy " << request.ShortDebugString());
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
+ }
+ // TODO(xenoxeno): timeout
+ Become(&TPgwireKqpProxyQuery::StateWork);
+ }
+
+ void FillResultSet(const NYdb::TResultSet& resultSet, NPG::TEvPGEvents::TEvExecuteResponse* response) {
+ NYdb::TResultSetParser parser(std::move(resultSet));
+ while (parser.TryNextRow()) {
+ response->DataRows.emplace_back();
+ auto& row = response->DataRows.back();
+ row.resize(parser.ColumnsCount());
+ for (size_t index = 0; index < parser.ColumnsCount(); ++index) {
+ row[index] = ColumnValueToString(parser.ColumnParser(index));
+ }
+ }
+ }
+
+ std::unique_ptr<NPG::TEvPGEvents::TEvExecuteResponse> MakeResponse() {
+ auto response = std::make_unique<NPG::TEvPGEvents::TEvExecuteResponse>();
+
+ response->Tag = Tag_;
+ response->TransactionStatus = Connection_.Transaction.Status;
+
+ return response;
+ }
+
+ void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
+ NYdb::TResultSet resultSet(std::move(*ev->Get()->Record.MutableResultSet()));
+ auto response = MakeResponse();
+ FillResultSet(resultSet, response.get());
+ response->CommandCompleted = false;
+
+ RowsSelected_ += response->DataRows.size();
+
+ BLOG_D(this->SelfId() << "Send rowset data (" << ev->Get()->Record.GetSeqNo() << ") to: " << EventExecute_->Sender);
+ Send(EventExecute_->Sender, response.release(), 0, EventExecute_->Cookie);
+
+ BLOG_D(this->SelfId() << "Send stream data ack to: " << ev->Sender);
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+ resp->Record.SetFreeSpace(std::numeric_limits<ui64>::max());
+ Send(ev->Sender, resp.Release());
+ }
+
+ void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
+ BLOG_D("Handling TEvKqp::TEvQueryResponse");
+ NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef();
+ ProcessKqpResponseReleaseProxy(record);
+ auto response = MakeResponse();
+ try {
+ if (record.HasYdbStatus()) {
+ if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
+ BLOG_ENSURE(record.GetResponse().GetResults().empty());
+
+ // HACK
+ if (response->Tag == "SELECT") {
+ response->Tag = TStringBuilder() << response->Tag << " " << RowsSelected_;
+ }
+ } else {
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
+ NYdb::TStatus status(NYdb::EStatus(record.GetYdbStatus()), std::move(issues));
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', TStringBuilder() << status});
+ }
+ } else {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', "No result received"});
+ }
+ } catch (const std::exception& e) {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', e.what()});
+ }
+ response->CommandCompleted = true;
+ BLOG_D("Finally replying to " << EventExecute_->Sender);
+ Send(EventExecute_->Sender, response.release(), 0, EventExecute_->Cookie);
+ PassAway();
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
+ hFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
+ }
+ }
+};
-NActors::IActor* CreatePgwireKqpProxy(std::unordered_map<TString, TString> params) {
- return new TPgwireKqpProxy(std::move(params));
+NActors::IActor* CreatePgwireKqpProxyQuery(const TActorId& owner,
+ std::unordered_map<TString, TString> params,
+ const TConnectionState& connection,
+ NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery) {
+ return new TPgwireKqpProxyQuery(owner, std::move(params), connection, std::move(evQuery));
}
-NActors::IActor* CreatePgwireKqpProxyQuery(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery) {
- return new TPgwireKqpProxyQuery(std::move(params), std::move(evQuery));
+NActors::IActor* CreatePgwireKqpProxyDescribe(const TActorId& owner,
+ std::unordered_map<TString, TString> params,
+ const TConnectionState& connection,
+ NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe,
+ const TParsedStatement& statement) {
+ return new TPgwireKqpProxyDescribe(owner, std::move(params), connection, std::move(evDescribe), statement);
}
-NActors::IActor* CreatePgwireKqpProxyDescribe(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvDescribe::TPtr&& evDescribe, const TString& script) {
- return new TPgwireKqpProxyDescribe(std::move(params), std::move(evDescribe), script);
+NActors::IActor* CreatePgwireKqpProxyExecute(const TActorId& owner,
+ std::unordered_map<TString, TString> params,
+ const TConnectionState& connection,
+ NPG::TEvPGEvents::TEvExecute::TPtr&& evExecute,
+ const TParsedStatement& statement) {
+ return new TPgwireKqpProxyExecute(owner, std::move(params), connection, std::move(evExecute), statement);
}
} //namespace NLocalPgwire
diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp
index d7faa6234d9..fe31a8e6501 100644
--- a/ydb/core/pgproxy/pg_connection.cpp
+++ b/ydb/core/pgproxy/pg_connection.cpp
@@ -375,7 +375,7 @@ protected:
void HandleMessage(const TPGQuery* message) {
SyncSequenceNumber = IncomingSequenceNumber;
- Send(DatabaseProxy, new TEvPGEvents::TEvQuery(MakePGMessageCopy(message)), 0, IncomingSequenceNumber++);
+ Send(DatabaseProxy, new TEvPGEvents::TEvQuery(MakePGMessageCopy(message), TransactionStatus), 0, IncomingSequenceNumber++);
}
void HandleMessage(const TPGParse* message) {
@@ -399,7 +399,7 @@ protected:
}
void HandleMessage(const TPGExecute* message) {
- Send(DatabaseProxy, new TEvPGEvents::TEvExecute(MakePGMessageCopy(message)), 0, IncomingSequenceNumber++);
+ Send(DatabaseProxy, new TEvPGEvents::TEvExecute(MakePGMessageCopy(message), TransactionStatus), 0, IncomingSequenceNumber++);
}
void HandleMessage(const TPGClose* message) {
@@ -577,6 +577,9 @@ protected:
void HandleConnected(TEvPGEvents::TEvExecuteResponse::TPtr& ev) {
if (IsEventExpected(ev)) {
+ if (ev->Get()->TransactionStatus) {
+ TransactionStatus = ev->Get()->TransactionStatus;
+ }
if (ev->Get()->ErrorFields.empty()) {
if (ev->Get()->EmptyQuery) {
SendMessage(TPGEmptyQueryResponse());
@@ -592,7 +595,9 @@ protected:
SendStream(dataRow);
}
}
- { // commandComplete
+ if (ev->Get()->CommandCompleted) {
+ // commandComplete
+ TString tag = ev->Get()->Tag ? ev->Get()->Tag : "OK";
TPGStreamOutput<TPGCommandComplete> commandComplete;
commandComplete << tag << '\0';
SendStream(commandComplete);
@@ -607,8 +612,10 @@ protected:
errorResponse << '\0';
SendStream(errorResponse);
}
- ++OutgoingSequenceNumber;
- BecomeReadyForQuery();
+ if (ev->Get()->CommandCompleted) {
+ ++OutgoingSequenceNumber;
+ BecomeReadyForQuery();
+ }
} else {
PostponeEvent(ev);
}
diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h
index 0a777849d06..32ad9662dbb 100644
--- a/ydb/core/pgproxy/pg_proxy_events.h
+++ b/ydb/core/pgproxy/pg_proxy_events.h
@@ -91,9 +91,11 @@ struct TEvPGEvents {
struct TEvQuery : NActors::TEventLocal<TEvQuery, EvQuery> {
std::unique_ptr<TPGQuery> Message;
+ char TransactionStatus;
- TEvQuery(std::unique_ptr<TPGQuery> message)
+ TEvQuery(std::unique_ptr<TPGQuery> message, char transactionStatus)
: Message(std::move(message))
+ , TransactionStatus(transactionStatus)
{}
};
@@ -192,9 +194,11 @@ struct TEvPGEvents {
struct TEvExecute : NActors::TEventLocal<TEvExecute, EvExecute> {
std::unique_ptr<TPGExecute> Message;
+ char TransactionStatus;
- TEvExecute(std::unique_ptr<TPGExecute> message)
+ TEvExecute(std::unique_ptr<TPGExecute> message, char transactionStatus)
: Message(std::move(message))
+ , TransactionStatus(transactionStatus)
{}
};
@@ -203,6 +207,8 @@ struct TEvPGEvents {
std::vector<std::pair<char, TString>> ErrorFields;
TString Tag;
bool EmptyQuery = false;
+ bool CommandCompleted = true;
+ char TransactionStatus = 0;
};
struct TEvCloseResponse : NActors::TEventLocal<TEvCloseResponse, EvCloseResponse> {