aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/interconnect/poller_actor.cpp
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-04-13 13:29:32 +0300
committeralexvru <alexvru@ydb.tech>2023-04-13 13:29:32 +0300
commitd2601d3b39a391a86e0e0abde10694815380de8f (patch)
tree3e74c951be7f97a847d81a8e6427c7a3a0bf825b /library/cpp/actors/interconnect/poller_actor.cpp
parent89ca66f252da3a017f748b167553026305555021 (diff)
downloadydb-d2601d3b39a391a86e0e0abde10694815380de8f.tar.gz
Remove excessive reading from the pipe
Diffstat (limited to 'library/cpp/actors/interconnect/poller_actor.cpp')
-rw-r--r--library/cpp/actors/interconnect/poller_actor.cpp46
1 files changed, 23 insertions, 23 deletions
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp
index e75cbcaef4..04e17c24ab 100644
--- a/library/cpp/actors/interconnect/poller_actor.cpp
+++ b/library/cpp/actors/interconnect/poller_actor.cpp
@@ -146,8 +146,7 @@ namespace NActors {
wrapper.Wait();
}
- bool DrainReadEnd() {
- size_t totalRead = 0;
+ void DrainReadEnd() {
char buffer[4096];
for (;;) {
ssize_t n = ReadEnd.Read(buffer, sizeof(buffer));
@@ -162,37 +161,38 @@ namespace NActors {
}
} else {
Y_VERIFY(n);
- totalRead += n;
}
}
- return totalRead;
}
bool ProcessSyncOpQueue() {
- if (DrainReadEnd()) {
- Y_VERIFY(!SyncOperationsQ.IsEmpty());
- do {
- TPollerSyncOperationWrapper *op = SyncOperationsQ.Top();
- if (auto *unregister = std::get_if<TPollerUnregisterSocket>(&op->Operation)) {
- static_cast<TDerived&>(*this).UnregisterSocketInLoop(unregister->Socket);
- op->SignalDone();
- } else if (std::get_if<TPollerExitThread>(&op->Operation)) {
- op->SignalDone();
- return false; // terminate the thread
- } else if (std::get_if<TPollerWakeup>(&op->Operation)) {
- op->SignalDone();
- } else {
- Y_FAIL();
- }
- } while (SyncOperationsQ.Pop());
- }
+ Y_VERIFY(!SyncOperationsQ.IsEmpty());
+ do {
+ TPollerSyncOperationWrapper *op = SyncOperationsQ.Top();
+ if (auto *unregister = std::get_if<TPollerUnregisterSocket>(&op->Operation)) {
+ static_cast<TDerived&>(*this).UnregisterSocketInLoop(unregister->Socket);
+ op->SignalDone();
+ } else if (std::get_if<TPollerExitThread>(&op->Operation)) {
+ op->SignalDone();
+ return false; // terminate the thread
+ } else if (std::get_if<TPollerWakeup>(&op->Operation)) {
+ op->SignalDone();
+ } else {
+ Y_FAIL();
+ }
+ } while (SyncOperationsQ.Pop());
return true;
}
void *ThreadProc() override {
SetCurrentThreadName("network poller");
- while (ProcessSyncOpQueue()) {
- static_cast<TDerived&>(*this).ProcessEventsInLoop();
+ for (;;) {
+ if (static_cast<TDerived&>(*this).ProcessEventsInLoop()) { // need to process the queue
+ DrainReadEnd();
+ if (!ProcessSyncOpQueue()) {
+ break;
+ }
+ }
}
return nullptr;
}