diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/coroutine/engine/network.cpp | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/coroutine/engine/network.cpp')
-rw-r--r-- | library/cpp/coroutine/engine/network.cpp | 170 |
1 files changed, 85 insertions, 85 deletions
diff --git a/library/cpp/coroutine/engine/network.cpp b/library/cpp/coroutine/engine/network.cpp index a5c0d9282c..85b647d210 100644 --- a/library/cpp/coroutine/engine/network.cpp +++ b/library/cpp/coroutine/engine/network.cpp @@ -1,41 +1,41 @@ #include "impl.h" #include "network.h" - + #include <util/generic/scope.h> #include <util/generic/xrange.h> - + #include <sys/uio.h> - + #if defined(_bionic_) # define IOV_MAX 1024 #endif - - + + namespace NCoro { namespace { bool IsBlocked(int lasterr) noexcept { return lasterr == EAGAIN || lasterr == EWOULDBLOCK; } - + ssize_t DoReadVector(SOCKET fd, TContIOVector* vec) noexcept { return readv(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count())); } - + ssize_t DoWriteVector(SOCKET fd, TContIOVector* vec) noexcept { return writev(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count())); } } - - + + int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept { if (cont->Cancelled()) { return ECANCELED; } - + if (nfds == 0) { return 0; } - + TTempArray<TFdEvent> events(nfds); for (auto i : xrange(nfds)) { @@ -47,7 +47,7 @@ namespace NCoro { (events.Data() + i)->~TFdEvent(); } }; - + for (auto i : xrange(nfds)) { cont->Executor()->ScheduleIoWait(events.Data() + i); } @@ -79,36 +79,36 @@ namespace NCoro { if (ret) { if (outfd) { *outfd = ret->Fd(); - } + } return ret->Status(); - } - + } + return EINPROGRESS; - } - + } + int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept { return SelectD(cont, fds, what, nfds, outfd, timeout.ToDeadLine()); - } - + } + int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd) { return SelectD(cont, fds, what, nfds, outfd, TInstant::Max()); } - + int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept { TFdEvent event(cont, fd, (ui16)what, deadline); return ExecuteEvent(&event); - } - + } + int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept { return PollD(cont, fd, what, timeout.ToDeadLine()); } int PollI(TCont* cont, SOCKET fd, int what) noexcept { return PollD(cont, fd, what, TInstant::Max()); - } - - + } + + TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept { while (true) { ssize_t res = DoReadVector(fd, vec); @@ -117,103 +117,103 @@ namespace NCoro { return TContIOStatus::Success((size_t) res); } - { - const int err = LastSystemError(); - - if (!IsBlocked(err)) { + { + const int err = LastSystemError(); + + if (!IsBlocked(err)) { return TContIOStatus::Error(err); - } - } - + } + } + if ((res = PollD(cont, fd, CONT_POLL_READ, deadline)) != 0) { return TContIOStatus::Error((int) res); - } - } - } - + } + } + } + TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept { return ReadVectorD(cont, fd, vec, timeOut.ToDeadLine()); } - + TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept { return ReadVectorD(cont, fd, vec, TInstant::Max()); } - - + + TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept { IOutputStream::TPart part(buf, len); TContIOVector vec(&part, 1); return ReadVectorD(cont, fd, &vec, deadline); } - + TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept { return ReadD(cont, fd, buf, len, timeout.ToDeadLine()); } - + TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept { return ReadD(cont, fd, buf, len, TInstant::Max()); - } - - + } + + TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept { size_t written = 0; - + while (!vec->Complete()) { ssize_t res = DoWriteVector(fd, vec); - + if (res >= 0) { written += res; - + vec->Proceed((size_t) res); } else { { const int err = LastSystemError(); - + if (!IsBlocked(err)) { return TContIOStatus(written, err); } } - + if ((res = PollD(cont, fd, CONT_POLL_WRITE, deadline)) != 0) { return TContIOStatus(written, (int) res); } } - } - + } + return TContIOStatus::Success(written); - } - + } + TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept { return WriteVectorD(cont, fd, vec, timeOut.ToDeadLine()); } - + TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept { return WriteVectorD(cont, fd, vec, TInstant::Max()); } - - + + TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept { IOutputStream::TPart part(buf, len); TContIOVector vec(&part, 1); return WriteVectorD(cont, fd, &vec, deadline); } - + TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept { return WriteD(cont, fd, buf, len, timeout.ToDeadLine()); } - + TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept { return WriteD(cont, fd, buf, len, TInstant::Max()); - } - - + } + + int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept { TSocketHolder res(Socket(ai)); - + if (res.Closed()) { return LastSystemError(); } - + const int ret = ConnectD(cont, res, ai.ai_addr, (socklen_t) ai.ai_addrlen, deadline); if (!ret) { @@ -221,49 +221,49 @@ namespace NCoro { } return ret; - } - + } + int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept { int ret = EHOSTUNREACH; - + for (auto it = addr.Begin(); it != addr.End(); ++it) { ret = ConnectD(cont, s, *it, deadline); - + if (ret == 0 || ret == ETIMEDOUT) { return ret; } } - + return ret; - } - + } + int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept { return ConnectD(cont, s, addr, timeout.ToDeadLine()); } - + int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept { return ConnectD(cont, s, addr, TInstant::Max()); - } - + } + int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept { if (connect(s, name, namelen)) { const int err = LastSystemError(); - + if (!IsBlocked(err) && err != EINPROGRESS) { return err; } - + int ret = PollD(cont, s, CONT_POLL_WRITE, deadline); - + if (ret) { return ret; } - + // check if we really connected // FIXME: Unportable ?? int serr = 0; socklen_t slen = sizeof(serr); - + ret = getsockopt(s, SOL_SOCKET, SO_ERROR, (char*) &serr, &slen); if (ret) { @@ -286,27 +286,27 @@ namespace NCoro { return ConnectD(cont, s, name, namelen, TInstant::Max()); } - + int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept { SOCKET ret; - + while ((ret = Accept4(s, addr, addrlen)) == INVALID_SOCKET) { int err = LastSystemError(); - + if (!IsBlocked(err)) { return -err; } err = PollD(cont, s, CONT_POLL_READ, deadline); - + if (err) { return -err; } } - + return (int) ret; - } - + } + int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept { return AcceptD(cont, s, addr, addrlen, timeout.ToDeadLine()); } |