diff options
author | alexvru <alexvru@ydb.tech> | 2023-11-08 18:02:27 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-11-08 18:54:10 +0300 |
commit | 97fad0e8cd21ff5617a610d368396729ac893df8 (patch) | |
tree | 7360fbd11a19906761d258889aad5b62e0ea7426 /library/cpp | |
parent | df03382e94a5f170d1b7b0c6e5bb949b3c9ad043 (diff) | |
download | ydb-97fad0e8cd21ff5617a610d368396729ac893df8.tar.gz |
Switch epoll back to EPOLLET KIKIMR-20021
Diffstat (limited to 'library/cpp')
9 files changed, 103 insertions, 84 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_stream.cpp b/library/cpp/actors/interconnect/interconnect_stream.cpp index ab3b74fa572..2866a5ab6c9 100644 --- a/library/cpp/actors/interconnect/interconnect_stream.cpp +++ b/library/cpp/actors/interconnect/interconnect_stream.cpp @@ -214,6 +214,10 @@ namespace NInterconnect { token.Request(read, write); } + bool TStreamSocket::RequestEx(NActors::TPollerToken& token, bool read, bool write) { + return token.Request(read, write); + } + size_t TStreamSocket::ExpectedWriteLength() const { return 0; } @@ -651,6 +655,10 @@ namespace NInterconnect { token.Request(WantRead(), WantWrite()); } + bool TSecureSocket::RequestEx(NActors::TPollerToken& token, bool /*read*/, bool /*write*/) { + return token.Request(WantRead(), WantWrite(), true); + } + size_t TSecureSocket::ExpectedWriteLength() const { return Impl->ExpectedWriteLength(); } diff --git a/library/cpp/actors/interconnect/interconnect_stream.h b/library/cpp/actors/interconnect/interconnect_stream.h index 54139aed8dc..ed23087f820 100644 --- a/library/cpp/actors/interconnect/interconnect_stream.h +++ b/library/cpp/actors/interconnect/interconnect_stream.h @@ -69,6 +69,7 @@ namespace NInterconnect { ui32 GetSendBufferSize() const; virtual void Request(NActors::TPollerToken& token, bool read, bool write); + virtual bool RequestEx(NActors::TPollerToken& token, bool read, bool write); virtual size_t ExpectedWriteLength() const; }; @@ -123,6 +124,7 @@ namespace NInterconnect { bool WantRead() const; bool WantWrite() const; void Request(NActors::TPollerToken& token, bool read, bool write) override; + bool RequestEx(NActors::TPollerToken& token, bool read, bool write) override; size_t ExpectedWriteLength() const override; }; diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 3f5ebd29870..49417abd532 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -714,39 +714,45 @@ namespace NActors { ssize_t TInputSessionTCP::Read(NInterconnect::TStreamSocket& socket, const TPollerToken::TPtr& token, bool *readPending, const TIoVec *iov, size_t num) { - ssize_t recvres = 0; - TString err; - LWPROBE_IF_TOO_LONG(SlowICReadFromSocket, ms) { - do { - const ui64 begin = GetCycleCountFast(); - if (num == 1) { - recvres = socket.Recv(iov->Data, iov->Size, &err); - } else { - recvres = socket.ReadV(reinterpret_cast<const iovec*>(iov), num); - } - const ui64 end = GetCycleCountFast(); - Metrics->IncRecvSyscalls((end - begin) * 1'000'000 / GetCyclesPerMillisecond()); - } while (recvres == -EINTR); - } + for (;;) { + ssize_t recvres = 0; + TString err; + LWPROBE_IF_TOO_LONG(SlowICReadFromSocket, ms) { + do { + const ui64 begin = GetCycleCountFast(); + if (num == 1) { + recvres = socket.Recv(iov->Data, iov->Size, &err); + } else { + recvres = socket.ReadV(reinterpret_cast<const iovec*>(iov), num); + } + const ui64 end = GetCycleCountFast(); + Metrics->IncRecvSyscalls((end - begin) * 1'000'000 / GetCyclesPerMillisecond()); + } while (recvres == -EINTR); + } - LOG_DEBUG_IC_SESSION("ICIS12", "Read recvres# %zd num# %zu err# %s", recvres, num, err.data()); - - if (recvres <= 0 || CloseInputSessionRequested) { - if ((-recvres != EAGAIN && -recvres != EWOULDBLOCK) || CloseInputSessionRequested) { - TString message = CloseInputSessionRequested ? "connection closed by debug command" - : recvres == 0 ? "connection closed by peer" - : err ? err - : Sprintf("socket: %s", strerror(-recvres)); - LOG_NOTICE_NET(NodeId, "%s", message.data()); - throw TExReestablishConnection{CloseInputSessionRequested ? TDisconnectReason::Debug() : - recvres == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-recvres)}; - } else if (token && !std::exchange(*readPending, true)) { - socket.Request(*token, true, false); + LOG_DEBUG_IC_SESSION("ICIS12", "Read recvres# %zd num# %zu err# %s", recvres, num, err.data()); + + if (recvres <= 0 || CloseInputSessionRequested) { + if ((-recvres != EAGAIN && -recvres != EWOULDBLOCK) || CloseInputSessionRequested) { + TString message = CloseInputSessionRequested ? "connection closed by debug command" + : recvres == 0 ? "connection closed by peer" + : err ? err + : Sprintf("socket: %s", strerror(-recvres)); + LOG_NOTICE_NET(NodeId, "%s", message.data()); + throw TExReestablishConnection{CloseInputSessionRequested ? TDisconnectReason::Debug() : + recvres == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-recvres)}; + } else if (token && !*readPending) { + if (socket.RequestEx(*token, true, false)) { + continue; // can try again + } else { + *readPending = true; + } + } + return -1; } - return -1; - } - return recvres; + return recvres; + } } constexpr ui64 GetUsageCountClearMask(size_t items, int bits) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 25c05898d2a..28c4e5ff297 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -568,18 +568,21 @@ namespace NActors { size_t totalWritten = 0; if (stream && socket && !*writeBlocked) { - if (const ssize_t r = Write(stream, *socket, maxBytes); r > 0) { - stream.Advance(r); - totalWritten += r; - } else if (r == -1) { - *writeBlocked = true; - if (token) { - socket->Request(*token, false, true); + for (;;) { + if (const ssize_t r = Write(stream, *socket, maxBytes); r > 0) { + stream.Advance(r); + totalWritten += r; + } else if (r == -1) { + if (token && socket->RequestEx(*token, false, true)) { + continue; // we can try again + } + *writeBlocked = true; + } else if (r == 0) { + // error condition + } else { + Y_UNREACHABLE(); } - } else if (r == 0) { - // error condition - } else { - Y_UNREACHABLE(); + break; } } diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp index 1f18e72a5ba..1f381a0b5b0 100644 --- a/library/cpp/actors/interconnect/poller_actor.cpp +++ b/library/cpp/actors/interconnect/poller_actor.cpp @@ -230,9 +230,11 @@ namespace NActors { } } - void Request(bool read, bool write) { + bool Request(bool read, bool write, bool suppressNotify) { if (auto thread = Thread.lock()) { - thread->Request(Record, read, write); + return thread->Request(Record, read, write, suppressNotify); + } else { + return false; } } @@ -283,8 +285,8 @@ namespace NActors { TPollerToken::~TPollerToken() {} - void TPollerToken::Request(bool read, bool write) { - Impl->Request(read, write); + bool TPollerToken::Request(bool read, bool write, bool suppressNotify) { + return Impl->Request(read, write, suppressNotify); } IActor* CreatePollerActor() { diff --git a/library/cpp/actors/interconnect/poller_actor.h b/library/cpp/actors/interconnect/poller_actor.h index f927b820893..f21d8a615ea 100644 --- a/library/cpp/actors/interconnect/poller_actor.h +++ b/library/cpp/actors/interconnect/poller_actor.h @@ -27,7 +27,7 @@ namespace NActors { public: ~TPollerToken(); - void Request(bool read, bool write); + bool Request(bool read, bool write, bool suppressNotify = false); using TPtr = TIntrusivePtr<TPollerToken>; }; diff --git a/library/cpp/actors/interconnect/poller_actor_darwin.h b/library/cpp/actors/interconnect/poller_actor_darwin.h index a1f750e711f..d00aa766f00 100644 --- a/library/cpp/actors/interconnect/poller_actor_darwin.h +++ b/library/cpp/actors/interconnect/poller_actor_darwin.h @@ -92,8 +92,9 @@ namespace NActors { SafeKevent(ev, 2); } - void Request(const TIntrusivePtr<TSocketRecord>& /*socket*/, bool /*read*/, bool /*write*/) - {} // no special processing here as we use kqueue in edge-triggered mode + bool Request(const TIntrusivePtr<TSocketRecord>& /*socket*/, bool /*read*/, bool /*write*/, bool /*suppressNotify*/) { + return false; // no special processing here as we use kqueue in edge-triggered mode + } }; using TPollerThread = TKqueueThread; diff --git a/library/cpp/actors/interconnect/poller_actor_linux.h b/library/cpp/actors/interconnect/poller_actor_linux.h index 8831d2062b8..183135d6b07 100644 --- a/library/cpp/actors/interconnect/poller_actor_linux.h +++ b/library/cpp/actors/interconnect/poller_actor_linux.h @@ -4,6 +4,13 @@ namespace NActors { + enum { + ReadExpected = 1, + ReadHit = 2, + WriteExpected = 4, + WriteHit = 8, + }; + class TEpollThread : public TPollerThreadBase<TEpollThread> { // epoll file descriptor int EpollDescriptor; @@ -55,26 +62,7 @@ namespace NActors { if (auto *record = static_cast<TSocketRecord*>(ev.data.ptr)) { const bool read = ev.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR); const bool write = ev.events & (EPOLLOUT | EPOLLERR); - - // remove hit flags from the bit set - ui32 flags = record->Flags; - const ui32 remove = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0); - while (!record->Flags.compare_exchange_weak(flags, flags & ~remove)) - {} - flags &= ~remove; - - // rearm poller if some flags remain - if (flags) { - epoll_event event; - event.events = EPOLLONESHOT | EPOLLRDHUP | flags; - event.data.ptr = record; - if (epoll_ctl(EpollDescriptor, EPOLL_CTL_MOD, record->Socket->GetDescriptor(), &event) == -1) { - Y_ABORT("epoll_ctl(EPOLL_CTL_MOD) failed with %s", strerror(errno)); - } - } - - // issue notifications - Notify(record, read, write); + UpdateFlags(record, (read ? ReadHit : 0) | (write ? WriteHit : 0), false); } else { res = true; } @@ -83,6 +71,26 @@ namespace NActors { return res; } + bool UpdateFlags(TSocketRecord *record, ui32 addMask, bool suppressNotify) { + ui32 flags = record->Flags.load(std::memory_order_acquire); + for (;;) { + ui32 updated = flags | addMask; + static constexpr ui32 fullRead = ReadExpected | ReadHit; + static constexpr ui32 fullWrite = WriteExpected | WriteHit; + const bool read = (updated & fullRead) == fullRead; + const bool write = (updated & fullWrite) == fullWrite; + updated &= ~((read ? fullRead : 0) | (write ? fullWrite : 0)); + if (record->Flags.compare_exchange_weak(flags, updated, std::memory_order_acq_rel)) { + if (suppressNotify) { + return read || write; + } else { + Notify(record, read, write); + return false; + } + } + } + } + void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) { if (epoll_ctl(EpollDescriptor, EPOLL_CTL_DEL, socket->GetDescriptor(), nullptr) == -1) { Y_ABORT("epoll_ctl(EPOLL_CTL_DEL) failed with %s", strerror(errno)); @@ -91,27 +99,15 @@ namespace NActors { void RegisterSocket(const TIntrusivePtr<TSocketRecord>& record) { epoll_event event; - event.events = EPOLLONESHOT | EPOLLRDHUP; + event.events = EPOLLET | EPOLLRDHUP | EPOLLIN | EPOLLOUT; event.data.ptr = record.Get(); if (epoll_ctl(EpollDescriptor, EPOLL_CTL_ADD, record->Socket->GetDescriptor(), &event) == -1) { Y_ABORT("epoll_ctl(EPOLL_CTL_ADD) failed with %s", strerror(errno)); } } - void Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write) { - const ui32 add = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0); - ui32 flags = record->Flags; - while (!record->Flags.compare_exchange_weak(flags, flags | add)) - {} - flags |= add; - if (flags) { - epoll_event event; - event.events = EPOLLONESHOT | EPOLLRDHUP | flags; - event.data.ptr = record.Get(); - if (epoll_ctl(EpollDescriptor, EPOLL_CTL_MOD, record->Socket->GetDescriptor(), &event) == -1) { - Y_ABORT("epoll_ctl(EPOLL_CTL_MOD) failed with %s", strerror(errno)); - } - } + bool Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write, bool suppressNotify) { + return UpdateFlags(record.Get(), (read ? ReadExpected : 0) | (write ? WriteExpected : 0), suppressNotify); } }; diff --git a/library/cpp/actors/interconnect/poller_actor_win.h b/library/cpp/actors/interconnect/poller_actor_win.h index f59d3ddda7d..0fdc6ad6cbe 100644 --- a/library/cpp/actors/interconnect/poller_actor_win.h +++ b/library/cpp/actors/interconnect/poller_actor_win.h @@ -94,13 +94,14 @@ namespace NActors { ExecuteSyncOperation(TPollerWakeup()); } - void Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write) { + bool Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write, bool /*suppressNotify*/) { with_lock (Mutex) { const auto it = Descriptors.find(record->Socket->GetDescriptor()); Y_ABORT_UNLESS(it != Descriptors.end()); it->second->Flags |= (read ? READ : 0) | (write ? WRITE : 0); } ExecuteSyncOperation(TPollerWakeup()); + return false; } }; |