diff options
author | tesseract <tesseract@yandex-team.com> | 2023-09-21 19:13:53 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-09-21 20:01:08 +0300 |
commit | 247be8bf0216724b69ebb739eb1b87a9352963da (patch) | |
tree | c35756aea78216de265c477808fb0c17edb81fd6 | |
parent | e7d500fdc7894d82b7e2d827fdbde9cfbe0c2f72 (diff) | |
download | ydb-247be8bf0216724b69ebb739eb1b87a9352963da.tar.gz |
Fix epoll_ctl(EPOLL_CTL_MOD) failed with Bad file descriptor
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.cpp | 9 | ||||
-rw-r--r-- | ydb/core/raw_socket/sock_impl.h | 3 |
2 files changed, 8 insertions, 4 deletions
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index 6aaba8d9116..55fcaa6323d 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -101,7 +101,6 @@ public: Become(&TKafkaConnection::StateAccepting); Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false)); KAFKA_LOG_I("incoming connection opened " << Address); - OnAccept(); } void PassAway() override { @@ -197,6 +196,7 @@ protected: void HandleAccepting(TEvPollerRegisterResult::TPtr ev) { PollerToken = std::move(ev->Get()->PollerToken); + UpgradeToSecure(); OnAccept(); } @@ -415,18 +415,18 @@ protected: } } - void DoRead(const TActorContext& ctx) { + void UpgradeToSecure() { if (IsSslRequired && !IsSslActive) { int res = Socket->TryUpgradeToSecure(); if (res < 0) { KAFKA_LOG_ERROR("connection closed - error in UpgradeToSecure: " << strerror(-res)); return PassAway(); } - RequestPoller(); IsSslActive = true; - return; } + } + void DoRead(const TActorContext& ctx) { KAFKA_LOG_T("DoRead: Demand=" << Demand.Length << ", Step=" << static_cast<i32>(Step)); for (;;) { while (Demand) { @@ -589,6 +589,7 @@ protected: void HandleConnected(TEvPollerRegisterResult::TPtr ev) { PollerToken = std::move(ev->Get()->PollerToken); + UpgradeToSecure(); PollerToken->Request(true, true); } diff --git a/ydb/core/raw_socket/sock_impl.h b/ydb/core/raw_socket/sock_impl.h index fc52872d1cb..642a59e2489 100644 --- a/ydb/core/raw_socket/sock_impl.h +++ b/ydb/core/raw_socket/sock_impl.h @@ -14,6 +14,7 @@ struct TEndpointInfo { class TSocketDescriptor : public NActors::TSharedDescriptor, public TNetworkConfig { std::unique_ptr<TNetworkConfig::TSocketType> Socket; std::shared_ptr<TEndpointInfo> Endpoint; + TSpinLock Lock; public: TSocketDescriptor(TSocketType&& s, std::shared_ptr<TEndpointInfo> endpoint) @@ -56,6 +57,7 @@ public: int UpgradeToSecure() { std::unique_ptr<TNetworkConfig::TSecureSocketType> socket = std::make_unique<TNetworkConfig::TSecureSocketType>(std::move(*Socket)); int res = socket->SecureAccept(Endpoint->SecureContext.get()); + TGuard lock(Lock); Socket.reset(socket.release()); return res; } @@ -81,6 +83,7 @@ public: } SOCKET GetRawSocket() const { + TGuard lock(Lock); return *Socket; } |