aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/coroutine/engine/sockpool.h
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/coroutine/engine/sockpool.h
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/coroutine/engine/sockpool.h')
-rw-r--r--library/cpp/coroutine/engine/sockpool.h253
1 files changed, 253 insertions, 0 deletions
diff --git a/library/cpp/coroutine/engine/sockpool.h b/library/cpp/coroutine/engine/sockpool.h
new file mode 100644
index 0000000000..1ebb7e7b38
--- /dev/null
+++ b/library/cpp/coroutine/engine/sockpool.h
@@ -0,0 +1,253 @@
+#pragma once
+
+#include "impl.h"
+#include "network.h"
+
+#include <util/network/address.h>
+#include <util/network/socket.h>
+#include <util/system/mutex.h>
+
+extern void SetCommonSockOpts(SOCKET sock, const struct sockaddr* sa = nullptr);
+
+class TSocketPool;
+
+class TPooledSocket {
+ class TImpl: public TIntrusiveListItem<TImpl>, public TSimpleRefCount<TImpl, TImpl> {
+ public:
+ TImpl(SOCKET fd, TSocketPool* pool) noexcept
+ : Pool_(pool)
+ , IsKeepAlive_(false)
+ , Fd_(fd)
+ {
+ Touch();
+ }
+
+ static void Destroy(TImpl* impl) noexcept {
+ impl->DoDestroy();
+ }
+
+ void DoDestroy() noexcept {
+ if (!Closed() && IsKeepAlive() && IsInGoodState()) {
+ ReturnToPool();
+ } else {
+ delete this;
+ }
+ }
+
+ bool IsKeepAlive() const noexcept {
+ return IsKeepAlive_;
+ }
+
+ void SetKeepAlive(bool ka) {
+ ::SetKeepAlive(Fd_, ka);
+ IsKeepAlive_ = ka;
+ }
+
+ SOCKET Socket() const noexcept {
+ return Fd_;
+ }
+
+ bool Closed() const noexcept {
+ return Fd_.Closed();
+ }
+
+ void Close() noexcept {
+ Fd_.Close();
+ }
+
+ bool IsInGoodState() const noexcept {
+ int err = 0;
+ socklen_t len = sizeof(err);
+
+ getsockopt(Fd_, SOL_SOCKET, SO_ERROR, (char*)&err, &len);
+
+ return !err;
+ }
+
+ bool IsOpen() const noexcept {
+ return IsInGoodState() && IsNotSocketClosedByOtherSide(Fd_);
+ }
+
+ void Touch() noexcept {
+ TouchTime_ = TInstant::Now();
+ }
+
+ const TInstant& LastTouch() const noexcept {
+ return TouchTime_;
+ }
+
+ private:
+ inline void ReturnToPool() noexcept;
+
+ private:
+ TSocketPool* Pool_;
+ bool IsKeepAlive_;
+ TSocketHolder Fd_;
+ TInstant TouchTime_;
+ };
+
+ friend class TSocketPool;
+
+public:
+ TPooledSocket()
+ : Impl_(nullptr)
+ {
+ }
+
+ TPooledSocket(TImpl* impl)
+ : Impl_(impl)
+ {
+ }
+
+ ~TPooledSocket() {
+ if (UncaughtException() && !!Impl_) {
+ Close();
+ }
+ }
+
+ operator SOCKET() const noexcept {
+ return Impl_->Socket();
+ }
+
+ void SetKeepAlive(bool ka) {
+ Impl_->SetKeepAlive(ka);
+ }
+
+ void Close() noexcept {
+ Impl_->Close();
+ }
+
+private:
+ TIntrusivePtr<TImpl> Impl_;
+};
+
+struct TConnectData {
+ TConnectData(TCont* cont, const TInstant& deadLine)
+ : Cont(cont)
+ , DeadLine(deadLine)
+ {
+ }
+
+ TConnectData(TCont* cont, const TDuration& timeOut)
+ : Cont(cont)
+ , DeadLine(TInstant::Now() + timeOut)
+ {
+ }
+
+ TCont* Cont;
+ const TInstant DeadLine;
+};
+
+class TSocketPool {
+ friend class TPooledSocket::TImpl;
+
+public:
+ typedef TAtomicSharedPtr<NAddr::IRemoteAddr> TAddrRef;
+
+ TSocketPool(int ip, int port)
+ : Addr_(new NAddr::TIPv4Addr(TIpAddress((ui32)ip, (ui16)port)))
+ {
+ }
+
+ TSocketPool(const TAddrRef& addr)
+ : Addr_(addr)
+ {
+ }
+
+ void EraseStale(const TInstant& maxAge) noexcept {
+ TSockets toDelete;
+
+ {
+ TGuard<TMutex> guard(Mutex_);
+
+ for (TSockets::TIterator it = Pool_.Begin(); it != Pool_.End();) {
+ if (it->LastTouch() < maxAge) {
+ toDelete.PushBack(&*(it++));
+ } else {
+ ++it;
+ }
+ }
+ }
+ }
+
+ TPooledSocket Get(TConnectData* conn) {
+ TPooledSocket ret;
+
+ if (TPooledSocket::TImpl* alive = GetImpl()) {
+ ret = TPooledSocket(alive);
+ } else {
+ ret = AllocateMore(conn);
+ }
+
+ ret.Impl_->Touch();
+
+ return ret;
+ }
+
+ bool GetAlive(TPooledSocket& socket) {
+ if (TPooledSocket::TImpl* alive = GetImpl()) {
+ alive->Touch();
+ socket = TPooledSocket(alive);
+ return true;
+ }
+ return false;
+ }
+
+private:
+ TPooledSocket::TImpl* GetImpl() {
+ TGuard<TMutex> guard(Mutex_);
+
+ while (!Pool_.Empty()) {
+ THolder<TPooledSocket::TImpl> ret(Pool_.PopFront());
+
+ if (ret->IsOpen()) {
+ return ret.Release();
+ }
+ }
+ return nullptr;
+ }
+
+ void Release(TPooledSocket::TImpl* impl) noexcept {
+ TGuard<TMutex> guard(Mutex_);
+
+ Pool_.PushFront(impl);
+ }
+
+ TPooledSocket AllocateMore(TConnectData* conn);
+
+private:
+ TAddrRef Addr_;
+ using TSockets = TIntrusiveListWithAutoDelete<TPooledSocket::TImpl, TDelete>;
+ TSockets Pool_;
+ TMutex Mutex_;
+};
+
+inline void TPooledSocket::TImpl::ReturnToPool() noexcept {
+ Pool_->Release(this);
+}
+
+
+class TContIO: public IInputStream, public IOutputStream {
+public:
+ TContIO(SOCKET fd, TCont* cont)
+ : Fd_(fd)
+ , Cont_(cont)
+ {
+ }
+
+ void DoWrite(const void* buf, size_t len) override {
+ NCoro::WriteI(Cont_, Fd_, buf, len).Checked();
+ }
+
+ size_t DoRead(void* buf, size_t len) override {
+ return NCoro::ReadI(Cont_, Fd_, buf, len).Checked();
+ }
+
+ SOCKET Fd() const noexcept {
+ return Fd_;
+ }
+
+private:
+ SOCKET Fd_;
+ TCont* Cont_;
+};