diff options
author | xenoxeno <xeno@ydb.tech> | 2023-05-23 10:34:01 +0300 |
---|---|---|
committer | xenoxeno <xeno@ydb.tech> | 2023-05-23 10:34:01 +0300 |
commit | 3b60e34b9fa896db99f9e10d1cf90a945cdf16da (patch) | |
tree | 550923e712f4156876adc816a5e2557e07a612d2 | |
parent | 2d78a3b5b4b567bd6395b26bd81bc208490fda83 (diff) | |
download | ydb-3b60e34b9fa896db99f9e10d1cf90a945cdf16da.tar.gz |
add basic/hack support for transaction status
-rw-r--r-- | ydb/apps/pgwire/pg_ydb_connection.cpp | 11 | ||||
-rw-r--r-- | ydb/apps/pgwire/pg_ydb_proxy.cpp | 8 | ||||
-rw-r--r-- | ydb/core/local_pgwire/local_pgwire_connection.cpp | 1 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_connection.cpp | 8 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_proxy_events.h | 1 |
5 files changed, 27 insertions, 2 deletions
diff --git a/ydb/apps/pgwire/pg_ydb_connection.cpp b/ydb/apps/pgwire/pg_ydb_connection.cpp index f4539bfd64..96786dfcf9 100644 --- a/ydb/apps/pgwire/pg_ydb_connection.cpp +++ b/ydb/apps/pgwire/pg_ydb_connection.cpp @@ -290,6 +290,16 @@ public: Send(ev->Sender, bindComplete.release()); } + void Handle(NPG::TEvPGEvents::TEvClose::TPtr& ev) { + auto closeData = ev->Get()->Message->GetCloseData(); + ParsedStatements.erase(closeData.StatementName); + CurrentStatement.clear(); + BLOG_D("TEvClose CurrentStatement changed to <empty>"); + + auto closeComplete = ev->Get()->Reply(); + Send(ev->Sender, closeComplete.release()); + } + struct TConvertedQuery { TString Query; NYdb::TParams Params; @@ -486,6 +496,7 @@ public: hFunc(NPG::TEvPGEvents::TEvBind, Handle); hFunc(NPG::TEvPGEvents::TEvDescribe, Handle); hFunc(NPG::TEvPGEvents::TEvExecute, Handle); + hFunc(NPG::TEvPGEvents::TEvClose, Handle); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); } } diff --git a/ydb/apps/pgwire/pg_ydb_proxy.cpp b/ydb/apps/pgwire/pg_ydb_proxy.cpp index 836dab9ca7..70ad4a3fd0 100644 --- a/ydb/apps/pgwire/pg_ydb_proxy.cpp +++ b/ydb/apps/pgwire/pg_ydb_proxy.cpp @@ -141,6 +141,13 @@ public: } } + void Handle(NPG::TEvPGEvents::TEvClose::TPtr& ev) { + auto itConnection = PgToYdbConnection.find(ev->Sender); + if (itConnection != PgToYdbConnection.end()) { + Forward(ev, itConnection->second); + } + } + STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { hFunc(NPG::TEvPGEvents::TEvAuth, Handle); @@ -151,6 +158,7 @@ public: hFunc(NPG::TEvPGEvents::TEvBind, Handle); hFunc(NPG::TEvPGEvents::TEvDescribe, Handle); hFunc(NPG::TEvPGEvents::TEvExecute, Handle); + hFunc(NPG::TEvPGEvents::TEvClose, Handle); } } }; diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp index 9d38e49417..8ec4a5f5d4 100644 --- a/ydb/core/local_pgwire/local_pgwire_connection.cpp +++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp @@ -194,6 +194,7 @@ public: // HACK if (ev->Get()->Message->GetQuery().starts_with("BEGIN")) { response->Tag = "BEGIN"; + response->TransactionStatus = 'T'; } // HACK try { diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp index 2ff192c9c3..a09821e0bb 100644 --- a/ydb/core/pgproxy/pg_connection.cpp +++ b/ydb/core/pgproxy/pg_connection.cpp @@ -38,6 +38,7 @@ public: ui64 IncomingSequenceNumber = 1; ui64 OutgoingSequenceNumber = 1; ui64 SyncSequenceNumber = 1; + char TransactionStatus = 'I'; // could be 'I' (idle), 'T' (transaction), 'E' (failed transaction) std::deque<TAutoPtr<IEventHandle>> PostponedEvents; TPGConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const TActorId& databaseProxy) @@ -288,7 +289,7 @@ protected: void SendReadyForQuery() { TPGStreamOutput<TPGReadyForQuery> readyForQuery; - readyForQuery << 'I'; + readyForQuery << TransactionStatus; SendStream(readyForQuery); } @@ -469,12 +470,15 @@ protected: void HandleConnected(TEvPGEvents::TEvQueryResponse::TPtr& ev) { if (IsEventExpected(ev)) { + if (ev->Get()->TransactionStatus) { + TransactionStatus = ev->Get()->TransactionStatus; + } if (ev->Get()->ErrorFields.empty()) { if (ev->Get()->EmptyQuery) { SendMessage(TPGEmptyQueryResponse()); } else { TString tag = ev->Get()->Tag ? ev->Get()->Tag : "OK"; - { // rowDescription + if (!ev->Get()->DataFields.empty()) { // rowDescription TPGStreamOutput<TPGRowDescription> rowDescription; rowDescription << uint16_t(ev->Get()->DataFields.size()); // number of fields for (const auto& field : ev->Get()->DataFields) { diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h index 992d25e883..6aaeea3045 100644 --- a/ydb/core/pgproxy/pg_proxy_events.h +++ b/ydb/core/pgproxy/pg_proxy_events.h @@ -103,6 +103,7 @@ struct TEvPGEvents { std::vector<std::pair<char, TString>> ErrorFields; TString Tag; bool EmptyQuery = false; + char TransactionStatus = 0; }; /* |