diff options
author | alexvru <alexvru@ydb.tech> | 2023-04-13 13:29:32 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-04-13 13:29:32 +0300 |
commit | d2601d3b39a391a86e0e0abde10694815380de8f (patch) | |
tree | 3e74c951be7f97a847d81a8e6427c7a3a0bf825b /library/cpp/actors/interconnect/poller_actor.cpp | |
parent | 89ca66f252da3a017f748b167553026305555021 (diff) | |
download | ydb-d2601d3b39a391a86e0e0abde10694815380de8f.tar.gz |
Remove excessive reading from the pipe
Diffstat (limited to 'library/cpp/actors/interconnect/poller_actor.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/poller_actor.cpp | 46 |
1 files changed, 23 insertions, 23 deletions
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp index e75cbcaef4..04e17c24ab 100644 --- a/library/cpp/actors/interconnect/poller_actor.cpp +++ b/library/cpp/actors/interconnect/poller_actor.cpp @@ -146,8 +146,7 @@ namespace NActors { wrapper.Wait(); } - bool DrainReadEnd() { - size_t totalRead = 0; + void DrainReadEnd() { char buffer[4096]; for (;;) { ssize_t n = ReadEnd.Read(buffer, sizeof(buffer)); @@ -162,37 +161,38 @@ namespace NActors { } } else { Y_VERIFY(n); - totalRead += n; } } - return totalRead; } bool ProcessSyncOpQueue() { - if (DrainReadEnd()) { - Y_VERIFY(!SyncOperationsQ.IsEmpty()); - do { - TPollerSyncOperationWrapper *op = SyncOperationsQ.Top(); - if (auto *unregister = std::get_if<TPollerUnregisterSocket>(&op->Operation)) { - static_cast<TDerived&>(*this).UnregisterSocketInLoop(unregister->Socket); - op->SignalDone(); - } else if (std::get_if<TPollerExitThread>(&op->Operation)) { - op->SignalDone(); - return false; // terminate the thread - } else if (std::get_if<TPollerWakeup>(&op->Operation)) { - op->SignalDone(); - } else { - Y_FAIL(); - } - } while (SyncOperationsQ.Pop()); - } + Y_VERIFY(!SyncOperationsQ.IsEmpty()); + do { + TPollerSyncOperationWrapper *op = SyncOperationsQ.Top(); + if (auto *unregister = std::get_if<TPollerUnregisterSocket>(&op->Operation)) { + static_cast<TDerived&>(*this).UnregisterSocketInLoop(unregister->Socket); + op->SignalDone(); + } else if (std::get_if<TPollerExitThread>(&op->Operation)) { + op->SignalDone(); + return false; // terminate the thread + } else if (std::get_if<TPollerWakeup>(&op->Operation)) { + op->SignalDone(); + } else { + Y_FAIL(); + } + } while (SyncOperationsQ.Pop()); return true; } void *ThreadProc() override { SetCurrentThreadName("network poller"); - while (ProcessSyncOpQueue()) { - static_cast<TDerived&>(*this).ProcessEventsInLoop(); + for (;;) { + if (static_cast<TDerived&>(*this).ProcessEventsInLoop()) { // need to process the queue + DrainReadEnd(); + if (!ProcessSyncOpQueue()) { + break; + } + } } return nullptr; } |