aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-09-21 19:13:53 +0300
committertesseract <tesseract@yandex-team.com>2023-09-21 20:01:08 +0300
commit247be8bf0216724b69ebb739eb1b87a9352963da (patch)
treec35756aea78216de265c477808fb0c17edb81fd6
parente7d500fdc7894d82b7e2d827fdbde9cfbe0c2f72 (diff)
downloadydb-247be8bf0216724b69ebb739eb1b87a9352963da.tar.gz
Fix epoll_ctl(EPOLL_CTL_MOD) failed with Bad file descriptor
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp9
-rw-r--r--ydb/core/raw_socket/sock_impl.h3
2 files changed, 8 insertions, 4 deletions
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index 6aaba8d9116..55fcaa6323d 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -101,7 +101,6 @@ public:
Become(&TKafkaConnection::StateAccepting);
Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false));
KAFKA_LOG_I("incoming connection opened " << Address);
- OnAccept();
}
void PassAway() override {
@@ -197,6 +196,7 @@ protected:
void HandleAccepting(TEvPollerRegisterResult::TPtr ev) {
PollerToken = std::move(ev->Get()->PollerToken);
+ UpgradeToSecure();
OnAccept();
}
@@ -415,18 +415,18 @@ protected:
}
}
- void DoRead(const TActorContext& ctx) {
+ void UpgradeToSecure() {
if (IsSslRequired && !IsSslActive) {
int res = Socket->TryUpgradeToSecure();
if (res < 0) {
KAFKA_LOG_ERROR("connection closed - error in UpgradeToSecure: " << strerror(-res));
return PassAway();
}
- RequestPoller();
IsSslActive = true;
- return;
}
+ }
+ void DoRead(const TActorContext& ctx) {
KAFKA_LOG_T("DoRead: Demand=" << Demand.Length << ", Step=" << static_cast<i32>(Step));
for (;;) {
while (Demand) {
@@ -589,6 +589,7 @@ protected:
void HandleConnected(TEvPollerRegisterResult::TPtr ev) {
PollerToken = std::move(ev->Get()->PollerToken);
+ UpgradeToSecure();
PollerToken->Request(true, true);
}
diff --git a/ydb/core/raw_socket/sock_impl.h b/ydb/core/raw_socket/sock_impl.h
index fc52872d1cb..642a59e2489 100644
--- a/ydb/core/raw_socket/sock_impl.h
+++ b/ydb/core/raw_socket/sock_impl.h
@@ -14,6 +14,7 @@ struct TEndpointInfo {
class TSocketDescriptor : public NActors::TSharedDescriptor, public TNetworkConfig {
std::unique_ptr<TNetworkConfig::TSocketType> Socket;
std::shared_ptr<TEndpointInfo> Endpoint;
+ TSpinLock Lock;
public:
TSocketDescriptor(TSocketType&& s, std::shared_ptr<TEndpointInfo> endpoint)
@@ -56,6 +57,7 @@ public:
int UpgradeToSecure() {
std::unique_ptr<TNetworkConfig::TSecureSocketType> socket = std::make_unique<TNetworkConfig::TSecureSocketType>(std::move(*Socket));
int res = socket->SecureAccept(Endpoint->SecureContext.get());
+ TGuard lock(Lock);
Socket.reset(socket.release());
return res;
}
@@ -81,6 +83,7 @@ public:
}
SOCKET GetRawSocket() const {
+ TGuard lock(Lock);
return *Socket;
}