diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-05-05 11:09:01 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-05-05 11:09:01 +0300 |
commit | b5a989b16cafa8a3b3bc076f1097a0eda6f48c06 (patch) | |
tree | 4da744117a5aab37758921fa43b95a3068e5aec1 /library/cpp/actors/interconnect | |
parent | fc1cffcfa7f0497a1f97b384a24bcbf23362f3be (diff) | |
download | ydb-b5a989b16cafa8a3b3bc076f1097a0eda6f48c06.tar.gz |
Ydb stable 23-1-2623.1.26
x-stable-origin-commit: 22184a7e157553d447f17a2dffc4ea2d32dfd74d
Diffstat (limited to 'library/cpp/actors/interconnect')
-rw-r--r-- | library/cpp/actors/interconnect/poller_actor.cpp | 46 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/poller_actor_darwin.h | 10 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/poller_actor_linux.h | 12 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/poller_actor_win.h | 10 |
4 files changed, 48 insertions, 30 deletions
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp index e75cbcaef4..04e17c24ab 100644 --- a/library/cpp/actors/interconnect/poller_actor.cpp +++ b/library/cpp/actors/interconnect/poller_actor.cpp @@ -146,8 +146,7 @@ namespace NActors { wrapper.Wait(); } - bool DrainReadEnd() { - size_t totalRead = 0; + void DrainReadEnd() { char buffer[4096]; for (;;) { ssize_t n = ReadEnd.Read(buffer, sizeof(buffer)); @@ -162,37 +161,38 @@ namespace NActors { } } else { Y_VERIFY(n); - totalRead += n; } } - return totalRead; } bool ProcessSyncOpQueue() { - if (DrainReadEnd()) { - Y_VERIFY(!SyncOperationsQ.IsEmpty()); - do { - TPollerSyncOperationWrapper *op = SyncOperationsQ.Top(); - if (auto *unregister = std::get_if<TPollerUnregisterSocket>(&op->Operation)) { - static_cast<TDerived&>(*this).UnregisterSocketInLoop(unregister->Socket); - op->SignalDone(); - } else if (std::get_if<TPollerExitThread>(&op->Operation)) { - op->SignalDone(); - return false; // terminate the thread - } else if (std::get_if<TPollerWakeup>(&op->Operation)) { - op->SignalDone(); - } else { - Y_FAIL(); - } - } while (SyncOperationsQ.Pop()); - } + Y_VERIFY(!SyncOperationsQ.IsEmpty()); + do { + TPollerSyncOperationWrapper *op = SyncOperationsQ.Top(); + if (auto *unregister = std::get_if<TPollerUnregisterSocket>(&op->Operation)) { + static_cast<TDerived&>(*this).UnregisterSocketInLoop(unregister->Socket); + op->SignalDone(); + } else if (std::get_if<TPollerExitThread>(&op->Operation)) { + op->SignalDone(); + return false; // terminate the thread + } else if (std::get_if<TPollerWakeup>(&op->Operation)) { + op->SignalDone(); + } else { + Y_FAIL(); + } + } while (SyncOperationsQ.Pop()); return true; } void *ThreadProc() override { SetCurrentThreadName("network poller"); - while (ProcessSyncOpQueue()) { - static_cast<TDerived&>(*this).ProcessEventsInLoop(); + for (;;) { + if (static_cast<TDerived&>(*this).ProcessEventsInLoop()) { // need to process the queue + DrainReadEnd(); + if (!ProcessSyncOpQueue()) { + break; + } + } } return nullptr; } diff --git a/library/cpp/actors/interconnect/poller_actor_darwin.h b/library/cpp/actors/interconnect/poller_actor_darwin.h index 4cb0a58f8d..31c1144794 100644 --- a/library/cpp/actors/interconnect/poller_actor_darwin.h +++ b/library/cpp/actors/interconnect/poller_actor_darwin.h @@ -45,18 +45,20 @@ namespace NActors { close(KqDescriptor); } - void ProcessEventsInLoop() { + bool ProcessEventsInLoop() { std::array<struct kevent, 256> events; int numReady = kevent(KqDescriptor, nullptr, 0, events.data(), events.size(), nullptr); if (numReady == -1) { if (errno == EINTR) { - return; + return false; } else { Y_FAIL("kevent() failed with %s", strerror(errno)); } } + bool res = false; + for (int i = 0; i < numReady; ++i) { const struct kevent& ev = events[i]; if (ev.udata) { @@ -65,8 +67,12 @@ namespace NActors { const bool read = error || ev.filter == EVFILT_READ; const bool write = error || ev.filter == EVFILT_WRITE; Notify(it, read, write); + } else { + res = true; } } + + return res; } void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) { diff --git a/library/cpp/actors/interconnect/poller_actor_linux.h b/library/cpp/actors/interconnect/poller_actor_linux.h index dd4f7c0124..6bd2cc258f 100644 --- a/library/cpp/actors/interconnect/poller_actor_linux.h +++ b/library/cpp/actors/interconnect/poller_actor_linux.h @@ -30,7 +30,7 @@ namespace NActors { close(EpollDescriptor); } - void ProcessEventsInLoop() { + bool ProcessEventsInLoop() { // preallocated array for events std::array<epoll_event, 256> events; @@ -42,12 +42,14 @@ namespace NActors { // check return status for any errors if (numReady == -1) { if (errno == EINTR) { - return; // restart the call a bit later + return false; // restart the call a bit later } else { Y_FAIL("epoll_wait() failed with %s", strerror(errno)); } } + bool res = false; + for (int i = 0; i < numReady; ++i) { const epoll_event& ev = events[i]; if (auto *record = static_cast<TSocketRecord*>(ev.data.ptr)) { @@ -73,8 +75,12 @@ namespace NActors { // issue notifications Notify(record, read, write); + } else { + res = true; } } + + return res; } void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) { @@ -110,5 +116,5 @@ namespace NActors { }; using TPollerThread = TEpollThread; - + } // namespace NActors diff --git a/library/cpp/actors/interconnect/poller_actor_win.h b/library/cpp/actors/interconnect/poller_actor_win.h index 4b4caa0ebd..e593cbafd1 100644 --- a/library/cpp/actors/interconnect/poller_actor_win.h +++ b/library/cpp/actors/interconnect/poller_actor_win.h @@ -23,7 +23,7 @@ namespace NActors { Stop(); } - void ProcessEventsInLoop() { + bool ProcessEventsInLoop() { fd_set readfds, writefds, exceptfds; FD_ZERO(&readfds); @@ -51,12 +51,14 @@ namespace NActors { if (res == -1) { const int err = LastSocketError(); if (err == EINTR) { - return; // try a bit later + return false; // try a bit later } else { Y_FAIL("select() failed with %s", strerror(err)); } } + bool flag = false; + with_lock (Mutex) { for (const auto& [fd, record] : Descriptors) { if (record) { @@ -70,9 +72,13 @@ namespace NActors { record->Flags &= ~WRITE; } Notify(record.Get(), read, write); + } else { + flag = true; } } } + + return flag; } void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) { |