aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-09-22 12:34:33 +0300
committertesseract <tesseract@yandex-team.com>2023-09-22 13:03:47 +0300
commit51b306d5b53fffdca02088ed82572e96781e6541 (patch)
tree02c86dc86532d0861540385c39840354faf636f0
parent57dc918a625d027c9bdabdebef3280c0c77950e2 (diff)
downloadydb-51b306d5b53fffdca02088ed82572e96781e6541.tar.gz
Fixed memory leak when closing connection
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp25
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.h4
-rw-r--r--ydb/core/kafka_proxy/kafka_listener.h4
-rw-r--r--ydb/core/pgproxy/pg_connection.cpp11
-rw-r--r--ydb/core/pgproxy/pg_connection.h4
-rw-r--r--ydb/core/pgproxy/pg_listener.h4
-rw-r--r--ydb/core/raw_socket/sock_listener.cpp8
-rw-r--r--ydb/core/raw_socket/sock_listener.h4
8 files changed, 40 insertions, 24 deletions
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index 55fcaa6323d..78a21eca48c 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -50,6 +50,8 @@ public:
static constexpr TDuration InactivityTimeout = TDuration::Minutes(10);
TEvPollerReady* InactivityEvent = nullptr;
+ const TActorId ListenerActorId;
+
TIntrusivePtr<TSocketDescriptor> Socket;
TSocketAddressType Address;
TPollerToken::TPtr PollerToken;
@@ -63,7 +65,6 @@ public:
bool ConnectionEstablished = false;
bool CloseConnection = false;
- bool ActorActive = true;
NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier;
@@ -82,9 +83,12 @@ public:
TContext::TPtr Context;
- TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address,
+ TKafkaConnection(const TActorId& listenerActorId,
+ TIntrusivePtr<TSocketDescriptor> socket,
+ TNetworkConfig::TSocketAddressType address,
const NKikimrConfig::TKafkaProxyConfig& config)
- : Socket(std::move(socket))
+ : ListenerActorId(listenerActorId)
+ , Socket(std::move(socket))
, Address(address)
, Buffer(Socket.Get(), config.GetPacketSize())
, Step(SIZE_READ)
@@ -105,17 +109,12 @@ public:
void PassAway() override {
KAFKA_LOG_D("PassAway");
- if (!ActorActive) {
- return;
- }
- ActorActive = false;
- if (ConnectionEstablished) {
- ConnectionEstablished = false;
- }
+ ConnectionEstablished = false;
if (ProduceActorId) {
Send(ProduceActorId, new TEvents::TEvPoison());
}
+ Send(ListenerActorId, new TEvents::TEvUnsubscribe());
Shutdown();
TBase::PassAway();
}
@@ -608,9 +607,11 @@ protected:
}
};
-NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address,
+NActors::IActor* CreateKafkaConnection(const TActorId& listenerActorId,
+ TIntrusivePtr<TSocketDescriptor> socket,
+ TNetworkConfig::TSocketAddressType address,
const NKikimrConfig::TKafkaProxyConfig& config) {
- return new TKafkaConnection(std::move(socket), std::move(address), config);
+ return new TKafkaConnection(listenerActorId, std::move(socket), std::move(address), config);
}
} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_connection.h b/ydb/core/kafka_proxy/kafka_connection.h
index acece5b24ee..95e9383800f 100644
--- a/ydb/core/kafka_proxy/kafka_connection.h
+++ b/ydb/core/kafka_proxy/kafka_connection.h
@@ -9,7 +9,9 @@ namespace NKafka {
using namespace NKikimr::NRawSocket;
-NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address,
+NActors::IActor* CreateKafkaConnection(const TActorId& listenerActorId,
+ TIntrusivePtr<TSocketDescriptor> socket,
+ TNetworkConfig::TSocketAddressType address,
const NKikimrConfig::TKafkaProxyConfig& config);
} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_listener.h b/ydb/core/kafka_proxy/kafka_listener.h
index 10a82bf0944..49450493740 100644
--- a/ydb/core/kafka_proxy/kafka_listener.h
+++ b/ydb/core/kafka_proxy/kafka_listener.h
@@ -10,8 +10,8 @@ using namespace NKikimr::NRawSocket;
inline NActors::IActor* CreateKafkaListener(const NActors::TActorId& poller, const TListenerSettings& settings, const NKikimrConfig::TKafkaProxyConfig& config) {
return CreateSocketListener(
poller, settings,
- [=](TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) {
- return CreateKafkaConnection(socket, address, config);
+ [=](const TActorId& listenerActorId, TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) {
+ return CreateKafkaConnection(listenerActorId, socket, address, config);
},
NKikimrServices::EServiceKikimr::KAFKA_PROXY, EErrorAction::Abort);
}
diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp
index 0d0207c7beb..163c25bd548 100644
--- a/ydb/core/pgproxy/pg_connection.cpp
+++ b/ydb/core/pgproxy/pg_connection.cpp
@@ -13,6 +13,7 @@ using namespace NActors;
class TPGConnection : public TActorBootstrapped<TPGConnection>, public TNetworkConfig {
public:
using TBase = TActorBootstrapped<TPGConnection>;
+ const TActorId ListenerActorId;
TIntrusivePtr<TSocketDescriptor> Socket;
TSocketAddressType Address;
THPTimer InactivityTimer;
@@ -43,8 +44,9 @@ public:
char TransactionStatus = 'I'; // could be 'I' (idle), 'T' (transaction), 'E' (failed transaction)
std::deque<TAutoPtr<IEventHandle>> PostponedEvents;
- TPGConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const TActorId& databaseProxy)
- : Socket(std::move(socket))
+ TPGConnection(const TActorId& listenerActorId, TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const TActorId& databaseProxy)
+ : ListenerActorId(listenerActorId)
+ , Socket(std::move(socket))
, Address(address)
, DatabaseProxy(databaseProxy)
{
@@ -65,6 +67,7 @@ public:
Send(DatabaseProxy, new TEvPGEvents::TEvConnectionClosed());
ConnectionEstablished = false;
}
+ Send(ListenerActorId, new TEvents::TEvUnsubscribe());
Shutdown();
TBase::PassAway();
}
@@ -854,8 +857,8 @@ protected:
}
};
-NActors::IActor* CreatePGConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const TActorId& databaseProxy) {
- return new TPGConnection(std::move(socket), std::move(address), databaseProxy);
+NActors::IActor* CreatePGConnection(const TActorId& listenerActorId, TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const TActorId& databaseProxy) {
+ return new TPGConnection(listenerActorId, std::move(socket), std::move(address), databaseProxy);
}
} \ No newline at end of file
diff --git a/ydb/core/pgproxy/pg_connection.h b/ydb/core/pgproxy/pg_connection.h
index a9448b4a88f..682c4cd7a00 100644
--- a/ydb/core/pgproxy/pg_connection.h
+++ b/ydb/core/pgproxy/pg_connection.h
@@ -8,7 +8,9 @@ namespace NPG {
using namespace NKikimr::NRawSocket;
-NActors::IActor* CreatePGConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address,
+NActors::IActor* CreatePGConnection(const TActorId& listenerActorId,
+ TIntrusivePtr<TSocketDescriptor> socket,
+ TNetworkConfig::TSocketAddressType address,
const NActors::TActorId& databaseProxy);
} // namespace NPG
diff --git a/ydb/core/pgproxy/pg_listener.h b/ydb/core/pgproxy/pg_listener.h
index 31488c0dd1a..1b1ab50a494 100644
--- a/ydb/core/pgproxy/pg_listener.h
+++ b/ydb/core/pgproxy/pg_listener.h
@@ -11,8 +11,8 @@ inline NActors::IActor* CreatePGListener(const NActors::TActorId& poller, const
const TListenerSettings& settings = {.Port = 5432}) {
return CreateSocketListener(
poller, settings,
- [=](TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) {
- return CreatePGConnection(socket, address, databaseProxy);
+ [=](const TActorId& listenerActorId, TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) {
+ return CreatePGConnection(listenerActorId, socket, address, databaseProxy);
},
NKikimrServices::EServiceKikimr::PGWIRE);
}
diff --git a/ydb/core/raw_socket/sock_listener.cpp b/ydb/core/raw_socket/sock_listener.cpp
index 4752e30f50b..eadf7436c3c 100644
--- a/ydb/core/raw_socket/sock_listener.cpp
+++ b/ydb/core/raw_socket/sock_listener.cpp
@@ -42,6 +42,7 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(NActors::TEvPollerRegisterResult, Handle);
hFunc(NActors::TEvPollerReady, Handle);
+ hFunc(TEvents::TEvUnsubscribe, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}
@@ -93,6 +94,11 @@ public:
}
}
+ void Handle(TEvents::TEvUnsubscribe::TPtr ev) {
+ auto erased = Connections.erase(ev->Sender);
+ Y_VERIFY_DEBUG(erased);
+ }
+
void Handle(NActors::TEvPollerRegisterResult::TPtr ev) {
PollerToken = std::move(ev->Get()->PollerToken);
PollerToken->Request(true, false); // request read polling
@@ -105,7 +111,7 @@ public:
if (!socket) {
break;
}
- NActors::IActor* connectionSocket = ConnectionCreator(socket, addr);
+ NActors::IActor* connectionSocket = ConnectionCreator(SelfId(), socket, addr);
NActors::TActorId connectionId = Register(connectionSocket);
Send(Poller, new TEvPollerRegister(socket, connectionId, connectionId));
Connections.emplace(connectionId);
diff --git a/ydb/core/raw_socket/sock_listener.h b/ydb/core/raw_socket/sock_listener.h
index d328eff004e..14342b8c512 100644
--- a/ydb/core/raw_socket/sock_listener.h
+++ b/ydb/core/raw_socket/sock_listener.h
@@ -20,7 +20,9 @@ enum EErrorAction {
Abort
};
-using TConnectionCreator = std::function<NActors::IActor* (TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address)>;
+using TConnectionCreator = std::function<NActors::IActor* (const TActorId& listenerActorId,
+ TIntrusivePtr<TSocketDescriptor> socket,
+ TNetworkConfig::TSocketAddressType address)>;
NActors::IActor* CreateSocketListener(const NActors::TActorId& poller, const TListenerSettings& settings,
TConnectionCreator connectionCreator, NKikimrServices::EServiceKikimr service,