aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorxenoxeno <xeno@ydb.tech>2023-05-30 11:53:41 +0300
committerxenoxeno <xeno@ydb.tech>2023-05-30 11:53:41 +0300
commitdf23a796955e5802312e57e12cc4be84358df127 (patch)
tree0b11d315ce400c3b144a34b5f613959c9705c732
parent2cc99c035541be550db2d59a0465c05d374b03d0 (diff)
downloadydb-df23a796955e5802312e57e12cc4be84358df127.tar.gz
cleanup a little and add a streaming behavior to query
-rw-r--r--ydb/core/local_pgwire/local_pgwire_connection.cpp29
-rw-r--r--ydb/core/local_pgwire/pgwire_kqp_proxy.cpp197
-rw-r--r--ydb/core/pgproxy/pg_connection.cpp9
-rw-r--r--ydb/core/pgproxy/pg_proxy_events.h1
4 files changed, 201 insertions, 35 deletions
diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp
index 5c72000bf6..d7f83922e8 100644
--- a/ydb/core/local_pgwire/local_pgwire_connection.cpp
+++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp
@@ -21,34 +21,37 @@ using namespace NActors;
using namespace NKikimr;
extern NActors::IActor* CreatePgwireKqpProxy(
- NActors::TActorId actorId,
- ui64 cookie,
std::unordered_map<TString, TString> params
);
-class TPgYdbConnection : public TActorBootstrapped<TPgYdbConnection> {
- using TBase = TActorBootstrapped<TPgYdbConnection>;
+NActors::IActor* CreatePgwireKqpProxyQuery(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery);
+
+class TPgYdbConnection : public TActor<TPgYdbConnection> {
+ using TBase = TActor<TPgYdbConnection>;
std::unordered_map<TString, TString> ConnectionParams;
std::unordered_map<TString, TParsedStatement> ParsedStatements;
TString CurrentStatement;
Ydb::StatusIds QueryStatus;
std::unordered_map<ui32, NYdb::TResultSet> ResultSets;
+
public:
TPgYdbConnection(std::unordered_map<TString, TString> params)
- : ConnectionParams(std::move(params))
+ : TActor<TPgYdbConnection>(&TPgYdbConnection::StateWork)
+ , ConnectionParams(std::move(params))
{}
- void Bootstrap() {
- Become(&TPgYdbConnection::StateWork);
- }
-
void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) {
BLOG_D("TEvQuery " << ev->Sender);
- IActor* actor = CreatePgwireKqpProxy(ev->Sender, ev->Cookie, ConnectionParams);
- TActorId actorId = Register(actor);
- BLOG_D("Created pgwireKqpProxy: " << actorId);
- Forward(ev, actorId);
+ if (IsQueryEmpty(ev->Get()->Message->GetQuery())) {
+ auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>();
+ response->EmptyQuery = true;
+ Send(ev->Sender, response.release(), 0, ev->Cookie);
+ return;
+ }
+
+ TActorId actorId = Register(CreatePgwireKqpProxyQuery(ConnectionParams, std::move(ev)));
+ BLOG_D("Created pgwireKqpProxyQuery: " << actorId);
}
void Handle(NPG::TEvPGEvents::TEvParse::TPtr& ev) {
diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
index bce7d9f044..a2bf13a243 100644
--- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
+++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
@@ -17,8 +17,8 @@ namespace NLocalPgWire {
using namespace NActors;
using namespace NKikimr;
-class TPgwireKqpProxy : public TActorBootstrapped<TPgwireKqpProxy> {
- using TBase = TActorBootstrapped<TPgwireKqpProxy>;
+class TPgwireKqpProxy : public TActor<TPgwireKqpProxy> {
+ using TBase = TActor<TPgwireKqpProxy>;
NActors::TActorId RequestActorId_;
ui64 RequestCookie_;
@@ -26,16 +26,11 @@ class TPgwireKqpProxy : public TActorBootstrapped<TPgwireKqpProxy> {
std::unordered_map<TString, TString> ConnectionParams_;
TMap<ui32, NYdb::TResultSet> ResultSets_;
public:
- TPgwireKqpProxy(NActors::TActorId actorId, ui64 cookie, std::unordered_map<TString, TString> params)
- : RequestActorId_(actorId)
- , RequestCookie_(cookie)
+ TPgwireKqpProxy(std::unordered_map<TString, TString> params)
+ : TActor<TPgwireKqpProxy>(&TPgwireKqpProxy::StateWork)
, ConnectionParams_(std::move(params))
{}
- void Bootstrap() {
- Become(&TPgwireKqpProxy::StateWork);
- }
-
void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
NYdb::TResultSet resultSet(std::move(*ev->Get()->Record.MutableResultSet()));
ResultSets_.emplace(ev->Get()->Record.GetQueryResultIndex(), resultSet);
@@ -122,18 +117,14 @@ public:
}
BLOG_D("Finally replying to " << RequestActorId_);
Send(RequestActorId_, response.release(), 0, RequestCookie_);
- //PassAway();
+ PassAway();
}
void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) {
BLOG_D("TEvQuery, sender: " << ev->Sender << " , self: " << SelfId());
+ RequestActorId_ = ev->Sender;
+ RequestCookie_ = ev->Cookie;
auto query(ev->Get()->Message->GetQuery());
- if (IsQueryEmpty(query)) {
- auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>();
- response->EmptyQuery = true;
- Send(ev->Sender, response.release(), 0, ev->Cookie);
- return;
- }
TString database;
if (ConnectionParams_.count("database")) {
database = ConnectionParams_["database"];
@@ -158,7 +149,7 @@ public:
request.SetKeepSession(false);
request.SetDatabase(database);
event->Record.SetUserToken(token);
- InTransaction_ = ev->Get()->Message->GetQuery().starts_with("BEGIN");
+ InTransaction_ = query.starts_with("BEGIN");
ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
BLOG_D("Sent event to kqpProxy, RequestActorId = " << RequestActorId_ << ", self: " << SelfId());
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
@@ -173,9 +164,177 @@ public:
}
};
+class TPgwireKqpProxyQuery : public TActorBootstrapped<TPgwireKqpProxyQuery> {
+ using TBase = TActorBootstrapped<TPgwireKqpProxyQuery>;
+
+ std::unordered_map<TString, TString> ConnectionParams_;
+ NPG::TEvPGEvents::TEvQuery::TPtr EventQuery_;
+ bool InTransaction_ = false;
+ bool WasMeta_ = false;
+
+public:
+ TPgwireKqpProxyQuery(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery)
+ : ConnectionParams_(std::move(params))
+ , EventQuery_(std::move(evQuery))
+ {}
+
+ void Bootstrap() {
+ auto query(EventQuery_->Get()->Message->GetQuery());
+ TString database;
+ if (ConnectionParams_.count("database")) {
+ database = ConnectionParams_["database"];
+ }
+ TString token;
+ if (ConnectionParams_.count("ydb-serialized-token")) {
+ token = ConnectionParams_["ydb-serialized-token"];
+ }
+ auto event = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ NKikimrKqp::TQueryRequest& request = *event->Record.MutableRequest();
+ request.SetQuery(ToPgSyntax(query, ConnectionParams_));
+ request.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ auto noScript = ConnectionParams_.find("no-script");
+ if (noScript == ConnectionParams_.end()) {
+ request.SetType(NKikimrKqp::QUERY_TYPE_SQL_SCRIPT);
+ } else {
+ request.SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY);
+ request.MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ request.MutableTxControl()->set_commit_tx(true);
+ }
+ request.SetKeepSession(false);
+ request.SetDatabase(database);
+ event->Record.SetUserToken(token);
+
+ // HACK
+ InTransaction_ = query.starts_with("BEGIN");
+
+ ActorIdToProto(SelfId(), event->Record.MutableRequestActorId());
+ BLOG_D("Sent event to kqpProxy, RequestActorId = " << EventQuery_->Sender << ", self: " << SelfId());
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
+
+ // TODO(xenoxeno): timeout
+ Become(&TPgwireKqpProxyQuery::StateWork);
+ }
+
+ void FillMeta(const NYdb::TResultSet& resultSet, NPG::TEvPGEvents::TEvQueryResponse* response) {
+ for (const NYdb::TColumn& column : resultSet.GetColumnsMeta()) {
+ // TODO: fill data sizes
+ response->DataFields.push_back({
+ .Name = column.Name,
+ .DataType = GetPgOidFromYdbType(column.Type),
+ // .DataTypeSize = column.Type.GetProto().Getpg_type().Gettyplen()
+ });
+ }
+ }
+
+ void FillResultSet(const NYdb::TResultSet& resultSet, NPG::TEvPGEvents::TEvQueryResponse* response) {
+ NYdb::TResultSetParser parser(std::move(resultSet));
+ while (parser.TryNextRow()) {
+ response->DataRows.emplace_back();
+ auto& row = response->DataRows.back();
+ row.resize(parser.ColumnsCount());
+ for (size_t index = 0; index < parser.ColumnsCount(); ++index) {
+ row[index] = ColumnValueToString(parser.ColumnParser(index));
+ }
+ }
+ }
+
+ std::unique_ptr<NPG::TEvPGEvents::TEvQueryResponse> MakeResponse() {
+ auto response = std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>();
+
+ // HACK
+ if (InTransaction_) {
+ response->Tag = "BEGIN";
+ response->TransactionStatus = 'T';
+ }
+
+ return response;
+ }
+
+ void Handle(NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
+ NYdb::TResultSet resultSet(std::move(*ev->Get()->Record.MutableResultSet()));
+ auto response = MakeResponse();
+ if (!WasMeta_) {
+ FillMeta(resultSet, response.get());
+ WasMeta_ = true;
+ }
+ FillResultSet(resultSet, response.get());
+ response->CommandCompleted = false;
+
+ // HACK
+ if (response->DataRows.size() > 0) {
+ response->Tag = TStringBuilder() << "SELECT " << response->DataRows.size();
+ }
+
+ BLOG_D(this->SelfId() << "Send rowset data (" << ev->Get()->Record.GetSeqNo() << ") to: " << EventQuery_->Sender);
+ Send(EventQuery_->Sender, response.release(), 0, EventQuery_->Cookie);
+
+ BLOG_D(this->SelfId() << "Send stream data ack to: " << ev->Sender);
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+ resp->Record.SetFreeSpace(std::numeric_limits<ui64>::max());
+ Send(ev->Sender, resp.Release());
+ }
+
+ void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev) {
+ BLOG_D("Handling TEvKqp::TEvQueryResponse");
+ NKikimrKqp::TEvQueryResponse& record = ev->Get()->Record.GetRef();
+
+ auto response = MakeResponse();
+ try {
+ if (record.HasYdbStatus()) {
+ if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
+ auto results = record.GetResponse().GetResults();
+ if (!results.empty()) {
+ auto resultSet = record.MutableResponse()->MutableYdbResults()->Add();
+ NKqp::ConvertKqpQueryResultToDbResult(results.at(0), resultSet);
+ if (!WasMeta_) {
+ FillMeta(*resultSet, response.get());
+ WasMeta_ = true;
+ }
+ FillResultSet(*resultSet, response.get());
+ }
+
+ // HACK
+ if (response->DataRows.size() > 0) {
+ response->Tag = TStringBuilder() << "SELECT " << response->DataRows.size();
+ }
+
+ } else {
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(record.GetResponse().GetQueryIssues(), issues);
+ NYdb::TStatus status(NYdb::EStatus(record.GetYdbStatus()), std::move(issues));
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', TStringBuilder() << status});
+ }
+ } else {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', "No result received"});
+ }
+ } catch (const std::exception& e) {
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', e.what()});
+ }
+ response->CommandCompleted = true;
+ BLOG_D("Finally replying to " << EventQuery_->Sender);
+ Send(EventQuery_->Sender, response.release(), 0, EventQuery_->Cookie);
+ PassAway();
+ }
+
+ STATEFN(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
+ hFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
+ }
+ }
+};
+
+
+NActors::IActor* CreatePgwireKqpProxy(std::unordered_map<TString, TString> params) {
+ return new TPgwireKqpProxy(std::move(params));
+}
-NActors::IActor* CreatePgwireKqpProxy(NActors::TActorId actorId, ui64 cookie, std::unordered_map<TString, TString> params) {
- return new TPgwireKqpProxy(std::move(actorId), std::move(cookie), std::move(params));
+NActors::IActor* CreatePgwireKqpProxyQuery(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvQuery::TPtr&& evQuery) {
+ return new TPgwireKqpProxyQuery(std::move(params), std::move(evQuery));
}
} //namespace NLocalPgwire
diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp
index c895aa866d..002d200a40 100644
--- a/ydb/core/pgproxy/pg_connection.cpp
+++ b/ydb/core/pgproxy/pg_connection.cpp
@@ -478,7 +478,6 @@ protected:
if (ev->Get()->EmptyQuery) {
SendMessage(TPGEmptyQueryResponse());
} else {
- TString tag = ev->Get()->Tag ? ev->Get()->Tag : "OK";
if (!ev->Get()->DataFields.empty()) { // rowDescription
TPGStreamOutput<TPGRowDescription> rowDescription;
rowDescription << uint16_t(ev->Get()->DataFields.size()); // number of fields
@@ -505,7 +504,9 @@ protected:
SendStream(dataRow);
}
}
- { // commandComplete
+ if (ev->Get()->CommandCompleted) {
+ // commandComplete
+ TString tag = ev->Get()->Tag ? ev->Get()->Tag : "OK";
TPGStreamOutput<TPGCommandComplete> commandComplete;
commandComplete << tag << '\0';
SendStream(commandComplete);
@@ -520,7 +521,9 @@ protected:
errorResponse << '\0';
SendStream(errorResponse);
}
- BecomeReadyForQuery();
+ if (ev->Get()->CommandCompleted) {
+ BecomeReadyForQuery();
+ }
} else {
PostponeEvent(ev);
}
diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h
index 6aaeea3045..0a777849d0 100644
--- a/ydb/core/pgproxy/pg_proxy_events.h
+++ b/ydb/core/pgproxy/pg_proxy_events.h
@@ -103,6 +103,7 @@ struct TEvPGEvents {
std::vector<std::pair<char, TString>> ErrorFields;
TString Tag;
bool EmptyQuery = false;
+ bool CommandCompleted = true;
char TransactionStatus = 0;
};