aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorxenoxeno <xeno@ydb.tech>2023-05-23 10:34:01 +0300
committerxenoxeno <xeno@ydb.tech>2023-05-23 10:34:01 +0300
commit3b60e34b9fa896db99f9e10d1cf90a945cdf16da (patch)
tree550923e712f4156876adc816a5e2557e07a612d2
parent2d78a3b5b4b567bd6395b26bd81bc208490fda83 (diff)
downloadydb-3b60e34b9fa896db99f9e10d1cf90a945cdf16da.tar.gz
add basic/hack support for transaction status
-rw-r--r--ydb/apps/pgwire/pg_ydb_connection.cpp11
-rw-r--r--ydb/apps/pgwire/pg_ydb_proxy.cpp8
-rw-r--r--ydb/core/local_pgwire/local_pgwire_connection.cpp1
-rw-r--r--ydb/core/pgproxy/pg_connection.cpp8
-rw-r--r--ydb/core/pgproxy/pg_proxy_events.h1
5 files changed, 27 insertions, 2 deletions
diff --git a/ydb/apps/pgwire/pg_ydb_connection.cpp b/ydb/apps/pgwire/pg_ydb_connection.cpp
index f4539bfd64..96786dfcf9 100644
--- a/ydb/apps/pgwire/pg_ydb_connection.cpp
+++ b/ydb/apps/pgwire/pg_ydb_connection.cpp
@@ -290,6 +290,16 @@ public:
Send(ev->Sender, bindComplete.release());
}
+ void Handle(NPG::TEvPGEvents::TEvClose::TPtr& ev) {
+ auto closeData = ev->Get()->Message->GetCloseData();
+ ParsedStatements.erase(closeData.StatementName);
+ CurrentStatement.clear();
+ BLOG_D("TEvClose CurrentStatement changed to <empty>");
+
+ auto closeComplete = ev->Get()->Reply();
+ Send(ev->Sender, closeComplete.release());
+ }
+
struct TConvertedQuery {
TString Query;
NYdb::TParams Params;
@@ -486,6 +496,7 @@ public:
hFunc(NPG::TEvPGEvents::TEvBind, Handle);
hFunc(NPG::TEvPGEvents::TEvDescribe, Handle);
hFunc(NPG::TEvPGEvents::TEvExecute, Handle);
+ hFunc(NPG::TEvPGEvents::TEvClose, Handle);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
}
}
diff --git a/ydb/apps/pgwire/pg_ydb_proxy.cpp b/ydb/apps/pgwire/pg_ydb_proxy.cpp
index 836dab9ca7..70ad4a3fd0 100644
--- a/ydb/apps/pgwire/pg_ydb_proxy.cpp
+++ b/ydb/apps/pgwire/pg_ydb_proxy.cpp
@@ -141,6 +141,13 @@ public:
}
}
+ void Handle(NPG::TEvPGEvents::TEvClose::TPtr& ev) {
+ auto itConnection = PgToYdbConnection.find(ev->Sender);
+ if (itConnection != PgToYdbConnection.end()) {
+ Forward(ev, itConnection->second);
+ }
+ }
+
STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(NPG::TEvPGEvents::TEvAuth, Handle);
@@ -151,6 +158,7 @@ public:
hFunc(NPG::TEvPGEvents::TEvBind, Handle);
hFunc(NPG::TEvPGEvents::TEvDescribe, Handle);
hFunc(NPG::TEvPGEvents::TEvExecute, Handle);
+ hFunc(NPG::TEvPGEvents::TEvClose, Handle);
}
}
};
diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp
index 9d38e49417..8ec4a5f5d4 100644
--- a/ydb/core/local_pgwire/local_pgwire_connection.cpp
+++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp
@@ -194,6 +194,7 @@ public:
// HACK
if (ev->Get()->Message->GetQuery().starts_with("BEGIN")) {
response->Tag = "BEGIN";
+ response->TransactionStatus = 'T';
}
// HACK
try {
diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp
index 2ff192c9c3..a09821e0bb 100644
--- a/ydb/core/pgproxy/pg_connection.cpp
+++ b/ydb/core/pgproxy/pg_connection.cpp
@@ -38,6 +38,7 @@ public:
ui64 IncomingSequenceNumber = 1;
ui64 OutgoingSequenceNumber = 1;
ui64 SyncSequenceNumber = 1;
+ char TransactionStatus = 'I'; // could be 'I' (idle), 'T' (transaction), 'E' (failed transaction)
std::deque<TAutoPtr<IEventHandle>> PostponedEvents;
TPGConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const TActorId& databaseProxy)
@@ -288,7 +289,7 @@ protected:
void SendReadyForQuery() {
TPGStreamOutput<TPGReadyForQuery> readyForQuery;
- readyForQuery << 'I';
+ readyForQuery << TransactionStatus;
SendStream(readyForQuery);
}
@@ -469,12 +470,15 @@ protected:
void HandleConnected(TEvPGEvents::TEvQueryResponse::TPtr& ev) {
if (IsEventExpected(ev)) {
+ if (ev->Get()->TransactionStatus) {
+ TransactionStatus = ev->Get()->TransactionStatus;
+ }
if (ev->Get()->ErrorFields.empty()) {
if (ev->Get()->EmptyQuery) {
SendMessage(TPGEmptyQueryResponse());
} else {
TString tag = ev->Get()->Tag ? ev->Get()->Tag : "OK";
- { // rowDescription
+ if (!ev->Get()->DataFields.empty()) { // rowDescription
TPGStreamOutput<TPGRowDescription> rowDescription;
rowDescription << uint16_t(ev->Get()->DataFields.size()); // number of fields
for (const auto& field : ev->Get()->DataFields) {
diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h
index 992d25e883..6aaeea3045 100644
--- a/ydb/core/pgproxy/pg_proxy_events.h
+++ b/ydb/core/pgproxy/pg_proxy_events.h
@@ -103,6 +103,7 @@ struct TEvPGEvents {
std::vector<std::pair<char, TString>> ErrorFields;
TString Tag;
bool EmptyQuery = false;
+ char TransactionStatus = 0;
};
/*