diff options
author | xenoxeno <xeno@ydb.tech> | 2023-10-31 21:05:43 +0300 |
---|---|---|
committer | xenoxeno <xeno@ydb.tech> | 2023-10-31 21:22:55 +0300 |
commit | 5e44523f6b7dcb158f3373a269c0495e1cadf71f (patch) | |
tree | 991b12d002609ea1da003cb93377c8b362cb238c | |
parent | 9869642e375e60baf2debe0bb01c63322878c79f (diff) | |
download | ydb-5e44523f6b7dcb158f3373a269c0495e1cadf71f.tar.gz |
pg cancel request
24 files changed, 485 insertions, 72 deletions
diff --git a/.mapping.json b/.mapping.json index e303f11a27..d926e9aa61 100644 --- a/.mapping.json +++ b/.mapping.json @@ -4989,6 +4989,11 @@ "ydb/core/pgproxy/CMakeLists.linux-x86_64.txt":"", "ydb/core/pgproxy/CMakeLists.txt":"", "ydb/core/pgproxy/CMakeLists.windows-x86_64.txt":"", + "ydb/core/pgproxy/protos/CMakeLists.darwin-x86_64.txt":"", + "ydb/core/pgproxy/protos/CMakeLists.linux-aarch64.txt":"", + "ydb/core/pgproxy/protos/CMakeLists.linux-x86_64.txt":"", + "ydb/core/pgproxy/protos/CMakeLists.txt":"", + "ydb/core/pgproxy/protos/CMakeLists.windows-x86_64.txt":"", "ydb/core/pgproxy/ut/CMakeLists.darwin-x86_64.txt":"", "ydb/core/pgproxy/ut/CMakeLists.linux-aarch64.txt":"", "ydb/core/pgproxy/ut/CMakeLists.linux-x86_64.txt":"", diff --git a/ydb/apps/pgwire/pg_ydb_proxy.cpp b/ydb/apps/pgwire/pg_ydb_proxy.cpp index 70ad4a3fd0..f8b9baf8e3 100644 --- a/ydb/apps/pgwire/pg_ydb_proxy.cpp +++ b/ydb/apps/pgwire/pg_ydb_proxy.cpp @@ -93,6 +93,7 @@ public: TActorId actorId = Register(actor); PgToYdbConnection[ev->Sender] = actorId; BLOG_D("Created ydb connection " << actorId); + Send(ev->Sender, new NPG::TEvPGEvents::TEvFinishHandshake(), 0, ev->Cookie); } void Handle(NPG::TEvPGEvents::TEvConnectionClosed::TPtr& ev) { diff --git a/ydb/core/local_pgwire/local_pgwire.cpp b/ydb/core/local_pgwire/local_pgwire.cpp index 5c26127a63..d8ffcf8aed 100644 --- a/ydb/core/local_pgwire/local_pgwire.cpp +++ b/ydb/core/local_pgwire/local_pgwire.cpp @@ -1,4 +1,6 @@ #include "log_impl.h" +#include "local_pgwire.h" +#include "local_pgwire_util.h" #include <ydb/core/pgproxy/pg_proxy_events.h> #include <ydb/core/grpc_services/local_rpc/local_rpc.h> #include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h> @@ -10,7 +12,7 @@ namespace NLocalPgWire { using namespace NActors; using namespace NKikimr; -extern IActor* CreateConnection(std::unordered_map<TString, TString> params); +extern IActor* CreateConnection(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvConnectionOpened::TPtr&& event, const TConnectionState& connection); class TPgYdbProxy : public TActor<TPgYdbProxy> { using TBase = TActor<TPgYdbProxy>; @@ -45,9 +47,15 @@ class TPgYdbProxy : public TActor<TPgYdbProxy> { }; }; - std::unordered_map<TActorId, TActorId> PgToYdbConnection; + struct TConnectionState { + TActorId YdbConnection; + uint32_t ConnectionNum; + }; + + std::unordered_map<TActorId, TConnectionState> ConnectionState; std::unordered_map<TActorId, TSecurityState> SecurityState; std::unordered_map<TString, TTokenState> TokenState; + uint32_t ConnectionNum = 0; public: TPgYdbProxy() @@ -97,7 +105,7 @@ public: void Handle(NPG::TEvPGEvents::TEvAuth::TPtr& ev) { std::unordered_map<TString, TString> clientParams = ev->Get()->InitialMessage->GetClientParams(); - BLOG_D("TEvAuth " << ev->Get()->InitialMessage->Dump()); + BLOG_D("TEvAuth " << ev->Get()->InitialMessage->Dump() << " cookie " << ev->Cookie); Ydb::Auth::LoginRequest request; request.set_user(clientParams["user"]); if (ev->Get()->PasswordMessage) { @@ -132,7 +140,7 @@ public: } void Handle(NPG::TEvPGEvents::TEvConnectionOpened::TPtr& ev) { - BLOG_D("TEvConnectionOpened " << ev->Sender); + BLOG_D("TEvConnectionOpened " << ev->Sender << " cookie " << ev->Cookie); auto params = ev->Get()->Message->GetClientParams(); auto itSecurityState = SecurityState.find(ev->Sender); if (itSecurityState != SecurityState.end()) { @@ -143,64 +151,83 @@ public: params["ydb-serialized-token"] = itSecurityState->second.SerializedToken; } } - IActor* actor = CreateConnection(std::move(params)); + auto& connectionState = ConnectionState[ev->Sender]; + connectionState.ConnectionNum = ++ConnectionNum; + IActor* actor = CreateConnection(std::move(params), std::move(ev), {.ConnectionNum = connectionState.ConnectionNum}); TActorId actorId = Register(actor); - PgToYdbConnection[ev->Sender] = actorId; - BLOG_D("Created ydb connection " << actorId); + connectionState.YdbConnection = actorId; + BLOG_D("Created ydb connection " << actorId << " num " << connectionState.ConnectionNum); } void Handle(NPG::TEvPGEvents::TEvConnectionClosed::TPtr& ev) { - BLOG_D("TEvConnectionClosed " << ev->Sender); - auto itConnection = PgToYdbConnection.find(ev->Sender); - if (itConnection != PgToYdbConnection.end()) { - Send(itConnection->second, new TEvents::TEvPoisonPill()); - BLOG_D("Destroyed ydb connection " << itConnection->second); + BLOG_D("TEvConnectionClosed " << ev->Sender << " cookie " << ev->Cookie); + auto itConnection = ConnectionState.find(ev->Sender); + if (itConnection != ConnectionState.end()) { + Send(itConnection->second.YdbConnection, new TEvents::TEvPoisonPill()); + BLOG_D("Destroyed ydb connection " << itConnection->second.YdbConnection << " num " << itConnection->second.ConnectionNum); } SecurityState.erase(ev->Sender); + ConnectionState.erase(itConnection); // TODO: cleanup TokenState too } void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) { - auto itConnection = PgToYdbConnection.find(ev->Sender); - if (itConnection != PgToYdbConnection.end()) { - Forward(ev, itConnection->second); + auto itConnection = ConnectionState.find(ev->Sender); + if (itConnection != ConnectionState.end()) { + Forward(ev, itConnection->second.YdbConnection); } } void Handle(NPG::TEvPGEvents::TEvParse::TPtr& ev) { - auto itConnection = PgToYdbConnection.find(ev->Sender); - if (itConnection != PgToYdbConnection.end()) { - Forward(ev, itConnection->second); + auto itConnection = ConnectionState.find(ev->Sender); + if (itConnection != ConnectionState.end()) { + Forward(ev, itConnection->second.YdbConnection); } } void Handle(NPG::TEvPGEvents::TEvBind::TPtr& ev) { - auto itConnection = PgToYdbConnection.find(ev->Sender); - if (itConnection != PgToYdbConnection.end()) { - Forward(ev, itConnection->second); + auto itConnection = ConnectionState.find(ev->Sender); + if (itConnection != ConnectionState.end()) { + Forward(ev, itConnection->second.YdbConnection); } } void Handle(NPG::TEvPGEvents::TEvDescribe::TPtr& ev) { - auto itConnection = PgToYdbConnection.find(ev->Sender); - if (itConnection != PgToYdbConnection.end()) { - Forward(ev, itConnection->second); + auto itConnection = ConnectionState.find(ev->Sender); + if (itConnection != ConnectionState.end()) { + Forward(ev, itConnection->second.YdbConnection); } } void Handle(NPG::TEvPGEvents::TEvExecute::TPtr& ev) { - auto itConnection = PgToYdbConnection.find(ev->Sender); - if (itConnection != PgToYdbConnection.end()) { - Forward(ev, itConnection->second); - BLOG_D("Forwarded to ydb connection " << itConnection->second); + auto itConnection = ConnectionState.find(ev->Sender); + if (itConnection != ConnectionState.end()) { + Forward(ev, itConnection->second.YdbConnection); } } void Handle(NPG::TEvPGEvents::TEvClose::TPtr& ev) { - auto itConnection = PgToYdbConnection.find(ev->Sender); - if (itConnection != PgToYdbConnection.end()) { - Forward(ev, itConnection->second); - BLOG_D("Forwarded to ydb connection " << itConnection->second); + auto itConnection = ConnectionState.find(ev->Sender); + if (itConnection != ConnectionState.end()) { + Forward(ev, itConnection->second.YdbConnection); + } + } + + void Handle(NPG::TEvPGEvents::TEvCancelRequest::TPtr& ev) { + uint32_t nodeId = ev->Get()->Record.GetProcessId(); + if (nodeId == SelfId().NodeId()) { + uint32_t connectionNum = ev->Get()->Record.GetSecretKey(); + for (const auto& [pgConnectionId, connectionState] : ConnectionState) { + if (connectionState.ConnectionNum == connectionNum) { + BLOG_D("Cancelling ConnectionNum " << connectionNum); + Forward(ev, connectionState.YdbConnection); + return; + } + } + BLOG_W("Cancelling ConnectionNum " << connectionNum << " - connection not found"); + } else { + BLOG_D("Forwarding TEvCancelRequest to Node " << nodeId); + Forward(ev, CreateLocalPgWireProxyId(nodeId)); } } @@ -215,6 +242,7 @@ public: hFunc(NPG::TEvPGEvents::TEvDescribe, Handle); hFunc(NPG::TEvPGEvents::TEvExecute, Handle); hFunc(NPG::TEvPGEvents::TEvClose, Handle); + hFunc(NPG::TEvPGEvents::TEvCancelRequest, Handle); hFunc(TEvPrivate::TEvTokenReady, Handle); hFunc(TEvTicketParser::TEvAuthorizeTicketResult, Handle); } diff --git a/ydb/core/local_pgwire/local_pgwire.h b/ydb/core/local_pgwire/local_pgwire.h index b66a3d9fd2..125a373947 100644 --- a/ydb/core/local_pgwire/local_pgwire.h +++ b/ydb/core/local_pgwire/local_pgwire.h @@ -2,7 +2,7 @@ namespace NLocalPgWire { -inline NActors::TActorId CreateLocalPgWireProxyId() { return NActors::TActorId(0, "localpgwire"); } +inline NActors::TActorId CreateLocalPgWireProxyId(uint32_t nodeId = 0) { return NActors::TActorId(nodeId, "localpgwire"); } NActors::IActor* CreateLocalPgWireProxy(); } diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp index 85f43e74ec..c986e08657 100644 --- a/ydb/core/local_pgwire/local_pgwire_connection.cpp +++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp @@ -22,22 +22,62 @@ namespace NLocalPgWire { using namespace NActors; using namespace NKikimr; -class TPgYdbConnection : public TActor<TPgYdbConnection> { - using TBase = TActor<TPgYdbConnection>; +class TPgYdbConnection : public TActorBootstrapped<TPgYdbConnection> { + using TBase = TActorBootstrapped<TPgYdbConnection>; std::unordered_map<TString, TString> ConnectionParams; + NPG::TEvPGEvents::TEvConnectionOpened::TPtr ConnectionEvent; std::unordered_map<TString, TParsedStatement> ParsedStatements; std::unordered_map<TString, TPortal> Portals; TConnectionState Connection; std::deque<TAutoPtr<IEventHandle>> Events; ui32 Inflight = 0; + std::unordered_set<TActorId> CurrentRunningQueries; public: - TPgYdbConnection(std::unordered_map<TString, TString> params) - : TActor<TPgYdbConnection>(&TPgYdbConnection::StateSchedule) - , ConnectionParams(std::move(params)) + TPgYdbConnection(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvConnectionOpened::TPtr&& event, const TConnectionState& connection) + : ConnectionParams(std::move(params)) + , ConnectionEvent(std::move(event)) + , Connection(connection) {} + void Bootstrap() { + TString database; + if (ConnectionParams.count("database")) { + database = ConnectionParams["database"]; + } + auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>(); + NKikimrKqp::TCreateSessionRequest& request = *ev->Record.MutableRequest(); + request.SetDatabase(database); + BLOG_D("Sent CreateSessionRequest to kqpProxy " << ev->Record.ShortDebugString()); + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), ev.Release()); + TBase::Become(&TPgYdbConnection::StateCreateSession); + } + + void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev) { + const auto& record(ev->Get()->Record); + BLOG_D("Received TEvCreateSessionResponse " << record.ShortDebugString()); + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + BLOG_D("Session id is " << record.GetResponse().GetSessionId()); + Connection.SessionId = record.GetResponse().GetSessionId(); + + auto response = MakeHolder<NPG::TEvPGEvents::TEvFinishHandshake>(); + response->BackendData.Pid = SelfId().NodeId(); + response->BackendData.Key = Connection.ConnectionNum; + Send(ConnectionEvent->Sender, response.Release(), 0, ev->Cookie); + TBase::Become(&TPgYdbConnection::StateSchedule); + ConnectionEvent.Destroy(); // don't need it anymore + } else { + BLOG_W("Failed to create session: " << record.ShortDebugString()); + auto response = MakeHolder<NPG::TEvPGEvents::TEvFinishHandshake>(); + // TODO: report actuall error + response->ErrorFields.push_back({'E', "ERROR"}); + response->ErrorFields.push_back({'M', record.GetError()}); + Send(ConnectionEvent->Sender, response.Release(), 0, ev->Cookie); + return PassAway(); + } + } + void ProcessEventsQueue() { while (!Events.empty() && Inflight == 0) { StateWork(Events.front()); @@ -58,6 +98,7 @@ public: ++Inflight; TActorId actorId = RegisterWithSameMailbox(CreatePgwireKqpProxyQuery(SelfId(), ConnectionParams, Connection, std::move(ev))); BLOG_D("Created pgwireKqpProxyQuery: " << actorId); + CurrentRunningQueries.insert(actorId); } void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) { @@ -85,7 +126,7 @@ public: ++Inflight; TActorId actorId = RegisterWithSameMailbox(CreatePgwireKqpProxyParse(SelfId(), ConnectionParams, Connection, std::move(ev))); BLOG_D("Created pgwireKqpProxyParse: " << actorId); - return; + CurrentRunningQueries.insert(actorId); } void Handle(NPG::TEvPGEvents::TEvBind::TPtr& ev) { @@ -183,6 +224,7 @@ public: ++Inflight; TActorId actorId = RegisterWithSameMailbox(CreatePgwireKqpProxyExecute(SelfId(), ConnectionParams, Connection, std::move(ev), it->second)); BLOG_D("Created pgwireKqpProxyExecute: " << actorId); + CurrentRunningQueries.insert(actorId); } void Handle(TEvEvents::TEvUpdateStatement::TPtr& ev) { @@ -216,23 +258,39 @@ public: BLOG_D("Session id is " << connection.SessionId); Connection.SessionId = connection.SessionId; } + CurrentRunningQueries.erase(ev->Sender); ProcessEventsQueue(); } + void Handle(NPG::TEvPGEvents::TEvCancelRequest::TPtr&) { + BLOG_D("Received TEvCancelRequest"); + for (const TActorId& actor : CurrentRunningQueries) { + Send(actor, new TEvEvents::TEvCancelRequest()); + } + } + void PassAway() override { if (Connection.SessionId) { - BLOG_D("Closing session " << Connection.SessionId); auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>(); ev->Record.MutableRequest()->SetSessionId(Connection.SessionId); + BLOG_D("Closing session " << Connection.SessionId << ", sent event to kqpProxy " << ev->Record.ShortDebugString()); Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), ev.Release()); } TBase::PassAway(); } + STATEFN(StateCreateSession) { + switch (ev->GetTypeRewrite()) { + hFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle); + cFunc(TEvents::TEvPoisonPill::EventType, PassAway); + } + } + STATEFN(StateSchedule) { switch (ev->GetTypeRewrite()) { hFunc(TEvEvents::TEvProxyCompleted, Handle); hFunc(TEvEvents::TEvUpdateStatement, Handle); + hFunc(NPG::TEvPGEvents::TEvCancelRequest, Handle); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); default: { if (Inflight == 0) { @@ -259,8 +317,8 @@ public: }; -NActors::IActor* CreateConnection(std::unordered_map<TString, TString> params) { - return new TPgYdbConnection(std::move(params)); +NActors::IActor* CreateConnection(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvConnectionOpened::TPtr&& event, const TConnectionState& connection) { + return new TPgYdbConnection(std::move(params), std::move(event), connection); } } diff --git a/ydb/core/local_pgwire/local_pgwire_util.h b/ydb/core/local_pgwire/local_pgwire_util.h index eb5ca48a5a..21ecf6dd88 100644 --- a/ydb/core/local_pgwire/local_pgwire_util.h +++ b/ydb/core/local_pgwire/local_pgwire_util.h @@ -27,6 +27,7 @@ struct TTransactionState { struct TConnectionState { TString SessionId; TTransactionState Transaction; + uint32_t ConnectionNum = 0; }; struct TParsedStatement { @@ -54,6 +55,7 @@ struct TEvEvents { EvProxyCompleted = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), EvUpdateStatement, EvSingleQuery, + EvCancelRequest, EvEnd }; @@ -92,6 +94,10 @@ struct TEvEvents { return std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>(); } }; + + struct TEvCancelRequest : NActors::TEventLocal<TEvCancelRequest, EvCancelRequest> { + TEvCancelRequest() = default; + }; }; TString ColumnPrimitiveValueToString(NYdb::TValueParser& valueParser); diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp index 17c2e3455d..23f9f067fc 100644 --- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp +++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp @@ -241,7 +241,7 @@ protected: void ReplyWithResponseAndPassAway() { Response_->TransactionStatus = Connection_.Transaction.Status; TBase::Send(Owner_, new TEvEvents::TEvProxyCompleted(Connection_)); - BLOG_D("Finally replying to " << EventRequest_->Sender); + BLOG_D("Finally replying to " << EventRequest_->Sender << " cookie " << EventRequest_->Cookie); TBase::Send(EventRequest_->Sender, Response_.release(), 0, EventRequest_->Cookie); TBase::PassAway(); } @@ -297,7 +297,20 @@ protected: return ReplyWithResponseAndPassAway(); } + void Handle(TEvEvents::TEvCancelRequest::TPtr&) { + auto ev = MakeHolder<NKqp::TEvKqp::TEvCancelQueryRequest>(); + if (Connection_.SessionId) { + ev->Record.MutableRequest()->SetSessionId(Connection_.SessionId); + } + BLOG_D("Sent CancelQueryRequest to kqpProxy " << ev->Record.ShortDebugString()); + TBase::Send(NKqp::MakeKqpProxyID(TBase::SelfId().NodeId()), ev.Release()); + Response_->ErrorFields.push_back({'S', "ERROR"}); + Response_->ErrorFields.push_back({'V', "ERROR"}); + Response_->ErrorFields.push_back({'C', "57014"}); + Response_->ErrorFields.push_back({'M', "Cancelling statement due to user request"}); + return ReplyWithResponseAndPassAway(); + } }; class TPgwireKqpProxyQuery : public TPgwireKqpProxy<TPgwireKqpProxyQuery, TEvEvents::TEvSingleQuery> { @@ -330,6 +343,7 @@ public: switch (ev->GetTypeRewrite()) { hFunc(NKqp::TEvKqp::TEvQueryResponse, TBase::Handle); hFunc(NKqp::TEvKqpExecuter::TEvStreamData, TBase::Handle); + hFunc(TEvEvents::TEvCancelRequest, Handle); } } }; @@ -416,6 +430,7 @@ public: STATEFN(StateWork) { switch (ev->GetTypeRewrite()) { hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); + hFunc(TEvEvents::TEvCancelRequest, TBase::Handle); } } }; @@ -460,6 +475,7 @@ public: switch (ev->GetTypeRewrite()) { hFunc(NKqp::TEvKqp::TEvQueryResponse, TBase::Handle); hFunc(NKqp::TEvKqpExecuter::TEvStreamData, TBase::Handle); + hFunc(TEvEvents::TEvCancelRequest, Handle); } } }; diff --git a/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt b/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt index eeb3907362..19821e9cb2 100644 --- a/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(protos) add_subdirectory(ut) add_library(ydb-core-pgproxy) @@ -16,6 +17,7 @@ target_link_libraries(ydb-core-pgproxy PUBLIC cpp-actors-protos cpp-string_utils-base64 ydb-core-base + core-pgproxy-protos ydb-core-protos ydb-core-raw_socket ) diff --git a/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt b/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt index e4b220cee8..bb31f4c992 100644 --- a/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt +++ b/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(protos) add_subdirectory(ut) add_library(ydb-core-pgproxy) @@ -17,6 +18,7 @@ target_link_libraries(ydb-core-pgproxy PUBLIC cpp-actors-protos cpp-string_utils-base64 ydb-core-base + core-pgproxy-protos ydb-core-protos ydb-core-raw_socket ) diff --git a/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt b/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt index e4b220cee8..bb31f4c992 100644 --- a/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt +++ b/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(protos) add_subdirectory(ut) add_library(ydb-core-pgproxy) @@ -17,6 +18,7 @@ target_link_libraries(ydb-core-pgproxy PUBLIC cpp-actors-protos cpp-string_utils-base64 ydb-core-base + core-pgproxy-protos ydb-core-protos ydb-core-raw_socket ) diff --git a/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt b/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt index eeb3907362..19821e9cb2 100644 --- a/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt +++ b/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(protos) add_subdirectory(ut) add_library(ydb-core-pgproxy) @@ -16,6 +17,7 @@ target_link_libraries(ydb-core-pgproxy PUBLIC cpp-actors-protos cpp-string_utils-base64 ydb-core-base + core-pgproxy-protos ydb-core-protos ydb-core-raw_socket ) diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp index c32782bd79..b944ea582c 100644 --- a/ydb/core/pgproxy/pg_connection.cpp +++ b/ydb/core/pgproxy/pg_connection.cpp @@ -18,6 +18,7 @@ public: TSocketAddressType Address; THPTimer InactivityTimer; static constexpr TDuration InactivityTimeout = TDuration::Minutes(10); + static constexpr uint32_t BACKEND_DATA_MASK = 0x55aa55aa; TEvPollerReady* InactivityEvent = nullptr; bool IsAuthRequired = true; bool IsSslSupported = true; @@ -33,7 +34,7 @@ public: {"DateStyle", "ISO"}, {"IntervalStyle", "postgres"}, {"integer_datetimes", "on"}, - {"server_version", "14.5 (ydb stable-23-3)"}, + {"server_version", "14.5 (ydb stable-23-4)"}, }; TSocketBuffer BufferOutput; TActorId DatabaseProxy; @@ -64,7 +65,7 @@ public: void PassAway() override { //ctx.Send(Endpoint->Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID, std::move(RecycledRequests))); if (ConnectionEstablished) { - Send(DatabaseProxy, new TEvPGEvents::TEvConnectionClosed()); + Send(DatabaseProxy, new TEvPGEvents::TEvConnectionClosed(), 0, IncomingSequenceNumber); ConnectionEstablished = false; } Send(ListenerActorId, new TEvents::TEvUnsubscribe()); @@ -153,6 +154,7 @@ protected: }; static const std::unordered_map<char, TStringBuf> outgoingMessageName = { GetMessageCode<TPGAuth>(), + GetMessageCode<TPGBackendKeyData>(), GetMessageCode<TPGErrorResponse>(), GetMessageCode<TPGDataRow>(), GetMessageCode<TPGParameterStatus>(), @@ -209,6 +211,8 @@ protected: switch (message.Message) { case TPGParameterStatus::CODE: return ((const TPGParameterStatus&)message).Dump(); + case TPGBackendKeyData::CODE: + return ((const TPGBackendKeyData&)message).Dump(); case TPGReadyForQuery::CODE: return ((const TPGReadyForQuery&)message).Dump(); case TPGCommandComplete::CODE: @@ -319,30 +323,9 @@ protected: FlushAndPoll(); } - void FinishHandshake() { - const auto& clientParams(InitialMessage->GetClientParams()); - auto itOptions = clientParams.find("options"); - if (itOptions != clientParams.end()) { - TStringBuf options(itOptions->second); - TStringBuf token; - while (options.NextTok(' ', token) && token == "-c") { - TStringBuf option; - if (options.NextTok(' ', option)) { - TStringBuf name; - TStringBuf value; - if (option.NextTok('=', name)) { - value = option; - ServerParams[TString(name)] = TString(value); - } - } - } - } - for (const auto& [name, value] : ServerParams) { - SendParameterStatus(name, value); - } - SendReadyForQuery(); - ConnectionEstablished = true; - Send(DatabaseProxy, new TEvPGEvents::TEvConnectionOpened(std::move(InitialMessage), Address)); + bool IsValidBackendData(const TPGInitial::TPGBackendData& backendData) { + Y_UNUSED(backendData); + return true; } void HandleMessage(const TPGInitial* message) { @@ -371,6 +354,10 @@ protected: } if (protocol == 0x2e16d204) { // 80877102 cancellation message BLOG_D("cancellation message"); + TPGInitial::TPGBackendData backendData = message->GetBackendData(); + if (IsValidBackendData(backendData)) { + Send(DatabaseProxy, new TEvPGEvents::TEvCancelRequest(backendData.Pid ^ BACKEND_DATA_MASK, backendData.Key ^ BACKEND_DATA_MASK)); + } CloseConnection = true; return; } @@ -384,7 +371,7 @@ protected: Send(DatabaseProxy, new TEvPGEvents::TEvAuth(InitialMessage, Address), 0, IncomingSequenceNumber++); } else { SendAuthOk(); - FinishHandshake(); + BecomeConnected(); } } @@ -495,6 +482,12 @@ protected: return false; } + void BecomeConnected() { + SyncSequenceNumber = IncomingSequenceNumber; + Send(DatabaseProxy, new TEvPGEvents::TEvConnectionOpened(InitialMessage, Address), 0, IncomingSequenceNumber++); + ConnectionEstablished = true; + } + struct TEventsComparator { bool operator ()(const TAutoPtr<IEventHandle>& ev1, const TAutoPtr<IEventHandle>& ev2) const { return ev1->Cookie < ev2->Cookie; @@ -533,7 +526,7 @@ protected: } } else { SendAuthOk(); - FinishHandshake(); + BecomeConnected(); } ++OutgoingSequenceNumber; ReplayPostponedEvents(); @@ -543,6 +536,46 @@ protected: } } + void HandleConnected(TEvPGEvents::TEvFinishHandshake::TPtr& ev) { + if (IsEventExpected(ev)) { + if (ev->Get()->ErrorFields.empty()) { + const auto& clientParams(InitialMessage->GetClientParams()); + auto itOptions = clientParams.find("options"); + if (itOptions != clientParams.end()) { + TStringBuf options(itOptions->second); + TStringBuf token; + while (options.NextTok(' ', token) && token == "-c") { + TStringBuf option; + if (options.NextTok(' ', option)) { + TStringBuf name; + TStringBuf value; + if (option.NextTok('=', name)) { + value = option; + ServerParams[TString(name)] = TString(value); + } + } + } + } + + TPGStreamOutput<TPGBackendKeyData> backendKeyData; + backendKeyData << (ev->Get()->BackendData.Pid ^ BACKEND_DATA_MASK) << (ev->Get()->BackendData.Key ^ BACKEND_DATA_MASK); + SendStream(backendKeyData); + + for (const auto& [name, value] : ServerParams) { + SendParameterStatus(name, value); + } + BecomeReadyForQuery(); + } else { + SendErrorResponse(ev->Get()->ErrorFields); + BLOG_ERROR("unable to create connection"); + CloseConnection = true; + FlushAndPoll(); + } + } else { + PostponeEvent(ev); + } + } + void HandleConnected(TEvPGEvents::TEvQueryResponse::TPtr& ev) { if (IsEventExpected(ev)) { if (ev->Get()->TransactionStatus) { @@ -864,6 +897,7 @@ protected: hFunc(TEvPollerReady, HandleConnected); hFunc(TEvPollerRegisterResult, HandleConnected); hFunc(TEvPGEvents::TEvAuthResponse, HandleConnected); + hFunc(TEvPGEvents::TEvFinishHandshake, HandleConnected); hFunc(TEvPGEvents::TEvQueryResponse, HandleConnected); hFunc(TEvPGEvents::TEvParseResponse, HandleConnected); hFunc(TEvPGEvents::TEvBindResponse, HandleConnected); diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h index b93adf48e9..e341975ae8 100644 --- a/ydb/core/pgproxy/pg_proxy_events.h +++ b/ydb/core/pgproxy/pg_proxy_events.h @@ -3,6 +3,7 @@ #include <library/cpp/actors/core/events.h> #include <library/cpp/actors/core/event_local.h> #include <ydb/core/raw_socket/sock_config.h> +#include <ydb/core/pgproxy/protos/pgproxy.pb.h> #include "pg_proxy_types.h" namespace NPG { @@ -12,6 +13,7 @@ using namespace NKikimr::NRawSocket; struct TEvPGEvents { enum EEv { EvConnectionOpened = EventSpaceBegin(NActors::TEvents::ES_PGWIRE), + EvFinishHandshake, EvConnectionClosed, EvAuth, EvAuthResponse, @@ -27,6 +29,7 @@ struct TEvPGEvents { EvExecuteResponse, EvClose, EvCloseResponse, + EvCancelRequest, EvEnd }; @@ -58,6 +61,13 @@ struct TEvPGEvents { {} }; + struct TEvFinishHandshake : NActors::TEventLocal<TEvFinishHandshake, EvFinishHandshake> { + TPGInitial::TPGBackendData BackendData; + std::vector<std::pair<char, TString>> ErrorFields; + + TEvFinishHandshake() = default; + }; + struct TEvConnectionClosed : NActors::TEventLocal<TEvConnectionClosed, EvConnectionClosed> { }; @@ -245,6 +255,16 @@ struct TEvPGEvents { return std::make_unique<TEvCloseResponse>(std::move(Message)); } }; + + struct TEvCancelRequest : NActors::TEventPB<TEvCancelRequest, NKikimrPgProxy::TEvCancelRequest, EvCancelRequest> { + + TEvCancelRequest() = default; + + TEvCancelRequest(int32_t pid, int32_t key) { + Record.SetProcessId(pid); + Record.SetSecretKey(key); + } + }; }; }
\ No newline at end of file diff --git a/ydb/core/pgproxy/pg_proxy_types.cpp b/ydb/core/pgproxy/pg_proxy_types.cpp index f5cfab74e3..22badbac64 100644 --- a/ydb/core/pgproxy/pg_proxy_types.cpp +++ b/ydb/core/pgproxy/pg_proxy_types.cpp @@ -41,6 +41,15 @@ uint32_t TPGInitial::GetProtocol() const { return protocol; } +TPGInitial::TPGBackendData TPGInitial::GetBackendData() const { + TPGBackendData backendData; + TPGStreamInput stream(*this); + uint32_t protocol = 0; + stream >> protocol; + stream >> backendData.Pid >> backendData.Key; + return backendData; +} + std::unordered_map<TString, TString> TPGInitial::GetClientParams() const { std::unordered_map<TString, TString> params; TPGStreamInput stream(*this); @@ -62,6 +71,14 @@ std::unordered_map<TString, TString> TPGInitial::GetClientParams() const { return params; } +TString TPGBackendKeyData::Dump() const { + TPGStreamInput stream(*this); + uint32_t pid = 0; + uint32_t key = 0; + stream >> pid >> key; + return TStringBuilder() << "cancellation PID " << pid << " KEY " << key; +} + TString TPGNoticeResponse::Dump() const { TPGStreamInput stream(*this); TStringBuilder text; diff --git a/ydb/core/pgproxy/pg_proxy_types.h b/ydb/core/pgproxy/pg_proxy_types.h index 7bb9b10814..4e4b41ef1d 100644 --- a/ydb/core/pgproxy/pg_proxy_types.h +++ b/ydb/core/pgproxy/pg_proxy_types.h @@ -58,6 +58,13 @@ struct TPGInitial : TPGMessageType<'i'> { // it's not true, because we don't rec TString Dump() const; std::unordered_map<TString, TString> GetClientParams() const; uint32_t GetProtocol() const; + + struct TPGBackendData { + uint32_t Pid; + uint32_t Key; + }; + + TPGBackendData GetBackendData() const; }; struct TPGAuth : TPGMessageType<'R'> { @@ -118,6 +125,10 @@ struct TPGParameterStatus : TPGMessageType<'S'> { } }; +struct TPGBackendKeyData : TPGMessageType<'K'> { + TString Dump() const; +}; + struct TPGSync : TPGMessageType<'S'> { }; diff --git a/ydb/core/pgproxy/pg_proxy_ut.cpp b/ydb/core/pgproxy/pg_proxy_ut.cpp index 2ff8853ad6..336d3bd352 100644 --- a/ydb/core/pgproxy/pg_proxy_ut.cpp +++ b/ydb/core/pgproxy/pg_proxy_ut.cpp @@ -73,7 +73,7 @@ Y_UNIT_TEST_SUITE(TPGTest) { UNIT_ASSERT_VALUES_EQUAL(authRequest->InitialMessage->GetClientParams()["user"], "user"); actorSystem.Send(new NActors::IEventHandle(handle->Sender, database, new NPG::TEvPGEvents::TEvAuthResponse())); TString received = Receive(s); - UNIT_ASSERT_VALUES_EQUAL(received, "520000000800000000530000002A7365727665725F76657273696F6E0031342E35202879646220737461626C652D32332D332900530000001B496E74657276616C5374796C6500706F737467726573005300000012446174655374796C650049534F005300000019636C69656E745F656E636F64696E6700555446380053000000197365727665725F656E636F64696E670055544638005300000019696E74656765725F6461746574696D6573006F6E005A0000000549"); + UNIT_ASSERT_VALUES_EQUAL(received, "520000000800000000"); } } diff --git a/ydb/core/pgproxy/protos/CMakeLists.darwin-x86_64.txt b/ydb/core/pgproxy/protos/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..d186da4612 --- /dev/null +++ b/ydb/core/pgproxy/protos/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,43 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) + +add_library(core-pgproxy-protos) +target_link_libraries(core-pgproxy-protos PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf +) +target_proto_messages(core-pgproxy-protos PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/protos/pgproxy.proto +) +target_proto_addincls(core-pgproxy-protos + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(core-pgproxy-protos + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) diff --git a/ydb/core/pgproxy/protos/CMakeLists.linux-aarch64.txt b/ydb/core/pgproxy/protos/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..71c3d8adf5 --- /dev/null +++ b/ydb/core/pgproxy/protos/CMakeLists.linux-aarch64.txt @@ -0,0 +1,44 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) + +add_library(core-pgproxy-protos) +target_link_libraries(core-pgproxy-protos PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf +) +target_proto_messages(core-pgproxy-protos PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/protos/pgproxy.proto +) +target_proto_addincls(core-pgproxy-protos + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(core-pgproxy-protos + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) diff --git a/ydb/core/pgproxy/protos/CMakeLists.linux-x86_64.txt b/ydb/core/pgproxy/protos/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..71c3d8adf5 --- /dev/null +++ b/ydb/core/pgproxy/protos/CMakeLists.linux-x86_64.txt @@ -0,0 +1,44 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) + +add_library(core-pgproxy-protos) +target_link_libraries(core-pgproxy-protos PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf +) +target_proto_messages(core-pgproxy-protos PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/protos/pgproxy.proto +) +target_proto_addincls(core-pgproxy-protos + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(core-pgproxy-protos + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) diff --git a/ydb/core/pgproxy/protos/CMakeLists.txt b/ydb/core/pgproxy/protos/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/pgproxy/protos/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/pgproxy/protos/CMakeLists.windows-x86_64.txt b/ydb/core/pgproxy/protos/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..d186da4612 --- /dev/null +++ b/ydb/core/pgproxy/protos/CMakeLists.windows-x86_64.txt @@ -0,0 +1,43 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) + +add_library(core-pgproxy-protos) +target_link_libraries(core-pgproxy-protos PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf +) +target_proto_messages(core-pgproxy-protos PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/protos/pgproxy.proto +) +target_proto_addincls(core-pgproxy-protos + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(core-pgproxy-protos + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) diff --git a/ydb/core/pgproxy/protos/pgproxy.proto b/ydb/core/pgproxy/protos/pgproxy.proto new file mode 100644 index 0000000000..93506915aa --- /dev/null +++ b/ydb/core/pgproxy/protos/pgproxy.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +package NKikimrPgProxy; + +message TEvCancelRequest { + int32 ProcessId = 1; + int32 SecretKey = 2; +} diff --git a/ydb/core/pgproxy/protos/ya.make b/ydb/core/pgproxy/protos/ya.make new file mode 100644 index 0000000000..c5a83d6432 --- /dev/null +++ b/ydb/core/pgproxy/protos/ya.make @@ -0,0 +1,9 @@ +PROTO_LIBRARY() + +SRCS( + pgproxy.proto +) + +EXCLUDE_TAGS(GO_PROTO) + +END() diff --git a/ydb/core/pgproxy/ya.make b/ydb/core/pgproxy/ya.make index dc3387ea0e..eef8fed164 100644 --- a/ydb/core/pgproxy/ya.make +++ b/ydb/core/pgproxy/ya.make @@ -19,6 +19,7 @@ PEERDIR( library/cpp/actors/protos library/cpp/string_utils/base64 ydb/core/base + ydb/core/pgproxy/protos ydb/core/protos ydb/core/raw_socket ) |