diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/coroutine/engine/network.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/coroutine/engine/network.cpp')
-rw-r--r-- | library/cpp/coroutine/engine/network.cpp | 325 |
1 files changed, 325 insertions, 0 deletions
diff --git a/library/cpp/coroutine/engine/network.cpp b/library/cpp/coroutine/engine/network.cpp new file mode 100644 index 0000000000..85b647d210 --- /dev/null +++ b/library/cpp/coroutine/engine/network.cpp @@ -0,0 +1,325 @@ +#include "impl.h" +#include "network.h" + +#include <util/generic/scope.h> +#include <util/generic/xrange.h> + +#include <sys/uio.h> + +#if defined(_bionic_) +# define IOV_MAX 1024 +#endif + + +namespace NCoro { + namespace { + bool IsBlocked(int lasterr) noexcept { + return lasterr == EAGAIN || lasterr == EWOULDBLOCK; + } + + ssize_t DoReadVector(SOCKET fd, TContIOVector* vec) noexcept { + return readv(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count())); + } + + ssize_t DoWriteVector(SOCKET fd, TContIOVector* vec) noexcept { + return writev(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count())); + } + } + + + int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept { + if (cont->Cancelled()) { + return ECANCELED; + } + + if (nfds == 0) { + return 0; + } + + TTempArray<TFdEvent> events(nfds); + + for (auto i : xrange(nfds)) { + new(events.Data() + i) TFdEvent(cont, fds[i], (ui16) what[i], deadline); + } + + Y_DEFER { + for (auto i : xrange(nfds)) { + (events.Data() + i)->~TFdEvent(); + } + }; + + for (auto i : xrange(nfds)) { + cont->Executor()->ScheduleIoWait(events.Data() + i); + } + cont->Switch(); + + if (cont->Cancelled()) { + return ECANCELED; + } + + TFdEvent* ret = nullptr; + int status = EINPROGRESS; + + for (auto i : xrange(nfds)) { + auto& ev = *(events.Data() + i); + switch (ev.Status()) { + case EINPROGRESS: + break; + case ETIMEDOUT: + if (status != EINPROGRESS) { + break; + } + [[fallthrough]]; + default: + status = ev.Status(); + ret = &ev; + } + } + + if (ret) { + if (outfd) { + *outfd = ret->Fd(); + } + return ret->Status(); + } + + return EINPROGRESS; + } + + int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept { + return SelectD(cont, fds, what, nfds, outfd, timeout.ToDeadLine()); + } + + int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd) { + return SelectD(cont, fds, what, nfds, outfd, TInstant::Max()); + } + + + int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept { + TFdEvent event(cont, fd, (ui16)what, deadline); + return ExecuteEvent(&event); + } + + int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept { + return PollD(cont, fd, what, timeout.ToDeadLine()); + } + + int PollI(TCont* cont, SOCKET fd, int what) noexcept { + return PollD(cont, fd, what, TInstant::Max()); + } + + + TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept { + while (true) { + ssize_t res = DoReadVector(fd, vec); + + if (res >= 0) { + return TContIOStatus::Success((size_t) res); + } + + { + const int err = LastSystemError(); + + if (!IsBlocked(err)) { + return TContIOStatus::Error(err); + } + } + + if ((res = PollD(cont, fd, CONT_POLL_READ, deadline)) != 0) { + return TContIOStatus::Error((int) res); + } + } + } + + TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept { + return ReadVectorD(cont, fd, vec, timeOut.ToDeadLine()); + } + + TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept { + return ReadVectorD(cont, fd, vec, TInstant::Max()); + } + + + TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept { + IOutputStream::TPart part(buf, len); + TContIOVector vec(&part, 1); + return ReadVectorD(cont, fd, &vec, deadline); + } + + TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept { + return ReadD(cont, fd, buf, len, timeout.ToDeadLine()); + } + + TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept { + return ReadD(cont, fd, buf, len, TInstant::Max()); + } + + + TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept { + size_t written = 0; + + while (!vec->Complete()) { + ssize_t res = DoWriteVector(fd, vec); + + if (res >= 0) { + written += res; + + vec->Proceed((size_t) res); + } else { + { + const int err = LastSystemError(); + + if (!IsBlocked(err)) { + return TContIOStatus(written, err); + } + } + + if ((res = PollD(cont, fd, CONT_POLL_WRITE, deadline)) != 0) { + return TContIOStatus(written, (int) res); + } + } + } + + return TContIOStatus::Success(written); + } + + TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept { + return WriteVectorD(cont, fd, vec, timeOut.ToDeadLine()); + } + + TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept { + return WriteVectorD(cont, fd, vec, TInstant::Max()); + } + + + TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept { + IOutputStream::TPart part(buf, len); + TContIOVector vec(&part, 1); + return WriteVectorD(cont, fd, &vec, deadline); + } + + TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept { + return WriteD(cont, fd, buf, len, timeout.ToDeadLine()); + } + + TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept { + return WriteD(cont, fd, buf, len, TInstant::Max()); + } + + + int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept { + TSocketHolder res(Socket(ai)); + + if (res.Closed()) { + return LastSystemError(); + } + + const int ret = ConnectD(cont, res, ai.ai_addr, (socklen_t) ai.ai_addrlen, deadline); + + if (!ret) { + s.Swap(res); + } + + return ret; + } + + int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept { + int ret = EHOSTUNREACH; + + for (auto it = addr.Begin(); it != addr.End(); ++it) { + ret = ConnectD(cont, s, *it, deadline); + + if (ret == 0 || ret == ETIMEDOUT) { + return ret; + } + } + + return ret; + } + + int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept { + return ConnectD(cont, s, addr, timeout.ToDeadLine()); + } + + int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept { + return ConnectD(cont, s, addr, TInstant::Max()); + } + + int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept { + if (connect(s, name, namelen)) { + const int err = LastSystemError(); + + if (!IsBlocked(err) && err != EINPROGRESS) { + return err; + } + + int ret = PollD(cont, s, CONT_POLL_WRITE, deadline); + + if (ret) { + return ret; + } + + // check if we really connected + // FIXME: Unportable ?? + int serr = 0; + socklen_t slen = sizeof(serr); + + ret = getsockopt(s, SOL_SOCKET, SO_ERROR, (char*) &serr, &slen); + + if (ret) { + return LastSystemError(); + } + + if (serr) { + return serr; + } + } + + return 0; + } + + int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept { + return ConnectD(cont, s, name, namelen, timeout.ToDeadLine()); + } + + int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept { + return ConnectD(cont, s, name, namelen, TInstant::Max()); + } + + + int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept { + SOCKET ret; + + while ((ret = Accept4(s, addr, addrlen)) == INVALID_SOCKET) { + int err = LastSystemError(); + + if (!IsBlocked(err)) { + return -err; + } + + err = PollD(cont, s, CONT_POLL_READ, deadline); + + if (err) { + return -err; + } + } + + return (int) ret; + } + + int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept { + return AcceptD(cont, s, addr, addrlen, timeout.ToDeadLine()); + } + + int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept { + return AcceptD(cont, s, addr, addrlen, TInstant::Max()); + } + + SOCKET Socket(int domain, int type, int protocol) noexcept { + return Socket4(domain, type, protocol); + } + + SOCKET Socket(const struct addrinfo& ai) noexcept { + return Socket(ai.ai_family, ai.ai_socktype, ai.ai_protocol); + } +} |