diff options
author | tesseract <tesseract@yandex-team.com> | 2023-09-22 12:34:33 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-09-22 13:03:47 +0300 |
commit | 51b306d5b53fffdca02088ed82572e96781e6541 (patch) | |
tree | 02c86dc86532d0861540385c39840354faf636f0 | |
parent | 57dc918a625d027c9bdabdebef3280c0c77950e2 (diff) | |
download | ydb-51b306d5b53fffdca02088ed82572e96781e6541.tar.gz |
Fixed memory leak when closing connection
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.cpp | 25 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.h | 4 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_listener.h | 4 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_connection.cpp | 11 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_connection.h | 4 | ||||
-rw-r--r-- | ydb/core/pgproxy/pg_listener.h | 4 | ||||
-rw-r--r-- | ydb/core/raw_socket/sock_listener.cpp | 8 | ||||
-rw-r--r-- | ydb/core/raw_socket/sock_listener.h | 4 |
8 files changed, 40 insertions, 24 deletions
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index 55fcaa6323d..78a21eca48c 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -50,6 +50,8 @@ public: static constexpr TDuration InactivityTimeout = TDuration::Minutes(10); TEvPollerReady* InactivityEvent = nullptr; + const TActorId ListenerActorId; + TIntrusivePtr<TSocketDescriptor> Socket; TSocketAddressType Address; TPollerToken::TPtr PollerToken; @@ -63,7 +65,6 @@ public: bool ConnectionEstablished = false; bool CloseConnection = false; - bool ActorActive = true; NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier; @@ -82,9 +83,12 @@ public: TContext::TPtr Context; - TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, + TKafkaConnection(const TActorId& listenerActorId, + TIntrusivePtr<TSocketDescriptor> socket, + TNetworkConfig::TSocketAddressType address, const NKikimrConfig::TKafkaProxyConfig& config) - : Socket(std::move(socket)) + : ListenerActorId(listenerActorId) + , Socket(std::move(socket)) , Address(address) , Buffer(Socket.Get(), config.GetPacketSize()) , Step(SIZE_READ) @@ -105,17 +109,12 @@ public: void PassAway() override { KAFKA_LOG_D("PassAway"); - if (!ActorActive) { - return; - } - ActorActive = false; - if (ConnectionEstablished) { - ConnectionEstablished = false; - } + ConnectionEstablished = false; if (ProduceActorId) { Send(ProduceActorId, new TEvents::TEvPoison()); } + Send(ListenerActorId, new TEvents::TEvUnsubscribe()); Shutdown(); TBase::PassAway(); } @@ -608,9 +607,11 @@ protected: } }; -NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, +NActors::IActor* CreateKafkaConnection(const TActorId& listenerActorId, + TIntrusivePtr<TSocketDescriptor> socket, + TNetworkConfig::TSocketAddressType address, const NKikimrConfig::TKafkaProxyConfig& config) { - return new TKafkaConnection(std::move(socket), std::move(address), config); + return new TKafkaConnection(listenerActorId, std::move(socket), std::move(address), config); } } // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_connection.h b/ydb/core/kafka_proxy/kafka_connection.h index acece5b24ee..95e9383800f 100644 --- a/ydb/core/kafka_proxy/kafka_connection.h +++ b/ydb/core/kafka_proxy/kafka_connection.h @@ -9,7 +9,9 @@ namespace NKafka { using namespace NKikimr::NRawSocket; -NActors::IActor* CreateKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, +NActors::IActor* CreateKafkaConnection(const TActorId& listenerActorId, + TIntrusivePtr<TSocketDescriptor> socket, + TNetworkConfig::TSocketAddressType address, const NKikimrConfig::TKafkaProxyConfig& config); } // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_listener.h b/ydb/core/kafka_proxy/kafka_listener.h index 10a82bf0944..49450493740 100644 --- a/ydb/core/kafka_proxy/kafka_listener.h +++ b/ydb/core/kafka_proxy/kafka_listener.h @@ -10,8 +10,8 @@ using namespace NKikimr::NRawSocket; inline NActors::IActor* CreateKafkaListener(const NActors::TActorId& poller, const TListenerSettings& settings, const NKikimrConfig::TKafkaProxyConfig& config) { return CreateSocketListener( poller, settings, - [=](TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) { - return CreateKafkaConnection(socket, address, config); + [=](const TActorId& listenerActorId, TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) { + return CreateKafkaConnection(listenerActorId, socket, address, config); }, NKikimrServices::EServiceKikimr::KAFKA_PROXY, EErrorAction::Abort); } diff --git a/ydb/core/pgproxy/pg_connection.cpp b/ydb/core/pgproxy/pg_connection.cpp index 0d0207c7beb..163c25bd548 100644 --- a/ydb/core/pgproxy/pg_connection.cpp +++ b/ydb/core/pgproxy/pg_connection.cpp @@ -13,6 +13,7 @@ using namespace NActors; class TPGConnection : public TActorBootstrapped<TPGConnection>, public TNetworkConfig { public: using TBase = TActorBootstrapped<TPGConnection>; + const TActorId ListenerActorId; TIntrusivePtr<TSocketDescriptor> Socket; TSocketAddressType Address; THPTimer InactivityTimer; @@ -43,8 +44,9 @@ public: char TransactionStatus = 'I'; // could be 'I' (idle), 'T' (transaction), 'E' (failed transaction) std::deque<TAutoPtr<IEventHandle>> PostponedEvents; - TPGConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const TActorId& databaseProxy) - : Socket(std::move(socket)) + TPGConnection(const TActorId& listenerActorId, TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const TActorId& databaseProxy) + : ListenerActorId(listenerActorId) + , Socket(std::move(socket)) , Address(address) , DatabaseProxy(databaseProxy) { @@ -65,6 +67,7 @@ public: Send(DatabaseProxy, new TEvPGEvents::TEvConnectionClosed()); ConnectionEstablished = false; } + Send(ListenerActorId, new TEvents::TEvUnsubscribe()); Shutdown(); TBase::PassAway(); } @@ -854,8 +857,8 @@ protected: } }; -NActors::IActor* CreatePGConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const TActorId& databaseProxy) { - return new TPGConnection(std::move(socket), std::move(address), databaseProxy); +NActors::IActor* CreatePGConnection(const TActorId& listenerActorId, TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const TActorId& databaseProxy) { + return new TPGConnection(listenerActorId, std::move(socket), std::move(address), databaseProxy); } }
\ No newline at end of file diff --git a/ydb/core/pgproxy/pg_connection.h b/ydb/core/pgproxy/pg_connection.h index a9448b4a88f..682c4cd7a00 100644 --- a/ydb/core/pgproxy/pg_connection.h +++ b/ydb/core/pgproxy/pg_connection.h @@ -8,7 +8,9 @@ namespace NPG { using namespace NKikimr::NRawSocket; -NActors::IActor* CreatePGConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, +NActors::IActor* CreatePGConnection(const TActorId& listenerActorId, + TIntrusivePtr<TSocketDescriptor> socket, + TNetworkConfig::TSocketAddressType address, const NActors::TActorId& databaseProxy); } // namespace NPG diff --git a/ydb/core/pgproxy/pg_listener.h b/ydb/core/pgproxy/pg_listener.h index 31488c0dd1a..1b1ab50a494 100644 --- a/ydb/core/pgproxy/pg_listener.h +++ b/ydb/core/pgproxy/pg_listener.h @@ -11,8 +11,8 @@ inline NActors::IActor* CreatePGListener(const NActors::TActorId& poller, const const TListenerSettings& settings = {.Port = 5432}) { return CreateSocketListener( poller, settings, - [=](TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) { - return CreatePGConnection(socket, address, databaseProxy); + [=](const TActorId& listenerActorId, TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) { + return CreatePGConnection(listenerActorId, socket, address, databaseProxy); }, NKikimrServices::EServiceKikimr::PGWIRE); } diff --git a/ydb/core/raw_socket/sock_listener.cpp b/ydb/core/raw_socket/sock_listener.cpp index 4752e30f50b..eadf7436c3c 100644 --- a/ydb/core/raw_socket/sock_listener.cpp +++ b/ydb/core/raw_socket/sock_listener.cpp @@ -42,6 +42,7 @@ public: switch (ev->GetTypeRewrite()) { hFunc(NActors::TEvPollerRegisterResult, Handle); hFunc(NActors::TEvPollerReady, Handle); + hFunc(TEvents::TEvUnsubscribe, Handle); sFunc(TEvents::TEvPoison, PassAway); } } @@ -93,6 +94,11 @@ public: } } + void Handle(TEvents::TEvUnsubscribe::TPtr ev) { + auto erased = Connections.erase(ev->Sender); + Y_VERIFY_DEBUG(erased); + } + void Handle(NActors::TEvPollerRegisterResult::TPtr ev) { PollerToken = std::move(ev->Get()->PollerToken); PollerToken->Request(true, false); // request read polling @@ -105,7 +111,7 @@ public: if (!socket) { break; } - NActors::IActor* connectionSocket = ConnectionCreator(socket, addr); + NActors::IActor* connectionSocket = ConnectionCreator(SelfId(), socket, addr); NActors::TActorId connectionId = Register(connectionSocket); Send(Poller, new TEvPollerRegister(socket, connectionId, connectionId)); Connections.emplace(connectionId); diff --git a/ydb/core/raw_socket/sock_listener.h b/ydb/core/raw_socket/sock_listener.h index d328eff004e..14342b8c512 100644 --- a/ydb/core/raw_socket/sock_listener.h +++ b/ydb/core/raw_socket/sock_listener.h @@ -20,7 +20,9 @@ enum EErrorAction { Abort }; -using TConnectionCreator = std::function<NActors::IActor* (TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address)>; +using TConnectionCreator = std::function<NActors::IActor* (const TActorId& listenerActorId, + TIntrusivePtr<TSocketDescriptor> socket, + TNetworkConfig::TSocketAddressType address)>; NActors::IActor* CreateSocketListener(const NActors::TActorId& poller, const TListenerSettings& settings, TConnectionCreator connectionCreator, NKikimrServices::EServiceKikimr service, |