aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2023-11-08 18:02:27 +0300
committeralexvru <alexvru@ydb.tech>2023-11-08 18:54:10 +0300
commit97fad0e8cd21ff5617a610d368396729ac893df8 (patch)
tree7360fbd11a19906761d258889aad5b62e0ea7426 /library/cpp
parentdf03382e94a5f170d1b7b0c6e5bb949b3c9ad043 (diff)
downloadydb-97fad0e8cd21ff5617a610d368396729ac893df8.tar.gz
Switch epoll back to EPOLLET KIKIMR-20021
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_stream.cpp8
-rw-r--r--library/cpp/actors/interconnect/interconnect_stream.h2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp66
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp25
-rw-r--r--library/cpp/actors/interconnect/poller_actor.cpp10
-rw-r--r--library/cpp/actors/interconnect/poller_actor.h2
-rw-r--r--library/cpp/actors/interconnect/poller_actor_darwin.h5
-rw-r--r--library/cpp/actors/interconnect/poller_actor_linux.h66
-rw-r--r--library/cpp/actors/interconnect/poller_actor_win.h3
9 files changed, 103 insertions, 84 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_stream.cpp b/library/cpp/actors/interconnect/interconnect_stream.cpp
index ab3b74fa572..2866a5ab6c9 100644
--- a/library/cpp/actors/interconnect/interconnect_stream.cpp
+++ b/library/cpp/actors/interconnect/interconnect_stream.cpp
@@ -214,6 +214,10 @@ namespace NInterconnect {
token.Request(read, write);
}
+ bool TStreamSocket::RequestEx(NActors::TPollerToken& token, bool read, bool write) {
+ return token.Request(read, write);
+ }
+
size_t TStreamSocket::ExpectedWriteLength() const {
return 0;
}
@@ -651,6 +655,10 @@ namespace NInterconnect {
token.Request(WantRead(), WantWrite());
}
+ bool TSecureSocket::RequestEx(NActors::TPollerToken& token, bool /*read*/, bool /*write*/) {
+ return token.Request(WantRead(), WantWrite(), true);
+ }
+
size_t TSecureSocket::ExpectedWriteLength() const {
return Impl->ExpectedWriteLength();
}
diff --git a/library/cpp/actors/interconnect/interconnect_stream.h b/library/cpp/actors/interconnect/interconnect_stream.h
index 54139aed8dc..ed23087f820 100644
--- a/library/cpp/actors/interconnect/interconnect_stream.h
+++ b/library/cpp/actors/interconnect/interconnect_stream.h
@@ -69,6 +69,7 @@ namespace NInterconnect {
ui32 GetSendBufferSize() const;
virtual void Request(NActors::TPollerToken& token, bool read, bool write);
+ virtual bool RequestEx(NActors::TPollerToken& token, bool read, bool write);
virtual size_t ExpectedWriteLength() const;
};
@@ -123,6 +124,7 @@ namespace NInterconnect {
bool WantRead() const;
bool WantWrite() const;
void Request(NActors::TPollerToken& token, bool read, bool write) override;
+ bool RequestEx(NActors::TPollerToken& token, bool read, bool write) override;
size_t ExpectedWriteLength() const override;
};
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 3f5ebd29870..49417abd532 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -714,39 +714,45 @@ namespace NActors {
ssize_t TInputSessionTCP::Read(NInterconnect::TStreamSocket& socket, const TPollerToken::TPtr& token,
bool *readPending, const TIoVec *iov, size_t num) {
- ssize_t recvres = 0;
- TString err;
- LWPROBE_IF_TOO_LONG(SlowICReadFromSocket, ms) {
- do {
- const ui64 begin = GetCycleCountFast();
- if (num == 1) {
- recvres = socket.Recv(iov->Data, iov->Size, &err);
- } else {
- recvres = socket.ReadV(reinterpret_cast<const iovec*>(iov), num);
- }
- const ui64 end = GetCycleCountFast();
- Metrics->IncRecvSyscalls((end - begin) * 1'000'000 / GetCyclesPerMillisecond());
- } while (recvres == -EINTR);
- }
+ for (;;) {
+ ssize_t recvres = 0;
+ TString err;
+ LWPROBE_IF_TOO_LONG(SlowICReadFromSocket, ms) {
+ do {
+ const ui64 begin = GetCycleCountFast();
+ if (num == 1) {
+ recvres = socket.Recv(iov->Data, iov->Size, &err);
+ } else {
+ recvres = socket.ReadV(reinterpret_cast<const iovec*>(iov), num);
+ }
+ const ui64 end = GetCycleCountFast();
+ Metrics->IncRecvSyscalls((end - begin) * 1'000'000 / GetCyclesPerMillisecond());
+ } while (recvres == -EINTR);
+ }
- LOG_DEBUG_IC_SESSION("ICIS12", "Read recvres# %zd num# %zu err# %s", recvres, num, err.data());
-
- if (recvres <= 0 || CloseInputSessionRequested) {
- if ((-recvres != EAGAIN && -recvres != EWOULDBLOCK) || CloseInputSessionRequested) {
- TString message = CloseInputSessionRequested ? "connection closed by debug command"
- : recvres == 0 ? "connection closed by peer"
- : err ? err
- : Sprintf("socket: %s", strerror(-recvres));
- LOG_NOTICE_NET(NodeId, "%s", message.data());
- throw TExReestablishConnection{CloseInputSessionRequested ? TDisconnectReason::Debug() :
- recvres == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-recvres)};
- } else if (token && !std::exchange(*readPending, true)) {
- socket.Request(*token, true, false);
+ LOG_DEBUG_IC_SESSION("ICIS12", "Read recvres# %zd num# %zu err# %s", recvres, num, err.data());
+
+ if (recvres <= 0 || CloseInputSessionRequested) {
+ if ((-recvres != EAGAIN && -recvres != EWOULDBLOCK) || CloseInputSessionRequested) {
+ TString message = CloseInputSessionRequested ? "connection closed by debug command"
+ : recvres == 0 ? "connection closed by peer"
+ : err ? err
+ : Sprintf("socket: %s", strerror(-recvres));
+ LOG_NOTICE_NET(NodeId, "%s", message.data());
+ throw TExReestablishConnection{CloseInputSessionRequested ? TDisconnectReason::Debug() :
+ recvres == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-recvres)};
+ } else if (token && !*readPending) {
+ if (socket.RequestEx(*token, true, false)) {
+ continue; // can try again
+ } else {
+ *readPending = true;
+ }
+ }
+ return -1;
}
- return -1;
- }
- return recvres;
+ return recvres;
+ }
}
constexpr ui64 GetUsageCountClearMask(size_t items, int bits) {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 25c05898d2a..28c4e5ff297 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -568,18 +568,21 @@ namespace NActors {
size_t totalWritten = 0;
if (stream && socket && !*writeBlocked) {
- if (const ssize_t r = Write(stream, *socket, maxBytes); r > 0) {
- stream.Advance(r);
- totalWritten += r;
- } else if (r == -1) {
- *writeBlocked = true;
- if (token) {
- socket->Request(*token, false, true);
+ for (;;) {
+ if (const ssize_t r = Write(stream, *socket, maxBytes); r > 0) {
+ stream.Advance(r);
+ totalWritten += r;
+ } else if (r == -1) {
+ if (token && socket->RequestEx(*token, false, true)) {
+ continue; // we can try again
+ }
+ *writeBlocked = true;
+ } else if (r == 0) {
+ // error condition
+ } else {
+ Y_UNREACHABLE();
}
- } else if (r == 0) {
- // error condition
- } else {
- Y_UNREACHABLE();
+ break;
}
}
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp
index 1f18e72a5ba..1f381a0b5b0 100644
--- a/library/cpp/actors/interconnect/poller_actor.cpp
+++ b/library/cpp/actors/interconnect/poller_actor.cpp
@@ -230,9 +230,11 @@ namespace NActors {
}
}
- void Request(bool read, bool write) {
+ bool Request(bool read, bool write, bool suppressNotify) {
if (auto thread = Thread.lock()) {
- thread->Request(Record, read, write);
+ return thread->Request(Record, read, write, suppressNotify);
+ } else {
+ return false;
}
}
@@ -283,8 +285,8 @@ namespace NActors {
TPollerToken::~TPollerToken()
{}
- void TPollerToken::Request(bool read, bool write) {
- Impl->Request(read, write);
+ bool TPollerToken::Request(bool read, bool write, bool suppressNotify) {
+ return Impl->Request(read, write, suppressNotify);
}
IActor* CreatePollerActor() {
diff --git a/library/cpp/actors/interconnect/poller_actor.h b/library/cpp/actors/interconnect/poller_actor.h
index f927b820893..f21d8a615ea 100644
--- a/library/cpp/actors/interconnect/poller_actor.h
+++ b/library/cpp/actors/interconnect/poller_actor.h
@@ -27,7 +27,7 @@ namespace NActors {
public:
~TPollerToken();
- void Request(bool read, bool write);
+ bool Request(bool read, bool write, bool suppressNotify = false);
using TPtr = TIntrusivePtr<TPollerToken>;
};
diff --git a/library/cpp/actors/interconnect/poller_actor_darwin.h b/library/cpp/actors/interconnect/poller_actor_darwin.h
index a1f750e711f..d00aa766f00 100644
--- a/library/cpp/actors/interconnect/poller_actor_darwin.h
+++ b/library/cpp/actors/interconnect/poller_actor_darwin.h
@@ -92,8 +92,9 @@ namespace NActors {
SafeKevent(ev, 2);
}
- void Request(const TIntrusivePtr<TSocketRecord>& /*socket*/, bool /*read*/, bool /*write*/)
- {} // no special processing here as we use kqueue in edge-triggered mode
+ bool Request(const TIntrusivePtr<TSocketRecord>& /*socket*/, bool /*read*/, bool /*write*/, bool /*suppressNotify*/) {
+ return false; // no special processing here as we use kqueue in edge-triggered mode
+ }
};
using TPollerThread = TKqueueThread;
diff --git a/library/cpp/actors/interconnect/poller_actor_linux.h b/library/cpp/actors/interconnect/poller_actor_linux.h
index 8831d2062b8..183135d6b07 100644
--- a/library/cpp/actors/interconnect/poller_actor_linux.h
+++ b/library/cpp/actors/interconnect/poller_actor_linux.h
@@ -4,6 +4,13 @@
namespace NActors {
+ enum {
+ ReadExpected = 1,
+ ReadHit = 2,
+ WriteExpected = 4,
+ WriteHit = 8,
+ };
+
class TEpollThread : public TPollerThreadBase<TEpollThread> {
// epoll file descriptor
int EpollDescriptor;
@@ -55,26 +62,7 @@ namespace NActors {
if (auto *record = static_cast<TSocketRecord*>(ev.data.ptr)) {
const bool read = ev.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR);
const bool write = ev.events & (EPOLLOUT | EPOLLERR);
-
- // remove hit flags from the bit set
- ui32 flags = record->Flags;
- const ui32 remove = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0);
- while (!record->Flags.compare_exchange_weak(flags, flags & ~remove))
- {}
- flags &= ~remove;
-
- // rearm poller if some flags remain
- if (flags) {
- epoll_event event;
- event.events = EPOLLONESHOT | EPOLLRDHUP | flags;
- event.data.ptr = record;
- if (epoll_ctl(EpollDescriptor, EPOLL_CTL_MOD, record->Socket->GetDescriptor(), &event) == -1) {
- Y_ABORT("epoll_ctl(EPOLL_CTL_MOD) failed with %s", strerror(errno));
- }
- }
-
- // issue notifications
- Notify(record, read, write);
+ UpdateFlags(record, (read ? ReadHit : 0) | (write ? WriteHit : 0), false);
} else {
res = true;
}
@@ -83,6 +71,26 @@ namespace NActors {
return res;
}
+ bool UpdateFlags(TSocketRecord *record, ui32 addMask, bool suppressNotify) {
+ ui32 flags = record->Flags.load(std::memory_order_acquire);
+ for (;;) {
+ ui32 updated = flags | addMask;
+ static constexpr ui32 fullRead = ReadExpected | ReadHit;
+ static constexpr ui32 fullWrite = WriteExpected | WriteHit;
+ const bool read = (updated & fullRead) == fullRead;
+ const bool write = (updated & fullWrite) == fullWrite;
+ updated &= ~((read ? fullRead : 0) | (write ? fullWrite : 0));
+ if (record->Flags.compare_exchange_weak(flags, updated, std::memory_order_acq_rel)) {
+ if (suppressNotify) {
+ return read || write;
+ } else {
+ Notify(record, read, write);
+ return false;
+ }
+ }
+ }
+ }
+
void UnregisterSocketInLoop(const TIntrusivePtr<TSharedDescriptor>& socket) {
if (epoll_ctl(EpollDescriptor, EPOLL_CTL_DEL, socket->GetDescriptor(), nullptr) == -1) {
Y_ABORT("epoll_ctl(EPOLL_CTL_DEL) failed with %s", strerror(errno));
@@ -91,27 +99,15 @@ namespace NActors {
void RegisterSocket(const TIntrusivePtr<TSocketRecord>& record) {
epoll_event event;
- event.events = EPOLLONESHOT | EPOLLRDHUP;
+ event.events = EPOLLET | EPOLLRDHUP | EPOLLIN | EPOLLOUT;
event.data.ptr = record.Get();
if (epoll_ctl(EpollDescriptor, EPOLL_CTL_ADD, record->Socket->GetDescriptor(), &event) == -1) {
Y_ABORT("epoll_ctl(EPOLL_CTL_ADD) failed with %s", strerror(errno));
}
}
- void Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write) {
- const ui32 add = (read ? EPOLLIN : 0) | (write ? EPOLLOUT : 0);
- ui32 flags = record->Flags;
- while (!record->Flags.compare_exchange_weak(flags, flags | add))
- {}
- flags |= add;
- if (flags) {
- epoll_event event;
- event.events = EPOLLONESHOT | EPOLLRDHUP | flags;
- event.data.ptr = record.Get();
- if (epoll_ctl(EpollDescriptor, EPOLL_CTL_MOD, record->Socket->GetDescriptor(), &event) == -1) {
- Y_ABORT("epoll_ctl(EPOLL_CTL_MOD) failed with %s", strerror(errno));
- }
- }
+ bool Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write, bool suppressNotify) {
+ return UpdateFlags(record.Get(), (read ? ReadExpected : 0) | (write ? WriteExpected : 0), suppressNotify);
}
};
diff --git a/library/cpp/actors/interconnect/poller_actor_win.h b/library/cpp/actors/interconnect/poller_actor_win.h
index f59d3ddda7d..0fdc6ad6cbe 100644
--- a/library/cpp/actors/interconnect/poller_actor_win.h
+++ b/library/cpp/actors/interconnect/poller_actor_win.h
@@ -94,13 +94,14 @@ namespace NActors {
ExecuteSyncOperation(TPollerWakeup());
}
- void Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write) {
+ bool Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write, bool /*suppressNotify*/) {
with_lock (Mutex) {
const auto it = Descriptors.find(record->Socket->GetDescriptor());
Y_ABORT_UNLESS(it != Descriptors.end());
it->second->Flags |= (read ? READ : 0) | (write ? WRITE : 0);
}
ExecuteSyncOperation(TPollerWakeup());
+ return false;
}
};