aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorxenoxeno <xeno@ydb.tech>2023-10-31 21:05:43 +0300
committerxenoxeno <xeno@ydb.tech>2023-10-31 21:22:55 +0300
commit5e44523f6b7dcb158f3373a269c0495e1cadf71f (patch)
tree991b12d002609ea1da003cb93377c8b362cb238c
parent9869642e375e60baf2debe0bb01c63322878c79f (diff)
downloadydb-5e44523f6b7dcb158f3373a269c0495e1cadf71f.tar.gz
pg cancel request
-rw-r--r--.mapping.json5
-rw-r--r--ydb/apps/pgwire/pg_ydb_proxy.cpp1
-rw-r--r--ydb/core/local_pgwire/local_pgwire.cpp92
-rw-r--r--ydb/core/local_pgwire/local_pgwire.h2
-rw-r--r--ydb/core/local_pgwire/local_pgwire_connection.cpp76
-rw-r--r--ydb/core/local_pgwire/local_pgwire_util.h6
-rw-r--r--ydb/core/local_pgwire/pgwire_kqp_proxy.cpp18
-rw-r--r--ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/pgproxy/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/pgproxy/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/pgproxy/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/pgproxy/pg_connection.cpp90
-rw-r--r--ydb/core/pgproxy/pg_proxy_events.h20
-rw-r--r--ydb/core/pgproxy/pg_proxy_types.cpp17
-rw-r--r--ydb/core/pgproxy/pg_proxy_types.h11
-rw-r--r--ydb/core/pgproxy/pg_proxy_ut.cpp2
-rw-r--r--ydb/core/pgproxy/protos/CMakeLists.darwin-x86_64.txt43
-rw-r--r--ydb/core/pgproxy/protos/CMakeLists.linux-aarch64.txt44
-rw-r--r--ydb/core/pgproxy/protos/CMakeLists.linux-x86_64.txt44
-rw-r--r--ydb/core/pgproxy/protos/CMakeLists.txt17
-rw-r--r--ydb/core/pgproxy/protos/CMakeLists.windows-x86_64.txt43
-rw-r--r--ydb/core/pgproxy/protos/pgproxy.proto8
-rw-r--r--ydb/core/pgproxy/protos/ya.make9
-rw-r--r--ydb/core/pgproxy/ya.make1
24 files changed, 485 insertions, 72 deletions
diff --git a/.mapping.json b/.mapping.json
index e303f11a27..d926e9aa61 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -4989,6 +4989,11 @@
"ydb/core/pgproxy/CMakeLists.linux-x86_64.txt":"",
"ydb/core/pgproxy/CMakeLists.txt":"",
"ydb/core/pgproxy/CMakeLists.windows-x86_64.txt":"",
+ "ydb/core/pgproxy/protos/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/core/pgproxy/protos/CMakeLists.linux-aarch64.txt":"",
+ "ydb/core/pgproxy/protos/CMakeLists.linux-x86_64.txt":"",
+ "ydb/core/pgproxy/protos/CMakeLists.txt":"",
+ "ydb/core/pgproxy/protos/CMakeLists.windows-x86_64.txt":"",
"ydb/core/pgproxy/ut/CMakeLists.darwin-x86_64.txt":"",
"ydb/core/pgproxy/ut/CMakeLists.linux-aarch64.txt":"",
"ydb/core/pgproxy/ut/CMakeLists.linux-x86_64.txt":"",
diff --git a/ydb/apps/pgwire/pg_ydb_proxy.cpp b/ydb/apps/pgwire/pg_ydb_proxy.cpp
index 70ad4a3fd0..f8b9baf8e3 100644
--- a/ydb/apps/pgwire/pg_ydb_proxy.cpp
+++ b/ydb/apps/pgwire/pg_ydb_proxy.cpp
@@ -93,6 +93,7 @@ public:
TActorId actorId = Register(actor);
PgToYdbConnection[ev->Sender] = actorId;
BLOG_D("Created ydb connection " << actorId);
+ Send(ev->Sender, new NPG::TEvPGEvents::TEvFinishHandshake(), 0, ev->Cookie);
}
void Handle(NPG::TEvPGEvents::TEvConnectionClosed::TPtr& ev) {
diff --git a/ydb/core/local_pgwire/local_pgwire.cpp b/ydb/core/local_pgwire/local_pgwire.cpp
index 5c26127a63..d8ffcf8aed 100644
--- a/ydb/core/local_pgwire/local_pgwire.cpp
+++ b/ydb/core/local_pgwire/local_pgwire.cpp
@@ -1,4 +1,6 @@
#include "log_impl.h"
+#include "local_pgwire.h"
+#include "local_pgwire_util.h"
#include <ydb/core/pgproxy/pg_proxy_events.h>
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
#include <ydb/public/api/grpc/ydb_auth_v1.grpc.pb.h>
@@ -10,7 +12,7 @@ namespace NLocalPgWire {
using namespace NActors;
using namespace NKikimr;
-extern IActor* CreateConnection(std::unordered_map<TString, TString> params);
+extern IActor* CreateConnection(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvConnectionOpened::TPtr&& event, const TConnectionState& connection);
class TPgYdbProxy : public TActor<TPgYdbProxy> {
using TBase = TActor<TPgYdbProxy>;
@@ -45,9 +47,15 @@ class TPgYdbProxy : public TActor<TPgYdbProxy> {
};
};
- std::unordered_map<TActorId, TActorId> PgToYdbConnection;
+ struct TConnectionState {
+ TActorId YdbConnection;
+ uint32_t ConnectionNum;
+ };
+
+ std::unordered_map<TActorId, TConnectionState> ConnectionState;
std::unordered_map<TActorId, TSecurityState> SecurityState;
std::unordered_map<TString, TTokenState> TokenState;
+ uint32_t ConnectionNum = 0;
public:
TPgYdbProxy()
@@ -97,7 +105,7 @@ public:
void Handle(NPG::TEvPGEvents::TEvAuth::TPtr& ev) {
std::unordered_map<TString, TString> clientParams = ev->Get()->InitialMessage->GetClientParams();
- BLOG_D("TEvAuth " << ev->Get()->InitialMessage->Dump());
+ BLOG_D("TEvAuth " << ev->Get()->InitialMessage->Dump() << " cookie " << ev->Cookie);
Ydb::Auth::LoginRequest request;
request.set_user(clientParams["user"]);
if (ev->Get()->PasswordMessage) {
@@ -132,7 +140,7 @@ public:
}
void Handle(NPG::TEvPGEvents::TEvConnectionOpened::TPtr& ev) {
- BLOG_D("TEvConnectionOpened " << ev->Sender);
+ BLOG_D("TEvConnectionOpened " << ev->Sender << " cookie " << ev->Cookie);
auto params = ev->Get()->Message->GetClientParams();
auto itSecurityState = SecurityState.find(ev->Sender);
if (itSecurityState != SecurityState.end()) {
@@ -143,64 +151,83 @@ public:
params["ydb-serialized-token"] = itSecurityState->second.SerializedToken;
}
}
- IActor* actor = CreateConnection(std::move(params));
+ auto& connectionState = ConnectionState[ev->Sender];
+ connectionState.ConnectionNum = ++ConnectionNum;
+ IActor* actor = CreateConnection(std::move(params), std::move(ev), {.ConnectionNum = connectionState.ConnectionNum});
TActorId actorId = Register(actor);
- PgToYdbConnection[ev->Sender] = actorId;
- BLOG_D("Created ydb connection " << actorId);
+ connectionState.YdbConnection = actorId;
+ BLOG_D("Created ydb connection " << actorId << " num " << connectionState.ConnectionNum);
}
void Handle(NPG::TEvPGEvents::TEvConnectionClosed::TPtr& ev) {
- BLOG_D("TEvConnectionClosed " << ev->Sender);
- auto itConnection = PgToYdbConnection.find(ev->Sender);
- if (itConnection != PgToYdbConnection.end()) {
- Send(itConnection->second, new TEvents::TEvPoisonPill());
- BLOG_D("Destroyed ydb connection " << itConnection->second);
+ BLOG_D("TEvConnectionClosed " << ev->Sender << " cookie " << ev->Cookie);
+ auto itConnection = ConnectionState.find(ev->Sender);
+ if (itConnection != ConnectionState.end()) {
+ Send(itConnection->second.YdbConnection, new TEvents::TEvPoisonPill());
+ BLOG_D("Destroyed ydb connection " << itConnection->second.YdbConnection << " num " << itConnection->second.ConnectionNum);
}
SecurityState.erase(ev->Sender);
+ ConnectionState.erase(itConnection);
// TODO: cleanup TokenState too
}
void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) {
- auto itConnection = PgToYdbConnection.find(ev->Sender);
- if (itConnection != PgToYdbConnection.end()) {
- Forward(ev, itConnection->second);
+ auto itConnection = ConnectionState.find(ev->Sender);
+ if (itConnection != ConnectionState.end()) {
+ Forward(ev, itConnection->second.YdbConnection);
}
}
void Handle(NPG::TEvPGEvents::TEvParse::TPtr& ev) {
- auto itConnection = PgToYdbConnection.find(ev->Sender);
- if (itConnection != PgToYdbConnection.end()) {
- Forward(ev, itConnection->second);
+ auto itConnection = ConnectionState.find(ev->Sender);
+ if (itConnection != ConnectionState.end()) {
+ Forward(ev, itConnection->second.YdbConnection);
}
}
void Handle(NPG::TEvPGEvents::TEvBind::TPtr& ev) {
- auto itConnection = PgToYdbConnection.find(ev->Sender);
- if (itConnection != PgToYdbConnection.end()) {
- Forward(ev, itConnection->second);
+ auto itConnection = ConnectionState.find(ev->Sender);
+ if (itConnection != ConnectionState.end()) {
+ Forward(ev, itConnection->second.YdbConnection);
}
}
void Handle(NPG::TEvPGEvents::TEvDescribe::TPtr& ev) {
- auto itConnection = PgToYdbConnection.find(ev->Sender);
- if (itConnection != PgToYdbConnection.end()) {
- Forward(ev, itConnection->second);
+ auto itConnection = ConnectionState.find(ev->Sender);
+ if (itConnection != ConnectionState.end()) {
+ Forward(ev, itConnection->second.YdbConnection);
}
}
void Handle(NPG::TEvPGEvents::TEvExecute::TPtr& ev) {
- auto itConnection = PgToYdbConnection.find(ev->Sender);
- if (itConnection != PgToYdbConnection.end()) {
- Forward(ev, itConnection->second);
- BLOG_D("Forwarded to ydb connection " << itConnection->second);
+ auto itConnection = ConnectionState.find(ev->Sender);
+ if (itConnection != ConnectionState.end()) {
+ Forward(ev, itConnection->second.YdbConnection);
}
}
void Handle(NPG::TEvPGEvents::TEvClose::TPtr& ev) {
- auto itConnection = PgToYdbConnection.find(ev->Sender);
- if (itConnection != PgToYdbConnection.end()) {
- Forward(ev, itConnection->second);
- BLOG_D("Forwarded to ydb connection " << itConnection->second);
+ auto itConnection = ConnectionState.find(ev->Sender);
+ if (itConnection != ConnectionState.end()) {
+ Forward(ev, itConnection->second.YdbConnection);
+ }
+ }
+
+ void Handle(NPG::TEvPGEvents::TEvCancelRequest::TPtr& ev) {
+ uint32_t nodeId = ev->Get()->Record.GetProcessId();
+ if (nodeId == SelfId().NodeId()) {
+ uint32_t connectionNum = ev->Get()->Record.GetSecretKey();
+ for (const auto& [pgConnectionId, connectionState] : ConnectionState) {
+ if (connectionState.ConnectionNum == connectionNum) {
+ BLOG_D("Cancelling ConnectionNum " << connectionNum);
+ Forward(ev, connectionState.YdbConnection);
+ return;
+ }
+ }
+ BLOG_W("Cancelling ConnectionNum " << connectionNum << " - connection not found");
+ } else {
+ BLOG_D("Forwarding TEvCancelRequest to Node " << nodeId);
+ Forward(ev, CreateLocalPgWireProxyId(nodeId));
}
}
@@ -215,6 +242,7 @@ public:
hFunc(NPG::TEvPGEvents::TEvDescribe, Handle);
hFunc(NPG::TEvPGEvents::TEvExecute, Handle);
hFunc(NPG::TEvPGEvents::TEvClose, Handle);
+ hFunc(NPG::TEvPGEvents::TEvCancelRequest, Handle);
hFunc(TEvPrivate::TEvTokenReady, Handle);
hFunc(TEvTicketParser::TEvAuthorizeTicketResult, Handle);
}
diff --git a/ydb/core/local_pgwire/local_pgwire.h b/ydb/core/local_pgwire/local_pgwire.h
index b66a3d9fd2..125a373947 100644
--- a/ydb/core/local_pgwire/local_pgwire.h
+++ b/ydb/core/local_pgwire/local_pgwire.h
@@ -2,7 +2,7 @@
namespace NLocalPgWire {
-inline NActors::TActorId CreateLocalPgWireProxyId() { return NActors::TActorId(0, "localpgwire"); }
+inline NActors::TActorId CreateLocalPgWireProxyId(uint32_t nodeId = 0) { return NActors::TActorId(nodeId, "localpgwire"); }
NActors::IActor* CreateLocalPgWireProxy();
}
diff --git a/ydb/core/local_pgwire/local_pgwire_connection.cpp b/ydb/core/local_pgwire/local_pgwire_connection.cpp
index 85f43e74ec..c986e08657 100644
--- a/ydb/core/local_pgwire/local_pgwire_connection.cpp
+++ b/ydb/core/local_pgwire/local_pgwire_connection.cpp
@@ -22,22 +22,62 @@ namespace NLocalPgWire {
using namespace NActors;
using namespace NKikimr;
-class TPgYdbConnection : public TActor<TPgYdbConnection> {
- using TBase = TActor<TPgYdbConnection>;
+class TPgYdbConnection : public TActorBootstrapped<TPgYdbConnection> {
+ using TBase = TActorBootstrapped<TPgYdbConnection>;
std::unordered_map<TString, TString> ConnectionParams;
+ NPG::TEvPGEvents::TEvConnectionOpened::TPtr ConnectionEvent;
std::unordered_map<TString, TParsedStatement> ParsedStatements;
std::unordered_map<TString, TPortal> Portals;
TConnectionState Connection;
std::deque<TAutoPtr<IEventHandle>> Events;
ui32 Inflight = 0;
+ std::unordered_set<TActorId> CurrentRunningQueries;
public:
- TPgYdbConnection(std::unordered_map<TString, TString> params)
- : TActor<TPgYdbConnection>(&TPgYdbConnection::StateSchedule)
- , ConnectionParams(std::move(params))
+ TPgYdbConnection(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvConnectionOpened::TPtr&& event, const TConnectionState& connection)
+ : ConnectionParams(std::move(params))
+ , ConnectionEvent(std::move(event))
+ , Connection(connection)
{}
+ void Bootstrap() {
+ TString database;
+ if (ConnectionParams.count("database")) {
+ database = ConnectionParams["database"];
+ }
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvCreateSessionRequest>();
+ NKikimrKqp::TCreateSessionRequest& request = *ev->Record.MutableRequest();
+ request.SetDatabase(database);
+ BLOG_D("Sent CreateSessionRequest to kqpProxy " << ev->Record.ShortDebugString());
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), ev.Release());
+ TBase::Become(&TPgYdbConnection::StateCreateSession);
+ }
+
+ void Handle(NKqp::TEvKqp::TEvCreateSessionResponse::TPtr& ev) {
+ const auto& record(ev->Get()->Record);
+ BLOG_D("Received TEvCreateSessionResponse " << record.ShortDebugString());
+ if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
+ BLOG_D("Session id is " << record.GetResponse().GetSessionId());
+ Connection.SessionId = record.GetResponse().GetSessionId();
+
+ auto response = MakeHolder<NPG::TEvPGEvents::TEvFinishHandshake>();
+ response->BackendData.Pid = SelfId().NodeId();
+ response->BackendData.Key = Connection.ConnectionNum;
+ Send(ConnectionEvent->Sender, response.Release(), 0, ev->Cookie);
+ TBase::Become(&TPgYdbConnection::StateSchedule);
+ ConnectionEvent.Destroy(); // don't need it anymore
+ } else {
+ BLOG_W("Failed to create session: " << record.ShortDebugString());
+ auto response = MakeHolder<NPG::TEvPGEvents::TEvFinishHandshake>();
+ // TODO: report actuall error
+ response->ErrorFields.push_back({'E', "ERROR"});
+ response->ErrorFields.push_back({'M', record.GetError()});
+ Send(ConnectionEvent->Sender, response.Release(), 0, ev->Cookie);
+ return PassAway();
+ }
+ }
+
void ProcessEventsQueue() {
while (!Events.empty() && Inflight == 0) {
StateWork(Events.front());
@@ -58,6 +98,7 @@ public:
++Inflight;
TActorId actorId = RegisterWithSameMailbox(CreatePgwireKqpProxyQuery(SelfId(), ConnectionParams, Connection, std::move(ev)));
BLOG_D("Created pgwireKqpProxyQuery: " << actorId);
+ CurrentRunningQueries.insert(actorId);
}
void Handle(NPG::TEvPGEvents::TEvQuery::TPtr& ev) {
@@ -85,7 +126,7 @@ public:
++Inflight;
TActorId actorId = RegisterWithSameMailbox(CreatePgwireKqpProxyParse(SelfId(), ConnectionParams, Connection, std::move(ev)));
BLOG_D("Created pgwireKqpProxyParse: " << actorId);
- return;
+ CurrentRunningQueries.insert(actorId);
}
void Handle(NPG::TEvPGEvents::TEvBind::TPtr& ev) {
@@ -183,6 +224,7 @@ public:
++Inflight;
TActorId actorId = RegisterWithSameMailbox(CreatePgwireKqpProxyExecute(SelfId(), ConnectionParams, Connection, std::move(ev), it->second));
BLOG_D("Created pgwireKqpProxyExecute: " << actorId);
+ CurrentRunningQueries.insert(actorId);
}
void Handle(TEvEvents::TEvUpdateStatement::TPtr& ev) {
@@ -216,23 +258,39 @@ public:
BLOG_D("Session id is " << connection.SessionId);
Connection.SessionId = connection.SessionId;
}
+ CurrentRunningQueries.erase(ev->Sender);
ProcessEventsQueue();
}
+ void Handle(NPG::TEvPGEvents::TEvCancelRequest::TPtr&) {
+ BLOG_D("Received TEvCancelRequest");
+ for (const TActorId& actor : CurrentRunningQueries) {
+ Send(actor, new TEvEvents::TEvCancelRequest());
+ }
+ }
+
void PassAway() override {
if (Connection.SessionId) {
- BLOG_D("Closing session " << Connection.SessionId);
auto ev = MakeHolder<NKqp::TEvKqp::TEvCloseSessionRequest>();
ev->Record.MutableRequest()->SetSessionId(Connection.SessionId);
+ BLOG_D("Closing session " << Connection.SessionId << ", sent event to kqpProxy " << ev->Record.ShortDebugString());
Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), ev.Release());
}
TBase::PassAway();
}
+ STATEFN(StateCreateSession) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NKqp::TEvKqp::TEvCreateSessionResponse, Handle);
+ cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
+ }
+ }
+
STATEFN(StateSchedule) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvEvents::TEvProxyCompleted, Handle);
hFunc(TEvEvents::TEvUpdateStatement, Handle);
+ hFunc(NPG::TEvPGEvents::TEvCancelRequest, Handle);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
default: {
if (Inflight == 0) {
@@ -259,8 +317,8 @@ public:
};
-NActors::IActor* CreateConnection(std::unordered_map<TString, TString> params) {
- return new TPgYdbConnection(std::move(params));
+NActors::IActor* CreateConnection(std::unordered_map<TString, TString> params, NPG::TEvPGEvents::TEvConnectionOpened::TPtr&& event, const TConnectionState& connection) {
+ return new TPgYdbConnection(std::move(params), std::move(event), connection);
}
}
diff --git a/ydb/core/local_pgwire/local_pgwire_util.h b/ydb/core/local_pgwire/local_pgwire_util.h
index eb5ca48a5a..21ecf6dd88 100644
--- a/ydb/core/local_pgwire/local_pgwire_util.h
+++ b/ydb/core/local_pgwire/local_pgwire_util.h
@@ -27,6 +27,7 @@ struct TTransactionState {
struct TConnectionState {
TString SessionId;
TTransactionState Transaction;
+ uint32_t ConnectionNum = 0;
};
struct TParsedStatement {
@@ -54,6 +55,7 @@ struct TEvEvents {
EvProxyCompleted = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
EvUpdateStatement,
EvSingleQuery,
+ EvCancelRequest,
EvEnd
};
@@ -92,6 +94,10 @@ struct TEvEvents {
return std::make_unique<NPG::TEvPGEvents::TEvQueryResponse>();
}
};
+
+ struct TEvCancelRequest : NActors::TEventLocal<TEvCancelRequest, EvCancelRequest> {
+ TEvCancelRequest() = default;
+ };
};
TString ColumnPrimitiveValueToString(NYdb::TValueParser& valueParser);
diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
index 17c2e3455d..23f9f067fc 100644
--- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
+++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
@@ -241,7 +241,7 @@ protected:
void ReplyWithResponseAndPassAway() {
Response_->TransactionStatus = Connection_.Transaction.Status;
TBase::Send(Owner_, new TEvEvents::TEvProxyCompleted(Connection_));
- BLOG_D("Finally replying to " << EventRequest_->Sender);
+ BLOG_D("Finally replying to " << EventRequest_->Sender << " cookie " << EventRequest_->Cookie);
TBase::Send(EventRequest_->Sender, Response_.release(), 0, EventRequest_->Cookie);
TBase::PassAway();
}
@@ -297,7 +297,20 @@ protected:
return ReplyWithResponseAndPassAway();
}
+ void Handle(TEvEvents::TEvCancelRequest::TPtr&) {
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvCancelQueryRequest>();
+ if (Connection_.SessionId) {
+ ev->Record.MutableRequest()->SetSessionId(Connection_.SessionId);
+ }
+ BLOG_D("Sent CancelQueryRequest to kqpProxy " << ev->Record.ShortDebugString());
+ TBase::Send(NKqp::MakeKqpProxyID(TBase::SelfId().NodeId()), ev.Release());
+ Response_->ErrorFields.push_back({'S', "ERROR"});
+ Response_->ErrorFields.push_back({'V', "ERROR"});
+ Response_->ErrorFields.push_back({'C', "57014"});
+ Response_->ErrorFields.push_back({'M', "Cancelling statement due to user request"});
+ return ReplyWithResponseAndPassAway();
+ }
};
class TPgwireKqpProxyQuery : public TPgwireKqpProxy<TPgwireKqpProxyQuery, TEvEvents::TEvSingleQuery> {
@@ -330,6 +343,7 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(NKqp::TEvKqp::TEvQueryResponse, TBase::Handle);
hFunc(NKqp::TEvKqpExecuter::TEvStreamData, TBase::Handle);
+ hFunc(TEvEvents::TEvCancelRequest, Handle);
}
}
};
@@ -416,6 +430,7 @@ public:
STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
+ hFunc(TEvEvents::TEvCancelRequest, TBase::Handle);
}
}
};
@@ -460,6 +475,7 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(NKqp::TEvKqp::TEvQueryResponse, TBase::Handle);
hFunc(NKqp::TEvKqpExecuter::TEvStreamData, TBase::Handle);
+ hFunc(TEvEvents::TEvCancelRequest, Handle);
}
}
};
diff --git a/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt b/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt
index eeb3907362..19821e9cb2 100644
--- a/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/pgproxy/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(protos)
add_subdirectory(ut)
add_library(ydb-core-pgproxy)
@@ -16,6 +17,7 @@ target_link_libraries(ydb-core-pgproxy PUBLIC
cpp-actors-protos
cpp-string_utils-base64
ydb-core-base
+ core-pgproxy-protos
ydb-core-protos
ydb-core-raw_socket
)
diff --git a/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt b/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt
index e4b220cee8..bb31f4c992 100644
--- a/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/pgproxy/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(protos)
add_subdirectory(ut)
add_library(ydb-core-pgproxy)
@@ -17,6 +18,7 @@ target_link_libraries(ydb-core-pgproxy PUBLIC
cpp-actors-protos
cpp-string_utils-base64
ydb-core-base
+ core-pgproxy-protos
ydb-core-protos
ydb-core-raw_socket
)
diff --git a/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt b/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt
index e4b220cee8..bb31f4c992 100644
--- a/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/pgproxy/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(protos)
add_subdirectory(ut)
add_library(ydb-core-pgproxy)
@@ -17,6 +18,7 @@ target_link_libraries(ydb-core-pgproxy PUBLIC
cpp-actors-protos
cpp-string_utils-base64
ydb-core-base
+ core-pgproxy-protos
ydb-core-protos
ydb-core-raw_socket
)
diff --git a/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt b/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt
index eeb3907362..19821e9cb2 100644
--- a/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/pgproxy/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(protos)
add_subdirectory(ut)
add_library(ydb-core-pgproxy)
@@ -16,6 +17,7 @@ target_link_libraries(ydb-core-pgproxy PUBLIC
cpp-actors-protos
cpp-string_utils-base64
ydb-core-base
+ core-pgproxy-protos
ydb-core-protos
ydb-core-raw_socket
)
diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp
index c32782bd79..b944ea582c 100644
--- a/ydb/core/pgproxy/pg_connection.cpp
+++ b/ydb/core/pgproxy/pg_connection.cpp
@@ -18,6 +18,7 @@ public:
TSocketAddressType Address;
THPTimer InactivityTimer;
static constexpr TDuration InactivityTimeout = TDuration::Minutes(10);
+ static constexpr uint32_t BACKEND_DATA_MASK = 0x55aa55aa;
TEvPollerReady* InactivityEvent = nullptr;
bool IsAuthRequired = true;
bool IsSslSupported = true;
@@ -33,7 +34,7 @@ public:
{"DateStyle", "ISO"},
{"IntervalStyle", "postgres"},
{"integer_datetimes", "on"},
- {"server_version", "14.5 (ydb stable-23-3)"},
+ {"server_version", "14.5 (ydb stable-23-4)"},
};
TSocketBuffer BufferOutput;
TActorId DatabaseProxy;
@@ -64,7 +65,7 @@ public:
void PassAway() override {
//ctx.Send(Endpoint->Owner, new TEvHttpProxy::TEvHttpConnectionClosed(ctx.SelfID, std::move(RecycledRequests)));
if (ConnectionEstablished) {
- Send(DatabaseProxy, new TEvPGEvents::TEvConnectionClosed());
+ Send(DatabaseProxy, new TEvPGEvents::TEvConnectionClosed(), 0, IncomingSequenceNumber);
ConnectionEstablished = false;
}
Send(ListenerActorId, new TEvents::TEvUnsubscribe());
@@ -153,6 +154,7 @@ protected:
};
static const std::unordered_map<char, TStringBuf> outgoingMessageName = {
GetMessageCode<TPGAuth>(),
+ GetMessageCode<TPGBackendKeyData>(),
GetMessageCode<TPGErrorResponse>(),
GetMessageCode<TPGDataRow>(),
GetMessageCode<TPGParameterStatus>(),
@@ -209,6 +211,8 @@ protected:
switch (message.Message) {
case TPGParameterStatus::CODE:
return ((const TPGParameterStatus&)message).Dump();
+ case TPGBackendKeyData::CODE:
+ return ((const TPGBackendKeyData&)message).Dump();
case TPGReadyForQuery::CODE:
return ((const TPGReadyForQuery&)message).Dump();
case TPGCommandComplete::CODE:
@@ -319,30 +323,9 @@ protected:
FlushAndPoll();
}
- void FinishHandshake() {
- const auto& clientParams(InitialMessage->GetClientParams());
- auto itOptions = clientParams.find("options");
- if (itOptions != clientParams.end()) {
- TStringBuf options(itOptions->second);
- TStringBuf token;
- while (options.NextTok(' ', token) && token == "-c") {
- TStringBuf option;
- if (options.NextTok(' ', option)) {
- TStringBuf name;
- TStringBuf value;
- if (option.NextTok('=', name)) {
- value = option;
- ServerParams[TString(name)] = TString(value);
- }
- }
- }
- }
- for (const auto& [name, value] : ServerParams) {
- SendParameterStatus(name, value);
- }
- SendReadyForQuery();
- ConnectionEstablished = true;
- Send(DatabaseProxy, new TEvPGEvents::TEvConnectionOpened(std::move(InitialMessage), Address));
+ bool IsValidBackendData(const TPGInitial::TPGBackendData& backendData) {
+ Y_UNUSED(backendData);
+ return true;
}
void HandleMessage(const TPGInitial* message) {
@@ -371,6 +354,10 @@ protected:
}
if (protocol == 0x2e16d204) { // 80877102 cancellation message
BLOG_D("cancellation message");
+ TPGInitial::TPGBackendData backendData = message->GetBackendData();
+ if (IsValidBackendData(backendData)) {
+ Send(DatabaseProxy, new TEvPGEvents::TEvCancelRequest(backendData.Pid ^ BACKEND_DATA_MASK, backendData.Key ^ BACKEND_DATA_MASK));
+ }
CloseConnection = true;
return;
}
@@ -384,7 +371,7 @@ protected:
Send(DatabaseProxy, new TEvPGEvents::TEvAuth(InitialMessage, Address), 0, IncomingSequenceNumber++);
} else {
SendAuthOk();
- FinishHandshake();
+ BecomeConnected();
}
}
@@ -495,6 +482,12 @@ protected:
return false;
}
+ void BecomeConnected() {
+ SyncSequenceNumber = IncomingSequenceNumber;
+ Send(DatabaseProxy, new TEvPGEvents::TEvConnectionOpened(InitialMessage, Address), 0, IncomingSequenceNumber++);
+ ConnectionEstablished = true;
+ }
+
struct TEventsComparator {
bool operator ()(const TAutoPtr<IEventHandle>& ev1, const TAutoPtr<IEventHandle>& ev2) const {
return ev1->Cookie < ev2->Cookie;
@@ -533,7 +526,7 @@ protected:
}
} else {
SendAuthOk();
- FinishHandshake();
+ BecomeConnected();
}
++OutgoingSequenceNumber;
ReplayPostponedEvents();
@@ -543,6 +536,46 @@ protected:
}
}
+ void HandleConnected(TEvPGEvents::TEvFinishHandshake::TPtr& ev) {
+ if (IsEventExpected(ev)) {
+ if (ev->Get()->ErrorFields.empty()) {
+ const auto& clientParams(InitialMessage->GetClientParams());
+ auto itOptions = clientParams.find("options");
+ if (itOptions != clientParams.end()) {
+ TStringBuf options(itOptions->second);
+ TStringBuf token;
+ while (options.NextTok(' ', token) && token == "-c") {
+ TStringBuf option;
+ if (options.NextTok(' ', option)) {
+ TStringBuf name;
+ TStringBuf value;
+ if (option.NextTok('=', name)) {
+ value = option;
+ ServerParams[TString(name)] = TString(value);
+ }
+ }
+ }
+ }
+
+ TPGStreamOutput<TPGBackendKeyData> backendKeyData;
+ backendKeyData << (ev->Get()->BackendData.Pid ^ BACKEND_DATA_MASK) << (ev->Get()->BackendData.Key ^ BACKEND_DATA_MASK);
+ SendStream(backendKeyData);
+
+ for (const auto& [name, value] : ServerParams) {
+ SendParameterStatus(name, value);
+ }
+ BecomeReadyForQuery();
+ } else {
+ SendErrorResponse(ev->Get()->ErrorFields);
+ BLOG_ERROR("unable to create connection");
+ CloseConnection = true;
+ FlushAndPoll();
+ }
+ } else {
+ PostponeEvent(ev);
+ }
+ }
+
void HandleConnected(TEvPGEvents::TEvQueryResponse::TPtr& ev) {
if (IsEventExpected(ev)) {
if (ev->Get()->TransactionStatus) {
@@ -864,6 +897,7 @@ protected:
hFunc(TEvPollerReady, HandleConnected);
hFunc(TEvPollerRegisterResult, HandleConnected);
hFunc(TEvPGEvents::TEvAuthResponse, HandleConnected);
+ hFunc(TEvPGEvents::TEvFinishHandshake, HandleConnected);
hFunc(TEvPGEvents::TEvQueryResponse, HandleConnected);
hFunc(TEvPGEvents::TEvParseResponse, HandleConnected);
hFunc(TEvPGEvents::TEvBindResponse, HandleConnected);
diff --git a/ydb/core/pgproxy/pg_proxy_events.h b/ydb/core/pgproxy/pg_proxy_events.h
index b93adf48e9..e341975ae8 100644
--- a/ydb/core/pgproxy/pg_proxy_events.h
+++ b/ydb/core/pgproxy/pg_proxy_events.h
@@ -3,6 +3,7 @@
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/core/event_local.h>
#include <ydb/core/raw_socket/sock_config.h>
+#include <ydb/core/pgproxy/protos/pgproxy.pb.h>
#include "pg_proxy_types.h"
namespace NPG {
@@ -12,6 +13,7 @@ using namespace NKikimr::NRawSocket;
struct TEvPGEvents {
enum EEv {
EvConnectionOpened = EventSpaceBegin(NActors::TEvents::ES_PGWIRE),
+ EvFinishHandshake,
EvConnectionClosed,
EvAuth,
EvAuthResponse,
@@ -27,6 +29,7 @@ struct TEvPGEvents {
EvExecuteResponse,
EvClose,
EvCloseResponse,
+ EvCancelRequest,
EvEnd
};
@@ -58,6 +61,13 @@ struct TEvPGEvents {
{}
};
+ struct TEvFinishHandshake : NActors::TEventLocal<TEvFinishHandshake, EvFinishHandshake> {
+ TPGInitial::TPGBackendData BackendData;
+ std::vector<std::pair<char, TString>> ErrorFields;
+
+ TEvFinishHandshake() = default;
+ };
+
struct TEvConnectionClosed : NActors::TEventLocal<TEvConnectionClosed, EvConnectionClosed> {
};
@@ -245,6 +255,16 @@ struct TEvPGEvents {
return std::make_unique<TEvCloseResponse>(std::move(Message));
}
};
+
+ struct TEvCancelRequest : NActors::TEventPB<TEvCancelRequest, NKikimrPgProxy::TEvCancelRequest, EvCancelRequest> {
+
+ TEvCancelRequest() = default;
+
+ TEvCancelRequest(int32_t pid, int32_t key) {
+ Record.SetProcessId(pid);
+ Record.SetSecretKey(key);
+ }
+ };
};
} \ No newline at end of file
diff --git a/ydb/core/pgproxy/pg_proxy_types.cpp b/ydb/core/pgproxy/pg_proxy_types.cpp
index f5cfab74e3..22badbac64 100644
--- a/ydb/core/pgproxy/pg_proxy_types.cpp
+++ b/ydb/core/pgproxy/pg_proxy_types.cpp
@@ -41,6 +41,15 @@ uint32_t TPGInitial::GetProtocol() const {
return protocol;
}
+TPGInitial::TPGBackendData TPGInitial::GetBackendData() const {
+ TPGBackendData backendData;
+ TPGStreamInput stream(*this);
+ uint32_t protocol = 0;
+ stream >> protocol;
+ stream >> backendData.Pid >> backendData.Key;
+ return backendData;
+}
+
std::unordered_map<TString, TString> TPGInitial::GetClientParams() const {
std::unordered_map<TString, TString> params;
TPGStreamInput stream(*this);
@@ -62,6 +71,14 @@ std::unordered_map<TString, TString> TPGInitial::GetClientParams() const {
return params;
}
+TString TPGBackendKeyData::Dump() const {
+ TPGStreamInput stream(*this);
+ uint32_t pid = 0;
+ uint32_t key = 0;
+ stream >> pid >> key;
+ return TStringBuilder() << "cancellation PID " << pid << " KEY " << key;
+}
+
TString TPGNoticeResponse::Dump() const {
TPGStreamInput stream(*this);
TStringBuilder text;
diff --git a/ydb/core/pgproxy/pg_proxy_types.h b/ydb/core/pgproxy/pg_proxy_types.h
index 7bb9b10814..4e4b41ef1d 100644
--- a/ydb/core/pgproxy/pg_proxy_types.h
+++ b/ydb/core/pgproxy/pg_proxy_types.h
@@ -58,6 +58,13 @@ struct TPGInitial : TPGMessageType<'i'> { // it's not true, because we don't rec
TString Dump() const;
std::unordered_map<TString, TString> GetClientParams() const;
uint32_t GetProtocol() const;
+
+ struct TPGBackendData {
+ uint32_t Pid;
+ uint32_t Key;
+ };
+
+ TPGBackendData GetBackendData() const;
};
struct TPGAuth : TPGMessageType<'R'> {
@@ -118,6 +125,10 @@ struct TPGParameterStatus : TPGMessageType<'S'> {
}
};
+struct TPGBackendKeyData : TPGMessageType<'K'> {
+ TString Dump() const;
+};
+
struct TPGSync : TPGMessageType<'S'> {
};
diff --git a/ydb/core/pgproxy/pg_proxy_ut.cpp b/ydb/core/pgproxy/pg_proxy_ut.cpp
index 2ff8853ad6..336d3bd352 100644
--- a/ydb/core/pgproxy/pg_proxy_ut.cpp
+++ b/ydb/core/pgproxy/pg_proxy_ut.cpp
@@ -73,7 +73,7 @@ Y_UNIT_TEST_SUITE(TPGTest) {
UNIT_ASSERT_VALUES_EQUAL(authRequest->InitialMessage->GetClientParams()["user"], "user");
actorSystem.Send(new NActors::IEventHandle(handle->Sender, database, new NPG::TEvPGEvents::TEvAuthResponse()));
TString received = Receive(s);
- UNIT_ASSERT_VALUES_EQUAL(received, "520000000800000000530000002A7365727665725F76657273696F6E0031342E35202879646220737461626C652D32332D332900530000001B496E74657276616C5374796C6500706F737467726573005300000012446174655374796C650049534F005300000019636C69656E745F656E636F64696E6700555446380053000000197365727665725F656E636F64696E670055544638005300000019696E74656765725F6461746574696D6573006F6E005A0000000549");
+ UNIT_ASSERT_VALUES_EQUAL(received, "520000000800000000");
}
}
diff --git a/ydb/core/pgproxy/protos/CMakeLists.darwin-x86_64.txt b/ydb/core/pgproxy/protos/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..d186da4612
--- /dev/null
+++ b/ydb/core/pgproxy/protos/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,43 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
+
+add_library(core-pgproxy-protos)
+target_link_libraries(core-pgproxy-protos PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-protobuf
+)
+target_proto_messages(core-pgproxy-protos PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/protos/pgproxy.proto
+)
+target_proto_addincls(core-pgproxy-protos
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(core-pgproxy-protos
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
diff --git a/ydb/core/pgproxy/protos/CMakeLists.linux-aarch64.txt b/ydb/core/pgproxy/protos/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..71c3d8adf5
--- /dev/null
+++ b/ydb/core/pgproxy/protos/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,44 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
+
+add_library(core-pgproxy-protos)
+target_link_libraries(core-pgproxy-protos PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-protobuf
+)
+target_proto_messages(core-pgproxy-protos PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/protos/pgproxy.proto
+)
+target_proto_addincls(core-pgproxy-protos
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(core-pgproxy-protos
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
diff --git a/ydb/core/pgproxy/protos/CMakeLists.linux-x86_64.txt b/ydb/core/pgproxy/protos/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..71c3d8adf5
--- /dev/null
+++ b/ydb/core/pgproxy/protos/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,44 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
+
+add_library(core-pgproxy-protos)
+target_link_libraries(core-pgproxy-protos PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-protobuf
+)
+target_proto_messages(core-pgproxy-protos PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/protos/pgproxy.proto
+)
+target_proto_addincls(core-pgproxy-protos
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(core-pgproxy-protos
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
diff --git a/ydb/core/pgproxy/protos/CMakeLists.txt b/ydb/core/pgproxy/protos/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/pgproxy/protos/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/pgproxy/protos/CMakeLists.windows-x86_64.txt b/ydb/core/pgproxy/protos/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..d186da4612
--- /dev/null
+++ b/ydb/core/pgproxy/protos/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,43 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+get_built_tool_path(
+ TOOL_protoc_bin
+ TOOL_protoc_dependency
+ contrib/tools/protoc/bin
+ protoc
+)
+get_built_tool_path(
+ TOOL_cpp_styleguide_bin
+ TOOL_cpp_styleguide_dependency
+ contrib/tools/protoc/plugins/cpp_styleguide
+ cpp_styleguide
+)
+
+add_library(core-pgproxy-protos)
+target_link_libraries(core-pgproxy-protos PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-protobuf
+)
+target_proto_messages(core-pgproxy-protos PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/pgproxy/protos/pgproxy.proto
+)
+target_proto_addincls(core-pgproxy-protos
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(core-pgproxy-protos
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
diff --git a/ydb/core/pgproxy/protos/pgproxy.proto b/ydb/core/pgproxy/protos/pgproxy.proto
new file mode 100644
index 0000000000..93506915aa
--- /dev/null
+++ b/ydb/core/pgproxy/protos/pgproxy.proto
@@ -0,0 +1,8 @@
+syntax = "proto3";
+
+package NKikimrPgProxy;
+
+message TEvCancelRequest {
+ int32 ProcessId = 1;
+ int32 SecretKey = 2;
+}
diff --git a/ydb/core/pgproxy/protos/ya.make b/ydb/core/pgproxy/protos/ya.make
new file mode 100644
index 0000000000..c5a83d6432
--- /dev/null
+++ b/ydb/core/pgproxy/protos/ya.make
@@ -0,0 +1,9 @@
+PROTO_LIBRARY()
+
+SRCS(
+ pgproxy.proto
+)
+
+EXCLUDE_TAGS(GO_PROTO)
+
+END()
diff --git a/ydb/core/pgproxy/ya.make b/ydb/core/pgproxy/ya.make
index dc3387ea0e..eef8fed164 100644
--- a/ydb/core/pgproxy/ya.make
+++ b/ydb/core/pgproxy/ya.make
@@ -19,6 +19,7 @@ PEERDIR(
library/cpp/actors/protos
library/cpp/string_utils/base64
ydb/core/base
+ ydb/core/pgproxy/protos
ydb/core/protos
ydb/core/raw_socket
)