diff options
author | alexvru <[email protected]> | 2023-11-17 21:51:27 +0300 |
---|---|---|
committer | alexvru <[email protected]> | 2023-11-17 22:24:31 +0300 |
commit | 9fb3a015a1c695b4f6ee2196404e7b2a06e6d677 (patch) | |
tree | dfef740b72a2662862caf1eadb53da9b53c97456 | |
parent | 983dd8625f86eab8f1af1b53c44ad49df392dadc (diff) |
Fix EPOLLET logic to keep old behaviour for suboptimal user code KIKIMR-20021
14 files changed, 128 insertions, 48 deletions
diff --git a/library/cpp/actors/http/http_proxy_acceptor.cpp b/library/cpp/actors/http/http_proxy_acceptor.cpp index f9ee1d8032e..c007f747eb3 100644 --- a/library/cpp/actors/http/http_proxy_acceptor.cpp +++ b/library/cpp/actors/http/http_proxy_acceptor.cpp @@ -116,6 +116,12 @@ protected: SocketAddressType addr; std::optional<SocketType> s = Socket->Socket.Accept(addr); if (!s) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + Y_ABORT_UNLESS(PollerToken); + if (PollerToken->RequestReadNotificationAfterWouldBlock()) { + continue; // we can try it again + } + } break; } TIntrusivePtr<TSocketDescriptor> socket = new TSocketDescriptor(std::move(s).value()); @@ -130,11 +136,6 @@ protected: ctx.Send(Poller, new NActors::TEvPollerRegister(socket, connectionId, connectionId)); Connections.emplace(connectionId); } - int err = errno; - if (err == EAGAIN || err == EWOULDBLOCK) { // request poller for further connection polling - Y_ABORT_UNLESS(PollerToken); - PollerToken->Request(true, false); - } } void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) { diff --git a/library/cpp/actors/http/http_proxy_incoming.cpp b/library/cpp/actors/http/http_proxy_incoming.cpp index 68bd5f6ecb0..b98b3c09f36 100644 --- a/library/cpp/actors/http/http_proxy_incoming.cpp +++ b/library/cpp/actors/http/http_proxy_incoming.cpp @@ -79,16 +79,19 @@ protected: void OnAccept(const NActors::TActorContext& ctx) { int res; bool read = false, write = false; - if ((res = TSocketImpl::OnAccept(Endpoint, read, write)) != 1) { - if (-res == EAGAIN) { - if (PollerToken) { - PollerToken->Request(read, write); + for (;;) { + if ((res = TSocketImpl::OnAccept(Endpoint, read, write)) != 1) { + if (-res == EAGAIN) { + if (PollerToken && PollerToken->RequestReadNotificationAfterWouldBlock()) { + continue; + } + return; // wait for further notifications + } else { + LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error 
in Accept: " << strerror(-res)); + return Die(ctx); } - return; // wait for further notifications - } else { - LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in Accept: " << strerror(-res)); - return Die(ctx); } + break; } TBase::Become(&TIncomingConnectionActor::StateConnected); ctx.Send(ctx.SelfID, new TEvPollerReady(nullptr, true, true)); @@ -147,7 +150,9 @@ protected: if (!read && !write) { read = true; } - PollerToken->Request(read, write); + if (PollerToken->RequestNotificationAfterWouldBlock(read, write)) { + continue; + } } break; } else if (-res == EINTR) { @@ -258,7 +263,9 @@ protected: if (!read && !write) { write = true; } - PollerToken->Request(read, write); + if (PollerToken->RequestNotificationAfterWouldBlock(read, write)) { + continue; + } } break; } else { diff --git a/library/cpp/actors/http/http_proxy_outgoing.cpp b/library/cpp/actors/http/http_proxy_outgoing.cpp index 3bb23d6fae8..b1f27c2c5a1 100644 --- a/library/cpp/actors/http/http_proxy_outgoing.cpp +++ b/library/cpp/actors/http/http_proxy_outgoing.cpp @@ -107,7 +107,9 @@ protected: if (!read && !write) { write = true; } - PollerToken->Request(read, write); + if (PollerToken->RequestNotificationAfterWouldBlock(read, write)) { + continue; + } } break; } else { @@ -144,7 +146,9 @@ protected: if (!read && !write) { read = true; } - PollerToken->Request(read, write); + if (PollerToken->RequestNotificationAfterWouldBlock(read, write)) { + continue; + } } return; } else { diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index eae80d8c5cd..8c347507b2d 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -164,8 +164,9 @@ namespace NActors { } void WaitPoller(bool read, bool write, TString state) { - PollerToken->Request(read, write); - 
Actor->WaitForSpecificEvent<TEvPollerReady>(std::move(state)); + if (!PollerToken->RequestNotificationAfterWouldBlock(read, write)) { + Actor->WaitForSpecificEvent<TEvPollerReady>(std::move(state)); + } } template <typename TDataPtr, typename TSendRecvFunc> diff --git a/library/cpp/actors/interconnect/interconnect_stream.cpp b/library/cpp/actors/interconnect/interconnect_stream.cpp index 2866a5ab6c9..f8db077fa40 100644 --- a/library/cpp/actors/interconnect/interconnect_stream.cpp +++ b/library/cpp/actors/interconnect/interconnect_stream.cpp @@ -214,8 +214,12 @@ namespace NInterconnect { token.Request(read, write); } - bool TStreamSocket::RequestEx(NActors::TPollerToken& token, bool read, bool write) { - return token.Request(read, write); + bool TStreamSocket::RequestReadNotificationAfterWouldBlock(NActors::TPollerToken& token) { + return token.RequestReadNotificationAfterWouldBlock(); + } + + bool TStreamSocket::RequestWriteNotificationAfterWouldBlock(NActors::TPollerToken& token) { + return token.RequestWriteNotificationAfterWouldBlock(); } size_t TStreamSocket::ExpectedWriteLength() const { @@ -655,8 +659,19 @@ namespace NInterconnect { token.Request(WantRead(), WantWrite()); } - bool TSecureSocket::RequestEx(NActors::TPollerToken& token, bool /*read*/, bool /*write*/) { - return token.Request(WantRead(), WantWrite(), true); + bool TSecureSocket::RequestReadNotificationAfterWouldBlock(NActors::TPollerToken& token) { + bool result = false; + if (WantRead()) { + result |= token.RequestReadNotificationAfterWouldBlock(); + } + if (WantWrite()) { + result |= token.RequestWriteNotificationAfterWouldBlock(); + } + return result; + } + + bool TSecureSocket::RequestWriteNotificationAfterWouldBlock(NActors::TPollerToken& token) { + return RequestReadNotificationAfterWouldBlock(token); } size_t TSecureSocket::ExpectedWriteLength() const { diff --git a/library/cpp/actors/interconnect/interconnect_stream.h b/library/cpp/actors/interconnect/interconnect_stream.h index 
ed23087f820..b9ca804e0e5 100644 --- a/library/cpp/actors/interconnect/interconnect_stream.h +++ b/library/cpp/actors/interconnect/interconnect_stream.h @@ -69,7 +69,8 @@ namespace NInterconnect { ui32 GetSendBufferSize() const; virtual void Request(NActors::TPollerToken& token, bool read, bool write); - virtual bool RequestEx(NActors::TPollerToken& token, bool read, bool write); + virtual bool RequestReadNotificationAfterWouldBlock(NActors::TPollerToken& token); + virtual bool RequestWriteNotificationAfterWouldBlock(NActors::TPollerToken& token); virtual size_t ExpectedWriteLength() const; }; @@ -124,7 +125,8 @@ namespace NInterconnect { bool WantRead() const; bool WantWrite() const; void Request(NActors::TPollerToken& token, bool read, bool write) override; - bool RequestEx(NActors::TPollerToken& token, bool read, bool write) override; + bool RequestReadNotificationAfterWouldBlock(NActors::TPollerToken& token) override; + bool RequestWriteNotificationAfterWouldBlock(NActors::TPollerToken& token) override; size_t ExpectedWriteLength() const override; }; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 49417abd532..1409f2cf0cd 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -742,7 +742,7 @@ namespace NActors { throw TExReestablishConnection{CloseInputSessionRequested ? TDisconnectReason::Debug() : recvres == 0 ? 
TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-recvres)}; } else if (token && !*readPending) { - if (socket.RequestEx(*token, true, false)) { + if (socket.RequestReadNotificationAfterWouldBlock(*token)) { continue; // can try again } else { *readPending = true; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp index 09c5987e81f..df0c172dc18 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp @@ -109,8 +109,8 @@ namespace NActors { Listener.Reset(); PollerToken.Reset(); Become(&TThis::Initial, TDuration::Seconds(1), new TEvents::TEvBootstrap); - } else if (PollerToken) { - PollerToken->Request(true, false); + } else if (PollerToken && PollerToken->RequestReadNotificationAfterWouldBlock()) { + continue; } break; } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 28c4e5ff297..7d6f8d012f7 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -545,17 +545,18 @@ namespace NActors { void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) { auto *msg = ev->Get(); + bool sendPollerReady = false; if (msg->Socket == Socket) { PollerToken = std::move(msg->PollerToken); - if (ReceiveContext->MainWriteBlocked) { - Socket->Request(*PollerToken, false, true); - } + sendPollerReady = ReceiveContext->MainWriteBlocked; } else if (msg->Socket == XdcSocket) { XdcPollerToken = std::move(msg->PollerToken); - if (ReceiveContext->XdcWriteBlocked) { - XdcSocket->Request(*XdcPollerToken, false, true); - } + sendPollerReady = ReceiveContext->XdcWriteBlocked; + } + + if (sendPollerReady) { + Send(SelfId(), new TEvPollerReady(msg->Socket, false, true)); } } @@ -573,7 +574,7 @@ namespace NActors { 
stream.Advance(r); totalWritten += r; } else if (r == -1) { - if (token && socket->RequestEx(*token, false, true)) { + if (token && socket->RequestWriteNotificationAfterWouldBlock(*token)) { continue; // we can try again } *writeBlocked = true; diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp index 1f381a0b5b0..040bdf86517 100644 --- a/library/cpp/actors/interconnect/poller_actor.cpp +++ b/library/cpp/actors/interconnect/poller_actor.cpp @@ -230,9 +230,23 @@ namespace NActors { } } - bool Request(bool read, bool write, bool suppressNotify) { + void Request(bool read, bool write) { if (auto thread = Thread.lock()) { - return thread->Request(Record, read, write, suppressNotify); + thread->Request(Record, read, write, false, false); + } + } + + bool RequestReadNotificationAfterWouldBlock() { + if (auto thread = Thread.lock()) { + return thread->Request(Record, true, false, true, true); + } else { + return false; + } + } + + bool RequestWriteNotificationAfterWouldBlock() { + if (auto thread = Thread.lock()) { + return thread->Request(Record, false, true, true, true); } else { return false; } @@ -285,8 +299,16 @@ namespace NActors { TPollerToken::~TPollerToken() {} - bool TPollerToken::Request(bool read, bool write, bool suppressNotify) { - return Impl->Request(read, write, suppressNotify); + void TPollerToken::Request(bool read, bool write) { + Impl->Request(read, write); + } + + bool TPollerToken::RequestReadNotificationAfterWouldBlock() { + return Impl->RequestReadNotificationAfterWouldBlock(); + } + + bool TPollerToken::RequestWriteNotificationAfterWouldBlock() { + return Impl->RequestWriteNotificationAfterWouldBlock(); } IActor* CreatePollerActor() { diff --git a/library/cpp/actors/interconnect/poller_actor.h b/library/cpp/actors/interconnect/poller_actor.h index f21d8a615ea..4fb2d49a658 100644 --- a/library/cpp/actors/interconnect/poller_actor.h +++ b/library/cpp/actors/interconnect/poller_actor.h @@ 
-27,7 +27,16 @@ namespace NActors { public: ~TPollerToken(); - bool Request(bool read, bool write, bool suppressNotify = false); + void Request(bool read, bool write); + bool RequestReadNotificationAfterWouldBlock(); + bool RequestWriteNotificationAfterWouldBlock(); + + bool RequestNotificationAfterWouldBlock(bool read, bool write) { + bool status = false; + status |= read && RequestReadNotificationAfterWouldBlock(); + status |= write && RequestWriteNotificationAfterWouldBlock(); + return status; + } using TPtr = TIntrusivePtr<TPollerToken>; }; diff --git a/library/cpp/actors/interconnect/poller_actor_darwin.h b/library/cpp/actors/interconnect/poller_actor_darwin.h index d00aa766f00..cd763ac589c 100644 --- a/library/cpp/actors/interconnect/poller_actor_darwin.h +++ b/library/cpp/actors/interconnect/poller_actor_darwin.h @@ -92,7 +92,8 @@ namespace NActors { SafeKevent(ev, 2); } - bool Request(const TIntrusivePtr<TSocketRecord>& /*socket*/, bool /*read*/, bool /*write*/, bool /*suppressNotify*/) { + bool Request(const TIntrusivePtr<TSocketRecord>& /*socket*/, bool /*read*/, bool /*write*/, bool /*suppressNotify*/, + bool /*afterWouldBlock*/) { return false; // no special processing here as we use kqueue in edge-triggered mode } }; diff --git a/library/cpp/actors/interconnect/poller_actor_linux.h b/library/cpp/actors/interconnect/poller_actor_linux.h index 183135d6b07..2cd557e347a 100644 --- a/library/cpp/actors/interconnect/poller_actor_linux.h +++ b/library/cpp/actors/interconnect/poller_actor_linux.h @@ -62,7 +62,8 @@ namespace NActors { if (auto *record = static_cast<TSocketRecord*>(ev.data.ptr)) { const bool read = ev.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR); const bool write = ev.events & (EPOLLOUT | EPOLLERR); - UpdateFlags(record, (read ? ReadHit : 0) | (write ? WriteHit : 0), false); + UpdateFlags(record, (read ? ReadHit : 0) | (write ? 
WriteHit : 0), false /*suppressNotify*/, + false /*checkQueues*/); } else { res = true; } @@ -71,19 +72,32 @@ namespace NActors { return res; } - bool UpdateFlags(TSocketRecord *record, ui32 addMask, bool suppressNotify) { + bool UpdateFlags(TSocketRecord *record, ui32 addMask, bool suppressNotify, bool checkQueues) { ui32 flags = record->Flags.load(std::memory_order_acquire); for (;;) { ui32 updated = flags | addMask; static constexpr ui32 fullRead = ReadExpected | ReadHit; static constexpr ui32 fullWrite = WriteExpected | WriteHit; - const bool read = (updated & fullRead) == fullRead; - const bool write = (updated & fullWrite) == fullWrite; + bool read = (updated & fullRead) == fullRead; + bool write = (updated & fullWrite) == fullWrite; updated &= ~((read ? fullRead : 0) | (write ? fullWrite : 0)); if (record->Flags.compare_exchange_weak(flags, updated, std::memory_order_acq_rel)) { if (suppressNotify) { return read || write; } else { + if (checkQueues) { + pollfd fd; + fd.fd = record->Socket->GetDescriptor(); + const bool queryRead = updated & ReadExpected && !read; + const bool queryWrite = updated & WriteExpected && !write; + if (queryRead || queryWrite) { + fd.events = (queryRead ? POLLIN : 0) | (queryWrite ? POLLOUT : 0); + if (poll(&fd, 1, 0) != -1) { + read = queryRead && fd.revents & (POLLIN | POLLHUP | POLLRDHUP | POLLERR); + write = queryWrite && fd.revents & (POLLOUT | POLLERR); + } + } + } Notify(record, read, write); return false; } @@ -106,8 +120,10 @@ namespace NActors { } } - bool Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write, bool suppressNotify) { - return UpdateFlags(record.Get(), (read ? ReadExpected : 0) | (write ? WriteExpected : 0), suppressNotify); + bool Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write, bool suppressNotify, + bool afterWouldBlock) { + return UpdateFlags(record.Get(), (read ? ReadExpected : 0) | (write ? 
WriteExpected : 0), suppressNotify, + !afterWouldBlock); } }; diff --git a/library/cpp/actors/interconnect/poller_actor_win.h b/library/cpp/actors/interconnect/poller_actor_win.h index 0fdc6ad6cbe..a4b213ff8c1 100644 --- a/library/cpp/actors/interconnect/poller_actor_win.h +++ b/library/cpp/actors/interconnect/poller_actor_win.h @@ -94,7 +94,8 @@ namespace NActors { ExecuteSyncOperation(TPollerWakeup()); } - bool Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write, bool /*suppressNotify*/) { + bool Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write, bool /*suppressNotify*/, + bool /*afterWouldBlock*/) { with_lock (Mutex) { const auto it = Descriptors.find(record->Socket->GetDescriptor()); Y_ABORT_UNLESS(it != Descriptors.end()); |