summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <[email protected]>2023-11-17 21:51:27 +0300
committeralexvru <[email protected]>2023-11-17 22:24:31 +0300
commit9fb3a015a1c695b4f6ee2196404e7b2a06e6d677 (patch)
treedfef740b72a2662862caf1eadb53da9b53c97456
parent983dd8625f86eab8f1af1b53c44ad49df392dadc (diff)
Fix EPOLLET logic to keep old behaviour for inoptimal user code KIKIMR-20021
-rw-r--r--library/cpp/actors/http/http_proxy_acceptor.cpp11
-rw-r--r--library/cpp/actors/http/http_proxy_incoming.cpp27
-rw-r--r--library/cpp/actors/http/http_proxy_outgoing.cpp8
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp5
-rw-r--r--library/cpp/actors/interconnect/interconnect_stream.cpp23
-rw-r--r--library/cpp/actors/interconnect/interconnect_stream.h6
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_server.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp15
-rw-r--r--library/cpp/actors/interconnect/poller_actor.cpp30
-rw-r--r--library/cpp/actors/interconnect/poller_actor.h11
-rw-r--r--library/cpp/actors/interconnect/poller_actor_darwin.h3
-rw-r--r--library/cpp/actors/interconnect/poller_actor_linux.h28
-rw-r--r--library/cpp/actors/interconnect/poller_actor_win.h3
14 files changed, 128 insertions, 48 deletions
diff --git a/library/cpp/actors/http/http_proxy_acceptor.cpp b/library/cpp/actors/http/http_proxy_acceptor.cpp
index f9ee1d8032e..c007f747eb3 100644
--- a/library/cpp/actors/http/http_proxy_acceptor.cpp
+++ b/library/cpp/actors/http/http_proxy_acceptor.cpp
@@ -116,6 +116,12 @@ protected:
SocketAddressType addr;
std::optional<SocketType> s = Socket->Socket.Accept(addr);
if (!s) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ Y_ABORT_UNLESS(PollerToken);
+ if (PollerToken->RequestReadNotificationAfterWouldBlock()) {
+ continue; // we can try it again
+ }
+ }
break;
}
TIntrusivePtr<TSocketDescriptor> socket = new TSocketDescriptor(std::move(s).value());
@@ -130,11 +136,6 @@ protected:
ctx.Send(Poller, new NActors::TEvPollerRegister(socket, connectionId, connectionId));
Connections.emplace(connectionId);
}
- int err = errno;
- if (err == EAGAIN || err == EWOULDBLOCK) { // request poller for further connection polling
- Y_ABORT_UNLESS(PollerToken);
- PollerToken->Request(true, false);
- }
}
void Handle(TEvHttpProxy::TEvHttpConnectionClosed::TPtr event, const NActors::TActorContext&) {
diff --git a/library/cpp/actors/http/http_proxy_incoming.cpp b/library/cpp/actors/http/http_proxy_incoming.cpp
index 68bd5f6ecb0..b98b3c09f36 100644
--- a/library/cpp/actors/http/http_proxy_incoming.cpp
+++ b/library/cpp/actors/http/http_proxy_incoming.cpp
@@ -79,16 +79,19 @@ protected:
void OnAccept(const NActors::TActorContext& ctx) {
int res;
bool read = false, write = false;
- if ((res = TSocketImpl::OnAccept(Endpoint, read, write)) != 1) {
- if (-res == EAGAIN) {
- if (PollerToken) {
- PollerToken->Request(read, write);
+ for (;;) {
+ if ((res = TSocketImpl::OnAccept(Endpoint, read, write)) != 1) {
+ if (-res == EAGAIN) {
+ if (PollerToken && PollerToken->RequestReadNotificationAfterWouldBlock()) {
+ continue;
+ }
+ return; // wait for further notifications
+ } else {
+ LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in Accept: " << strerror(-res));
+ return Die(ctx);
}
- return; // wait for further notifications
- } else {
- LOG_ERROR_S(ctx, HttpLog, "(#" << TSocketImpl::GetRawSocket() << "," << Address << ") connection closed - error in Accept: " << strerror(-res));
- return Die(ctx);
}
+ break;
}
TBase::Become(&TIncomingConnectionActor::StateConnected);
ctx.Send(ctx.SelfID, new TEvPollerReady(nullptr, true, true));
@@ -147,7 +150,9 @@ protected:
if (!read && !write) {
read = true;
}
- PollerToken->Request(read, write);
+ if (PollerToken->RequestNotificationAfterWouldBlock(read, write)) {
+ continue;
+ }
}
break;
} else if (-res == EINTR) {
@@ -258,7 +263,9 @@ protected:
if (!read && !write) {
write = true;
}
- PollerToken->Request(read, write);
+ if (PollerToken->RequestNotificationAfterWouldBlock(read, write)) {
+ continue;
+ }
}
break;
} else {
diff --git a/library/cpp/actors/http/http_proxy_outgoing.cpp b/library/cpp/actors/http/http_proxy_outgoing.cpp
index 3bb23d6fae8..b1f27c2c5a1 100644
--- a/library/cpp/actors/http/http_proxy_outgoing.cpp
+++ b/library/cpp/actors/http/http_proxy_outgoing.cpp
@@ -107,7 +107,9 @@ protected:
if (!read && !write) {
write = true;
}
- PollerToken->Request(read, write);
+ if (PollerToken->RequestNotificationAfterWouldBlock(read, write)) {
+ continue;
+ }
}
break;
} else {
@@ -144,7 +146,9 @@ protected:
if (!read && !write) {
read = true;
}
- PollerToken->Request(read, write);
+ if (PollerToken->RequestNotificationAfterWouldBlock(read, write)) {
+ continue;
+ }
}
return;
} else {
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index eae80d8c5cd..8c347507b2d 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -164,8 +164,9 @@ namespace NActors {
}
void WaitPoller(bool read, bool write, TString state) {
- PollerToken->Request(read, write);
- Actor->WaitForSpecificEvent<TEvPollerReady>(std::move(state));
+ if (!PollerToken->RequestNotificationAfterWouldBlock(read, write)) {
+ Actor->WaitForSpecificEvent<TEvPollerReady>(std::move(state));
+ }
}
template <typename TDataPtr, typename TSendRecvFunc>
diff --git a/library/cpp/actors/interconnect/interconnect_stream.cpp b/library/cpp/actors/interconnect/interconnect_stream.cpp
index 2866a5ab6c9..f8db077fa40 100644
--- a/library/cpp/actors/interconnect/interconnect_stream.cpp
+++ b/library/cpp/actors/interconnect/interconnect_stream.cpp
@@ -214,8 +214,12 @@ namespace NInterconnect {
token.Request(read, write);
}
- bool TStreamSocket::RequestEx(NActors::TPollerToken& token, bool read, bool write) {
- return token.Request(read, write);
+ bool TStreamSocket::RequestReadNotificationAfterWouldBlock(NActors::TPollerToken& token) {
+ return token.RequestReadNotificationAfterWouldBlock();
+ }
+
+ bool TStreamSocket::RequestWriteNotificationAfterWouldBlock(NActors::TPollerToken& token) {
+ return token.RequestWriteNotificationAfterWouldBlock();
}
size_t TStreamSocket::ExpectedWriteLength() const {
@@ -655,8 +659,19 @@ namespace NInterconnect {
token.Request(WantRead(), WantWrite());
}
- bool TSecureSocket::RequestEx(NActors::TPollerToken& token, bool /*read*/, bool /*write*/) {
- return token.Request(WantRead(), WantWrite(), true);
+ bool TSecureSocket::RequestReadNotificationAfterWouldBlock(NActors::TPollerToken& token) {
+ bool result = false;
+ if (WantRead()) {
+ result |= token.RequestReadNotificationAfterWouldBlock();
+ }
+ if (WantWrite()) {
+ result |= token.RequestWriteNotificationAfterWouldBlock();
+ }
+ return result;
+ }
+
+ bool TSecureSocket::RequestWriteNotificationAfterWouldBlock(NActors::TPollerToken& token) {
+ return RequestReadNotificationAfterWouldBlock(token);
}
size_t TSecureSocket::ExpectedWriteLength() const {
diff --git a/library/cpp/actors/interconnect/interconnect_stream.h b/library/cpp/actors/interconnect/interconnect_stream.h
index ed23087f820..b9ca804e0e5 100644
--- a/library/cpp/actors/interconnect/interconnect_stream.h
+++ b/library/cpp/actors/interconnect/interconnect_stream.h
@@ -69,7 +69,8 @@ namespace NInterconnect {
ui32 GetSendBufferSize() const;
virtual void Request(NActors::TPollerToken& token, bool read, bool write);
- virtual bool RequestEx(NActors::TPollerToken& token, bool read, bool write);
+ virtual bool RequestReadNotificationAfterWouldBlock(NActors::TPollerToken& token);
+ virtual bool RequestWriteNotificationAfterWouldBlock(NActors::TPollerToken& token);
virtual size_t ExpectedWriteLength() const;
};
@@ -124,7 +125,8 @@ namespace NInterconnect {
bool WantRead() const;
bool WantWrite() const;
void Request(NActors::TPollerToken& token, bool read, bool write) override;
- bool RequestEx(NActors::TPollerToken& token, bool read, bool write) override;
+ bool RequestReadNotificationAfterWouldBlock(NActors::TPollerToken& token) override;
+ bool RequestWriteNotificationAfterWouldBlock(NActors::TPollerToken& token) override;
size_t ExpectedWriteLength() const override;
};
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 49417abd532..1409f2cf0cd 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -742,7 +742,7 @@ namespace NActors {
throw TExReestablishConnection{CloseInputSessionRequested ? TDisconnectReason::Debug() :
recvres == 0 ? TDisconnectReason::EndOfStream() : TDisconnectReason::FromErrno(-recvres)};
} else if (token && !*readPending) {
- if (socket.RequestEx(*token, true, false)) {
+ if (socket.RequestReadNotificationAfterWouldBlock(*token)) {
continue; // can try again
} else {
*readPending = true;
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
index 09c5987e81f..df0c172dc18 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_server.cpp
@@ -109,8 +109,8 @@ namespace NActors {
Listener.Reset();
PollerToken.Reset();
Become(&TThis::Initial, TDuration::Seconds(1), new TEvents::TEvBootstrap);
- } else if (PollerToken) {
- PollerToken->Request(true, false);
+ } else if (PollerToken && PollerToken->RequestReadNotificationAfterWouldBlock()) {
+ continue;
}
break;
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 28c4e5ff297..7d6f8d012f7 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -545,17 +545,18 @@ namespace NActors {
void TInterconnectSessionTCP::Handle(TEvPollerRegisterResult::TPtr ev) {
auto *msg = ev->Get();
+ bool sendPollerReady = false;
if (msg->Socket == Socket) {
PollerToken = std::move(msg->PollerToken);
- if (ReceiveContext->MainWriteBlocked) {
- Socket->Request(*PollerToken, false, true);
- }
+ sendPollerReady = ReceiveContext->MainWriteBlocked;
} else if (msg->Socket == XdcSocket) {
XdcPollerToken = std::move(msg->PollerToken);
- if (ReceiveContext->XdcWriteBlocked) {
- XdcSocket->Request(*XdcPollerToken, false, true);
- }
+ sendPollerReady = ReceiveContext->XdcWriteBlocked;
+ }
+
+ if (sendPollerReady) {
+ Send(SelfId(), new TEvPollerReady(msg->Socket, false, true));
}
}
@@ -573,7 +574,7 @@ namespace NActors {
stream.Advance(r);
totalWritten += r;
} else if (r == -1) {
- if (token && socket->RequestEx(*token, false, true)) {
+ if (token && socket->RequestWriteNotificationAfterWouldBlock(*token)) {
continue; // we can try again
}
*writeBlocked = true;
diff --git a/library/cpp/actors/interconnect/poller_actor.cpp b/library/cpp/actors/interconnect/poller_actor.cpp
index 1f381a0b5b0..040bdf86517 100644
--- a/library/cpp/actors/interconnect/poller_actor.cpp
+++ b/library/cpp/actors/interconnect/poller_actor.cpp
@@ -230,9 +230,23 @@ namespace NActors {
}
}
- bool Request(bool read, bool write, bool suppressNotify) {
+ void Request(bool read, bool write) {
if (auto thread = Thread.lock()) {
- return thread->Request(Record, read, write, suppressNotify);
+ thread->Request(Record, read, write, false, false);
+ }
+ }
+
+ bool RequestReadNotificationAfterWouldBlock() {
+ if (auto thread = Thread.lock()) {
+ return thread->Request(Record, true, false, true, true);
+ } else {
+ return false;
+ }
+ }
+
+ bool RequestWriteNotificationAfterWouldBlock() {
+ if (auto thread = Thread.lock()) {
+ return thread->Request(Record, false, true, true, true);
} else {
return false;
}
@@ -285,8 +299,16 @@ namespace NActors {
TPollerToken::~TPollerToken()
{}
- bool TPollerToken::Request(bool read, bool write, bool suppressNotify) {
- return Impl->Request(read, write, suppressNotify);
+ void TPollerToken::Request(bool read, bool write) {
+ Impl->Request(read, write);
+ }
+
+ bool TPollerToken::RequestReadNotificationAfterWouldBlock() {
+ return Impl->RequestReadNotificationAfterWouldBlock();
+ }
+
+ bool TPollerToken::RequestWriteNotificationAfterWouldBlock() {
+ return Impl->RequestWriteNotificationAfterWouldBlock();
}
IActor* CreatePollerActor() {
diff --git a/library/cpp/actors/interconnect/poller_actor.h b/library/cpp/actors/interconnect/poller_actor.h
index f21d8a615ea..4fb2d49a658 100644
--- a/library/cpp/actors/interconnect/poller_actor.h
+++ b/library/cpp/actors/interconnect/poller_actor.h
@@ -27,7 +27,16 @@ namespace NActors {
public:
~TPollerToken();
- bool Request(bool read, bool write, bool suppressNotify = false);
+ void Request(bool read, bool write);
+ bool RequestReadNotificationAfterWouldBlock();
+ bool RequestWriteNotificationAfterWouldBlock();
+
+ bool RequestNotificationAfterWouldBlock(bool read, bool write) {
+ bool status = false;
+ status |= read && RequestReadNotificationAfterWouldBlock();
+ status |= write && RequestWriteNotificationAfterWouldBlock();
+ return status;
+ }
using TPtr = TIntrusivePtr<TPollerToken>;
};
diff --git a/library/cpp/actors/interconnect/poller_actor_darwin.h b/library/cpp/actors/interconnect/poller_actor_darwin.h
index d00aa766f00..cd763ac589c 100644
--- a/library/cpp/actors/interconnect/poller_actor_darwin.h
+++ b/library/cpp/actors/interconnect/poller_actor_darwin.h
@@ -92,7 +92,8 @@ namespace NActors {
SafeKevent(ev, 2);
}
- bool Request(const TIntrusivePtr<TSocketRecord>& /*socket*/, bool /*read*/, bool /*write*/, bool /*suppressNotify*/) {
+ bool Request(const TIntrusivePtr<TSocketRecord>& /*socket*/, bool /*read*/, bool /*write*/, bool /*suppressNotify*/,
+ bool /*afterWouldBlock*/) {
return false; // no special processing here as we use kqueue in edge-triggered mode
}
};
diff --git a/library/cpp/actors/interconnect/poller_actor_linux.h b/library/cpp/actors/interconnect/poller_actor_linux.h
index 183135d6b07..2cd557e347a 100644
--- a/library/cpp/actors/interconnect/poller_actor_linux.h
+++ b/library/cpp/actors/interconnect/poller_actor_linux.h
@@ -62,7 +62,8 @@ namespace NActors {
if (auto *record = static_cast<TSocketRecord*>(ev.data.ptr)) {
const bool read = ev.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR);
const bool write = ev.events & (EPOLLOUT | EPOLLERR);
- UpdateFlags(record, (read ? ReadHit : 0) | (write ? WriteHit : 0), false);
+ UpdateFlags(record, (read ? ReadHit : 0) | (write ? WriteHit : 0), false /*suppressNotify*/,
+ false /*checkQueues*/);
} else {
res = true;
}
@@ -71,19 +72,32 @@ namespace NActors {
return res;
}
- bool UpdateFlags(TSocketRecord *record, ui32 addMask, bool suppressNotify) {
+ bool UpdateFlags(TSocketRecord *record, ui32 addMask, bool suppressNotify, bool checkQueues) {
ui32 flags = record->Flags.load(std::memory_order_acquire);
for (;;) {
ui32 updated = flags | addMask;
static constexpr ui32 fullRead = ReadExpected | ReadHit;
static constexpr ui32 fullWrite = WriteExpected | WriteHit;
- const bool read = (updated & fullRead) == fullRead;
- const bool write = (updated & fullWrite) == fullWrite;
+ bool read = (updated & fullRead) == fullRead;
+ bool write = (updated & fullWrite) == fullWrite;
updated &= ~((read ? fullRead : 0) | (write ? fullWrite : 0));
if (record->Flags.compare_exchange_weak(flags, updated, std::memory_order_acq_rel)) {
if (suppressNotify) {
return read || write;
} else {
+ if (checkQueues) {
+ pollfd fd;
+ fd.fd = record->Socket->GetDescriptor();
+ const bool queryRead = updated & ReadExpected && !read;
+ const bool queryWrite = updated & WriteExpected && !write;
+ if (queryRead || queryWrite) {
+ fd.events = (queryRead ? POLLIN : 0) | (queryWrite ? POLLOUT : 0);
+ if (poll(&fd, 1, 0) != -1) {
+ read = queryRead && fd.revents & (POLLIN | POLLHUP | POLLRDHUP | POLLERR);
+ write = queryWrite && fd.revents & (POLLOUT | POLLERR);
+ }
+ }
+ }
Notify(record, read, write);
return false;
}
@@ -106,8 +120,10 @@ namespace NActors {
}
}
- bool Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write, bool suppressNotify) {
- return UpdateFlags(record.Get(), (read ? ReadExpected : 0) | (write ? WriteExpected : 0), suppressNotify);
+ bool Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write, bool suppressNotify,
+ bool afterWouldBlock) {
+ return UpdateFlags(record.Get(), (read ? ReadExpected : 0) | (write ? WriteExpected : 0), suppressNotify,
+ !afterWouldBlock);
}
};
diff --git a/library/cpp/actors/interconnect/poller_actor_win.h b/library/cpp/actors/interconnect/poller_actor_win.h
index 0fdc6ad6cbe..a4b213ff8c1 100644
--- a/library/cpp/actors/interconnect/poller_actor_win.h
+++ b/library/cpp/actors/interconnect/poller_actor_win.h
@@ -94,7 +94,8 @@ namespace NActors {
ExecuteSyncOperation(TPollerWakeup());
}
- bool Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write, bool /*suppressNotify*/) {
+ bool Request(const TIntrusivePtr<TSocketRecord>& record, bool read, bool write, bool /*suppressNotify*/,
+ bool /*afterWouldBlock*/) {
with_lock (Mutex) {
const auto it = Descriptors.find(record->Socket->GetDescriptor());
Y_ABORT_UNLESS(it != Descriptors.end());