aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-04-13 13:29:32 +0300
committeralexvru <alexvru@ydb.tech>2023-04-13 13:29:32 +0300
commitd2601d3b39a391a86e0e0abde10694815380de8f (patch)
tree3e74c951be7f97a847d81a8e6427c7a3a0bf825b
parent89ca66f252da3a017f748b167553026305555021 (diff)
downloadydb-d2601d3b39a391a86e0e0abde10694815380de8f.tar.gz
Remove excessive reading from the pipe
-rw-r--r--library/cpp/actors/interconnect/poller_actor.cpp46
-rw-r--r--library/cpp/actors/interconnect/poller_actor_darwin.h10
-rw-r--r--library/cpp/actors/interconnect/poller_actor_linux.h12
-rw-r--r--library/cpp/actors/interconnect/poller_actor_win.h10
4 files changed, 48 insertions, 30 deletions
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp
index e75cbcaef4..04e17c24ab 100644
--- a/library/cpp/actors/interconnect/poller_actor.cpp
+++ b/library/cpp/actors/interconnect/poller_actor.cpp
@@ -146,8 +146,7 @@ namespace NActors {
wrapper.Wait();
}
- bool DrainReadEnd() {
- size_t totalRead = 0;
+ void DrainReadEnd() {
char buffer[4096];
for (;;) {
ssize_t n = ReadEnd.Read(buffer, sizeof(buffer));
@@ -162,37 +161,38 @@ namespace NActors {
}
} else {
Y_VERIFY(n);
- totalRead += n;
}
}
- return totalRead;
}
bool ProcessSyncOpQueue() {
- if (DrainReadEnd()) {
- Y_VERIFY(!SyncOperationsQ.IsEmpty());
- do {
- TPollerSyncOperationWrapper *op = SyncOperationsQ.Top();
- if (auto *unregister = std::get_if<TPollerUnregisterSocket>(&op->Operation)) {
- static_cast<TDerived&>(*this).UnregisterSocketInLoop(unregister->Socket);
- op->SignalDone();
- } else if (std::get_if<TPollerExitThread>(&op->Operation)) {
- op->SignalDone();
- return false; // terminate the thread
- } else if (std::get_if<TPollerWakeup>(&op->Operation)) {
- op->SignalDone();
- } else {
- Y_FAIL();
- }
- } while (SyncOperationsQ.Pop());
- }
+ Y_VERIFY(!SyncOperationsQ.IsEmpty());
+ do {
+ TPollerSyncOperationWrapper *op = SyncOperationsQ.Top();
+ if (auto *unregister = std::get_if<TPollerUnregisterSocket>(&op->Operation)) {
+ static_cast<TDerived&>(*this).UnregisterSocketInLoop(unregister->Socket);
+ op->SignalDone();
+ } else if (std::get_if<TPollerExitThread>(&op->Operation)) {
+ op->SignalDone();
+ return false; // terminate the thread
+ } else if (std::get_if<TPollerWakeup>(&op->Operation)) {
+ op->SignalDone();
+ } else {
+ Y_FAIL();
+ }
+ } while (SyncOperationsQ.Pop());
return true;
}
void *ThreadProc() override {
SetCurrentThreadName("network poller");
- while (ProcessSyncOpQueue()) {
- static_cast<TDerived&>(*this).ProcessEventsInLoop();
+ for (;;) {
+ if (static_cast<TDerived&>(*this).ProcessEventsInLoop()) { // need to process the queue
+ DrainReadEnd();
+ if (!ProcessSyncOpQueue()) {
+ break;
+ }
+ }
}
return nullptr;
}
diff --git a/library/cpp/actors/interconnect/poller_actor_darwin.h b/library/cpp/actors/interconnect/poller_actor_darwin.h
index 4cb0a58f8d..31c1144794 100644
--- a/library/cpp/actors/interconnect/poller_actor_darwin.h
+++ b/library/cpp/actors/interconnect/poller_actor_darwin.h
@@ -45,18 +45,20 @@ namespace NActors {
close(KqDescriptor);
}
- void ProcessEventsInLoop() {
+ bool ProcessEventsInLoop() {
std::array<struct kevent, 256> events;
int numReady = kevent(KqDescriptor, nullptr, 0, events.data(), events.size(), nullptr);
if (numReady == -1) {
if (errno == EINTR) {
- return;
+ return false;
} else {
Y_FAIL("kevent() failed with %s", strerror(errno));
}
}
+ bool res = false;
+
for (int i = 0; i < numReady; ++i) {
const struct kevent& ev = events[i];
if (ev.udata) {
@@ -65,8 +67,12 @@ namespace NActors {
const bool read = error || ev.filter == EVFILT_READ;
const bool write = error || ev.filter == EVFILT_WRITE;
Notify(it, read, write);
+ } else {
+ res = true;
}
}
+
+ return res;
}
void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) {
diff --git a/library/cpp/actors/interconnect/poller_actor_linux.h b/library/cpp/actors/interconnect/poller_actor_linux.h
index dd4f7c0124..6bd2cc258f 100644
--- a/library/cpp/actors/interconnect/poller_actor_linux.h
+++ b/library/cpp/actors/interconnect/poller_actor_linux.h
@@ -30,7 +30,7 @@ namespace NActors {
close(EpollDescriptor);
}
- void ProcessEventsInLoop() {
+ bool ProcessEventsInLoop() {
// preallocated array for events
std::array<epoll_event, 256> events;
@@ -42,12 +42,14 @@ namespace NActors {
// check return status for any errors
if (numReady == -1) {
if (errno == EINTR) {
- return; // restart the call a bit later
+ return false; // restart the call a bit later
} else {
Y_FAIL("epoll_wait() failed with %s", strerror(errno));
}
}
+ bool res = false;
+
for (int i = 0; i < numReady; ++i) {
const epoll_event& ev = events[i];
if (auto *record = static_cast<TSocketRecord*>(ev.data.ptr)) {
@@ -73,8 +75,12 @@ namespace NActors {
// issue notifications
Notify(record, read, write);
+ } else {
+ res = true;
}
}
+
+ return res;
}
void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) {
@@ -110,5 +116,5 @@ namespace NActors {
};
using TPollerThread = TEpollThread;
-
+
} // namespace NActors
diff --git a/library/cpp/actors/interconnect/poller_actor_win.h b/library/cpp/actors/interconnect/poller_actor_win.h
index 4b4caa0ebd..e593cbafd1 100644
--- a/library/cpp/actors/interconnect/poller_actor_win.h
+++ b/library/cpp/actors/interconnect/poller_actor_win.h
@@ -23,7 +23,7 @@ namespace NActors {
Stop();
}
- void ProcessEventsInLoop() {
+ bool ProcessEventsInLoop() {
fd_set readfds, writefds, exceptfds;
FD_ZERO(&readfds);
@@ -51,12 +51,14 @@ namespace NActors {
if (res == -1) {
const int err = LastSocketError();
if (err == EINTR) {
- return; // try a bit later
+ return false; // try a bit later
} else {
Y_FAIL("select() failed with %s", strerror(err));
}
}
+ bool flag = false;
+
with_lock (Mutex) {
for (const auto& [fd, record] : Descriptors) {
if (record) {
@@ -70,9 +72,13 @@ namespace NActors {
record->Flags &= ~WRITE;
}
Notify(record.Get(), read, write);
+ } else {
+ flag = true;
}
}
}
+
+ return flag;
}
void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) {