aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorxenoxeno <xeno@ydb.tech>2023-08-25 13:28:42 +0300
committerxenoxeno <xeno@ydb.tech>2023-08-25 14:56:35 +0300
commit6e6c6b1eea85592869da3c8e29dbc48237dba90a (patch)
tree8bed351bfd494ba080f966c192f936abcb4b0f86
parentb135460aae0ab7483ff22ea433e91d59e5599077 (diff)
downloadydb-6e6c6b1eea85592869da3c8e29dbc48237dba90a.tar.gz
improve PG compatibility KIKIMR-18994
-rw-r--r--ydb/core/local_pgwire/pgwire_kqp_proxy.cpp5
-rw-r--r--ydb/core/pgproxy/pg_connection.cpp6
2 files changed, 7 insertions, 4 deletions
diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
index 2dd9d51458..6272fb73ea 100644
--- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
+++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
@@ -74,7 +74,7 @@ protected:
}
request.SetKeepSession(true);
// HACK
- TString q(ToUpperASCII(query.substr(0, 10)));
+ TString q(ToUpperASCII(query.substr(0, 20)));
if (q.StartsWith("BEGIN")) {
Tag_ = "BEGIN";
request.SetAction(NKikimrKqp::QUERY_ACTION_BEGIN_TX);
@@ -100,6 +100,8 @@ protected:
Tag_ = "SELECT";
}
if (q.StartsWith("CREATE") || q.StartsWith("ALTER") || q.StartsWith("DROP")) {
+ TStringBuf tag(q);
+ Tag_ = TStringBuilder() << tag.NextTok(' ') << " " << tag.NextTok(' ');
request.SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
request.SetType(NKikimrKqp::QUERY_TYPE_SQL_DDL);
} else {
@@ -307,6 +309,7 @@ public:
BLOG_D("Sent event to kqpProxy " << request.ShortDebugString());
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), event.Release());
} else { // for DDL and TCL
+ BLOG_D("Skipping parse of DDL/TCL");
auto response = EventParse_->Get()->Reply();
TParsedStatement statement;
statement.QueryData = std::move(QueryData_);
diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp
index 1231ed9d7f..981e6d85e4 100644
--- a/ydb/core/pgproxy/pg_connection.cpp
+++ b/ydb/core/pgproxy/pg_connection.cpp
@@ -576,7 +576,7 @@ protected:
void HandleConnected(TEvPGEvents::TEvDescribeResponse::TPtr& ev) {
if (IsEventExpected(ev)) {
if (ev->Get()->ErrorFields.empty()) {
- { // parameterDescription
+ if (ev->Get()->ParameterTypes.size() > 0) { // parameterDescription
TPGStreamOutput<TPGParameterDescription> parameterDescription;
parameterDescription << uint16_t(ev->Get()->ParameterTypes.size()); // number of fields
for (auto type : ev->Get()->ParameterTypes) {
@@ -767,9 +767,9 @@ protected:
break;
} else if (-res == EINTR) {
continue;
- } else if (!res) {
+ } else if (res == 0) {
// connection closed
- BLOG_ERROR("connection closed iSQ: " << IncomingSequenceNumber << " oSQ: " << OutgoingSequenceNumber << " sSQ: " << SyncSequenceNumber);
+ BLOG_ERROR("connection was gracefully closed iSQ: " << IncomingSequenceNumber << " oSQ: " << OutgoingSequenceNumber << " sSQ: " << SyncSequenceNumber);
return PassAway();
} else {
BLOG_ERROR("connection closed - error in recv: " << strerror(-res));