diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:17 +0300 |
commit | d3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch) | |
tree | dd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/coroutine/engine/sockpool.h | |
parent | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff) | |
download | ydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/coroutine/engine/sockpool.h')
-rw-r--r-- | library/cpp/coroutine/engine/sockpool.h | 366 |
1 files changed, 183 insertions, 183 deletions
diff --git a/library/cpp/coroutine/engine/sockpool.h b/library/cpp/coroutine/engine/sockpool.h index b34d5ace46..1ebb7e7b38 100644 --- a/library/cpp/coroutine/engine/sockpool.h +++ b/library/cpp/coroutine/engine/sockpool.h @@ -1,253 +1,253 @@ #pragma once - + #include "impl.h" #include "network.h" - + #include <util/network/address.h> #include <util/network/socket.h> #include <util/system/mutex.h> - + extern void SetCommonSockOpts(SOCKET sock, const struct sockaddr* sa = nullptr); - + class TSocketPool; class TPooledSocket { - class TImpl: public TIntrusiveListItem<TImpl>, public TSimpleRefCount<TImpl, TImpl> { - public: + class TImpl: public TIntrusiveListItem<TImpl>, public TSimpleRefCount<TImpl, TImpl> { + public: TImpl(SOCKET fd, TSocketPool* pool) noexcept - : Pool_(pool) - , IsKeepAlive_(false) - , Fd_(fd) - { - Touch(); - } - + : Pool_(pool) + , IsKeepAlive_(false) + , Fd_(fd) + { + Touch(); + } + static void Destroy(TImpl* impl) noexcept { - impl->DoDestroy(); - } - + impl->DoDestroy(); + } + void DoDestroy() noexcept { - if (!Closed() && IsKeepAlive() && IsInGoodState()) { - ReturnToPool(); - } else { - delete this; - } - } - + if (!Closed() && IsKeepAlive() && IsInGoodState()) { + ReturnToPool(); + } else { + delete this; + } + } + bool IsKeepAlive() const noexcept { - return IsKeepAlive_; - } - + return IsKeepAlive_; + } + void SetKeepAlive(bool ka) { - ::SetKeepAlive(Fd_, ka); - IsKeepAlive_ = ka; - } - + ::SetKeepAlive(Fd_, ka); + IsKeepAlive_ = ka; + } + SOCKET Socket() const noexcept { - return Fd_; - } - + return Fd_; + } + bool Closed() const noexcept { - return Fd_.Closed(); - } - + return Fd_.Closed(); + } + void Close() noexcept { - Fd_.Close(); - } - + Fd_.Close(); + } + bool IsInGoodState() const noexcept { - int err = 0; - socklen_t len = sizeof(err); - - getsockopt(Fd_, SOL_SOCKET, SO_ERROR, (char*)&err, &len); - - return !err; - } - + int err = 0; + socklen_t len = sizeof(err); + + getsockopt(Fd_, SOL_SOCKET, SO_ERROR, (char*)&err, &len); + + return !err; + } + bool IsOpen() const noexcept { return IsInGoodState() && IsNotSocketClosedByOtherSide(Fd_); - } - + } + void Touch() noexcept { - TouchTime_ = TInstant::Now(); - } - + TouchTime_ = TInstant::Now(); + } + const TInstant& LastTouch() const noexcept { - return TouchTime_; - } - - private: + return TouchTime_; + } + + private: inline void ReturnToPool() noexcept; - - private: - TSocketPool* Pool_; - bool IsKeepAlive_; - TSocketHolder Fd_; - TInstant TouchTime_; - }; - - friend class TSocketPool; - -public: + + private: + TSocketPool* Pool_; + bool IsKeepAlive_; + TSocketHolder Fd_; + TInstant TouchTime_; + }; + + friend class TSocketPool; + +public: TPooledSocket() : Impl_(nullptr) - { - } + { + } TPooledSocket(TImpl* impl) - : Impl_(impl) - { - } - + : Impl_(impl) + { + } + ~TPooledSocket() { - if (UncaughtException() && !!Impl_) { - Close(); + if (UncaughtException() && !!Impl_) { + Close(); } - } - + } + operator SOCKET() const noexcept { - return Impl_->Socket(); - } - + return Impl_->Socket(); + } + void SetKeepAlive(bool ka) { - Impl_->SetKeepAlive(ka); - } - + Impl_->SetKeepAlive(ka); + } + void Close() noexcept { - Impl_->Close(); - } - -private: - TIntrusivePtr<TImpl> Impl_; + Impl_->Close(); + } + +private: + TIntrusivePtr<TImpl> Impl_; }; struct TConnectData { TConnectData(TCont* cont, const TInstant& deadLine) - : Cont(cont) - , DeadLine(deadLine) - { - } - + : Cont(cont) + , DeadLine(deadLine) + { + } + TConnectData(TCont* cont, const TDuration& timeOut) - : Cont(cont) - , DeadLine(TInstant::Now() + timeOut) - { - } - - TCont* Cont; - const TInstant DeadLine; + : Cont(cont) + , DeadLine(TInstant::Now() + timeOut) + { + } + + TCont* Cont; + const TInstant DeadLine; }; class TSocketPool { - friend class TPooledSocket::TImpl; - -public: - typedef TAtomicSharedPtr<NAddr::IRemoteAddr> TAddrRef; + friend class TPooledSocket::TImpl; + +public: + typedef TAtomicSharedPtr<NAddr::IRemoteAddr> TAddrRef; TSocketPool(int ip, int port) - : Addr_(new NAddr::TIPv4Addr(TIpAddress((ui32)ip, (ui16)port))) - { - } - + : Addr_(new NAddr::TIPv4Addr(TIpAddress((ui32)ip, (ui16)port))) + { + } + TSocketPool(const TAddrRef& addr) - : Addr_(addr) - { - } + : Addr_(addr) + { + } void EraseStale(const TInstant& maxAge) noexcept { - TSockets toDelete; - - { - TGuard<TMutex> guard(Mutex_); - - for (TSockets::TIterator it = Pool_.Begin(); it != Pool_.End();) { - if (it->LastTouch() < maxAge) { - toDelete.PushBack(&*(it++)); - } else { - ++it; - } - } - } - } - + TSockets toDelete; + + { + TGuard<TMutex> guard(Mutex_); + + for (TSockets::TIterator it = Pool_.Begin(); it != Pool_.End();) { + if (it->LastTouch() < maxAge) { + toDelete.PushBack(&*(it++)); + } else { + ++it; + } + } + } + } + TPooledSocket Get(TConnectData* conn) { - TPooledSocket ret; - - if (TPooledSocket::TImpl* alive = GetImpl()) { - ret = TPooledSocket(alive); - } else { - ret = AllocateMore(conn); - } - - ret.Impl_->Touch(); - - return ret; - } - + TPooledSocket ret; + + if (TPooledSocket::TImpl* alive = GetImpl()) { + ret = TPooledSocket(alive); + } else { + ret = AllocateMore(conn); + } + + ret.Impl_->Touch(); + + return ret; + } + bool GetAlive(TPooledSocket& socket) { - if (TPooledSocket::TImpl* alive = GetImpl()) { - alive->Touch(); - socket = TPooledSocket(alive); - return true; + if (TPooledSocket::TImpl* alive = GetImpl()) { + alive->Touch(); + socket = TPooledSocket(alive); + return true; } - return false; - } + return false; + } -private: +private: TPooledSocket::TImpl* GetImpl() { - TGuard<TMutex> guard(Mutex_); - - while (!Pool_.Empty()) { - THolder<TPooledSocket::TImpl> ret(Pool_.PopFront()); - - if (ret->IsOpen()) { - return ret.Release(); + TGuard<TMutex> guard(Mutex_); + + while (!Pool_.Empty()) { + THolder<TPooledSocket::TImpl> ret(Pool_.PopFront()); + + if (ret->IsOpen()) { + return ret.Release(); } } return nullptr; - } - + } + void Release(TPooledSocket::TImpl* impl) noexcept { - TGuard<TMutex> guard(Mutex_); - - Pool_.PushFront(impl); - } - - TPooledSocket AllocateMore(TConnectData* conn); - -private: - TAddrRef Addr_; + TGuard<TMutex> guard(Mutex_); + + Pool_.PushFront(impl); + } + + TPooledSocket AllocateMore(TConnectData* conn); + +private: + TAddrRef Addr_; using TSockets = TIntrusiveListWithAutoDelete<TPooledSocket::TImpl, TDelete>; - TSockets Pool_; - TMutex Mutex_; + TSockets Pool_; + TMutex Mutex_; }; inline void TPooledSocket::TImpl::ReturnToPool() noexcept { - Pool_->Release(this); + Pool_->Release(this); } class TContIO: public IInputStream, public IOutputStream { -public: +public: TContIO(SOCKET fd, TCont* cont) - : Fd_(fd) - , Cont_(cont) - { - } - + : Fd_(fd) + , Cont_(cont) + { + } + void DoWrite(const void* buf, size_t len) override { NCoro::WriteI(Cont_, Fd_, buf, len).Checked(); - } - + } + size_t DoRead(void* buf, size_t len) override { return NCoro::ReadI(Cont_, Fd_, buf, len).Checked(); - } - + } + SOCKET Fd() const noexcept { - return Fd_; - } - -private: - SOCKET Fd_; - TCont* Cont_; + return Fd_; + } + +private: + SOCKET Fd_; + TCont* Cont_; }; |