diff options
author | qrort <[email protected]> | 2022-12-02 11:31:25 +0300 |
---|---|---|
committer | qrort <[email protected]> | 2022-12-02 11:31:25 +0300 |
commit | b1f4ffc9c8abff3ba58dc1ec9a9f92d2f0de6806 (patch) | |
tree | 2a23209faf0fea5586a6d4b9cee60d1b318d29fe /library/cpp/coroutine | |
parent | 559174a9144de40d6bb3997ea4073c82289b4974 (diff) |
remove kikimr/driver DEPENDS
Diffstat (limited to 'library/cpp/coroutine')
-rw-r--r-- | library/cpp/coroutine/dns/async.cpp | 149 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/async.h | 32 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/cache.cpp | 131 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/cache.h | 54 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/coro.cpp | 225 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/coro.h | 22 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/helpers.cpp | 148 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/helpers.h | 34 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/iface.h | 75 | ||||
-rw-r--r-- | library/cpp/coroutine/util/cosem.h | 433 | ||||
-rw-r--r-- | library/cpp/coroutine/util/pipeevent.h | 93 | ||||
-rw-r--r-- | library/cpp/coroutine/util/pipeque.h | 262 |
12 files changed, 0 insertions, 1658 deletions
diff --git a/library/cpp/coroutine/dns/async.cpp b/library/cpp/coroutine/dns/async.cpp deleted file mode 100644 index be35135828c..00000000000 --- a/library/cpp/coroutine/dns/async.cpp +++ /dev/null @@ -1,149 +0,0 @@ -#include "async.h" - -#include <util/generic/singleton.h> -#include <util/generic/vector.h> - -#include <contrib/libs/c-ares/include/ares.h> - -using namespace NAsyncDns; - -namespace { - struct TAresError: public TDnsError { - inline TAresError(int code) { - (*this) << ares_strerror(code); - } - }; - - struct TAresInit { - inline TAresInit() { - const int code = ares_library_init(ARES_LIB_INIT_ALL); - - if (code) { - ythrow TAresError(code) << "can not init ares engine"; - } - } - - inline ~TAresInit() { - ares_library_cleanup(); - } - - static inline void InitOnce() { - Singleton<TAresInit>(); - } - }; -} - -class TAsyncDns::TImpl { -public: - inline TImpl(IPoller* poller, const TOptions& o) - : P_(poller) - { - TAresInit::InitOnce(); - - ares_options opts; - - Zero(opts); - - int optflags = 0; - - optflags |= ARES_OPT_FLAGS; - opts.flags = ARES_FLAG_STAYOPEN; - - optflags |= ARES_OPT_TIMEOUTMS; - opts.timeout = o.TimeOut.MilliSeconds(); - - optflags |= ARES_OPT_TRIES; - opts.tries = o.Retries; - - optflags |= ARES_OPT_SOCK_STATE_CB; - opts.sock_state_cb = (decltype(opts.sock_state_cb))StateCb; - static_assert(sizeof(opts.sock_state_cb) == sizeof(&StateCb), "Inconsistent socket state size"); - opts.sock_state_cb_data = this; - - const int code = ares_init_options(&H_, &opts, optflags); - - if (code) { - ythrow TAresError(code) << "can not init ares channel"; - } - } - - inline ~TImpl() { - ares_destroy(H_); - } - - inline TDuration Timeout() { - struct timeval tv; - Zero(tv); - - ares_timeout(H_, nullptr, &tv); - - return TDuration(tv); - } - - template <class T> - inline void ProcessSocket(T s) { - ares_process_fd(H_, s, s); - } - - inline void ProcessNone() { - ProcessSocket(ARES_SOCKET_BAD); - } - - inline void AsyncResolve(const TNameRequest& req) { - ares_gethostbyname(H_, req.Addr, req.Family, AsyncResolveHostCb, req.CB); - } - -private: - static void StateCb(void* arg, int s, int read, int write) { - ((TImpl*)arg)->P_->OnStateChange((SOCKET)s, (bool)read, (bool)write); - } - - static void AsyncResolveHostCb(void* arg, int status, int timeouts, hostent* he) { - const IHostResult::TResult res = { - status, timeouts, he}; - - ((IHostResult*)arg)->OnComplete(res); - } - -private: - IPoller* P_; - ares_channel H_; -}; - -void NAsyncDns::CheckAsyncStatus(int status) { - if (status) { - ythrow TAresError(status); - } -} - -void NAsyncDns::CheckPartialAsyncStatus(int status) { - if (status == ARES_ENODATA) { - return; - } - - CheckAsyncStatus(status); -} - -TAsyncDns::TAsyncDns(IPoller* poller, const TOptions& opts) - : I_(new TImpl(poller, opts)) -{ -} - -TAsyncDns::~TAsyncDns() { -} - -void TAsyncDns::AsyncResolve(const TNameRequest& req) { - I_->AsyncResolve(req); -} - -TDuration TAsyncDns::Timeout() { - return I_->Timeout(); -} - -void TAsyncDns::ProcessSocket(SOCKET s) { - I_->ProcessSocket(s); -} - -void TAsyncDns::ProcessNone() { - I_->ProcessNone(); -} diff --git a/library/cpp/coroutine/dns/async.h b/library/cpp/coroutine/dns/async.h deleted file mode 100644 index 1ec26bc91d7..00000000000 --- a/library/cpp/coroutine/dns/async.h +++ /dev/null @@ -1,32 +0,0 @@ -#pragma once - -#include "iface.h" - -#include <util/network/socket.h> -#include <util/datetime/base.h> -#include <util/generic/ptr.h> - -namespace NAsyncDns { - struct IPoller { - virtual void OnStateChange(SOCKET s, bool read, bool write) = 0; - }; - - class TAsyncDns { - public: - TAsyncDns(IPoller* poller, const TOptions& opts = TOptions()); - ~TAsyncDns(); - - void AsyncResolve(const TNameRequest& req); - - TDuration Timeout(); - void ProcessSocket(SOCKET s); - void ProcessNone(); - - private: - class TImpl; - THolder<TImpl> I_; - }; - - void CheckAsyncStatus(int status); - void CheckPartialAsyncStatus(int status); -} diff --git a/library/cpp/coroutine/dns/cache.cpp b/library/cpp/coroutine/dns/cache.cpp deleted file mode 100644 index 69910a79601..00000000000 --- a/library/cpp/coroutine/dns/cache.cpp +++ /dev/null @@ -1,131 +0,0 @@ -#include "cache.h" - -#include "async.h" -#include "coro.h" -#include "helpers.h" - -#include <library/cpp/cache/cache.h> - -#include <library/cpp/coroutine/engine/impl.h> -#include <library/cpp/coroutine/engine/events.h> - -using namespace NAddr; -using namespace NAsyncDns; - -class TContDnsCache::TImpl { - using TKey = std::pair<TString, ui16>; - - enum EResolveState { - RS_NOT_RESOLVED, - RS_IN_PROGRESS, - RS_RESOLVED, - RS_FAILED, - RS_EXPIRED - }; - - struct TEntry: public TSimpleRefCount<TEntry> { - TAddrs Addrs_; - TInstant ResolveTimestamp_; - EResolveState State_; - TContSimpleEvent ResolvedEvent_; - TString LastResolveError_; - - TEntry(TContExecutor* e) - : State_(RS_NOT_RESOLVED) - , ResolvedEvent_(e) - { - } - }; - - using TEntryRef = TIntrusivePtr<TEntry>; - -public: - inline TImpl(TContExecutor* e, const TCacheOptions& opts) - : Executor_(e) - , Opts_(opts) - , Cache_(opts.MaxSize) - { - } - - inline ~TImpl() { - } - - void LookupOrResolve(TContResolver& resolver, const TString& addr, ui16 port, TAddrs& result) { - TKey cacheKey(addr, port); - - TEntryRef e; - auto iter = Cache_.Find(cacheKey); - if (iter == Cache_.End()) { - e = MakeIntrusive<TEntry>(Executor_); - Cache_.Insert(cacheKey, e); - } else { - e = iter.Value(); - } - - while (true) { - switch (e->State_) { - case RS_NOT_RESOLVED: - case RS_EXPIRED: - e->State_ = RS_IN_PROGRESS; - try { - ResolveAddr(resolver, addr, port, result); - } catch (TDnsError& err) { - e->ResolveTimestamp_ = TInstant::Now(); - e->LastResolveError_ = err.AsStrBuf(); - e->State_ = RS_FAILED; - e->ResolvedEvent_.BroadCast(); - throw; - } catch (...) { - // errors not related to DNS - e->State_ = RS_NOT_RESOLVED; - e->ResolvedEvent_.BroadCast(); - throw; - } - e->ResolveTimestamp_ = TInstant::Now(); - e->Addrs_ = result; - e->State_ = RS_RESOLVED; - e->ResolvedEvent_.BroadCast(); - return; - case RS_IN_PROGRESS: - e->ResolvedEvent_.WaitI(); - continue; - case RS_RESOLVED: - if (e->ResolveTimestamp_ + Opts_.EntryLifetime < TInstant::Now()) { - e->State_ = RS_EXPIRED; - continue; // try and resolve again - } - result = e->Addrs_; - return; - case RS_FAILED: - if (e->ResolveTimestamp_ + Opts_.NotFoundLifetime < TInstant::Now()) { - e->State_ = RS_EXPIRED; - continue; // try and resolve again - } - ythrow TDnsError() << e->LastResolveError_; - default: - Y_FAIL("Bad state, shoult not get here"); - }; - } - } - -private: - TContExecutor* Executor_; - const TCacheOptions Opts_; - TLRUCache<TKey, TEntryRef> Cache_; -}; - -TContDnsCache::TContDnsCache(TContExecutor* e, const NAsyncDns::TCacheOptions& opts) - : I_(MakeHolder<TImpl>(e, opts)) -{ -} - -TContDnsCache::~TContDnsCache() { -} - -void TContDnsCache::LookupOrResolve(TContResolver& resolver, const TString& addr, ui16 port, TVector<NAddr::IRemoteAddrRef>& result) { - return I_->LookupOrResolve(resolver, addr, port, result); -} - -void TContDnsCache::LookupOrResolve(TContResolver& resolver, const TString& addr, TVector<NAddr::IRemoteAddrRef>& result) { - LookupOrResolve(resolver, addr, 80, result); -} diff --git a/library/cpp/coroutine/dns/cache.h b/library/cpp/coroutine/dns/cache.h deleted file mode 100644 index 7c358ce591f..00000000000 --- a/library/cpp/coroutine/dns/cache.h +++ /dev/null @@ -1,54 +0,0 @@ -#pragma once - -#include "iface.h" - -#include <util/generic/ptr.h> -#include <util/generic/vector.h> -#include <util/network/address.h> - -class TContExecutor; - -namespace NAsyncDns { - class TContResolver; - - struct TCacheOptions { - inline TCacheOptions() { - } - - inline TCacheOptions& SetEntryLifetime(const TDuration& val) noexcept { - EntryLifetime = val; - - return *this; - } - - inline TCacheOptions& SetNotFoundLifetime(const TDuration& val) noexcept { - NotFoundLifetime = val; - - return *this; - } - - inline TCacheOptions& SetMaxSize(size_t val) noexcept { - MaxSize = val; - - return *this; - } - - size_t MaxSize = 512; - TDuration EntryLifetime = TDuration::Seconds(1800); - TDuration NotFoundLifetime = TDuration::Seconds(1); - }; - - // MT-unsafe LRU DNS cache - class TContDnsCache { - public: - TContDnsCache(TContExecutor* e, const TCacheOptions& opts = {}); - ~TContDnsCache(); - - void LookupOrResolve(TContResolver& resolver, const TString& addr, ui16 port, TVector<NAddr::IRemoteAddrRef>& result); - void LookupOrResolve(TContResolver& resolver, const TString& addr, TVector<NAddr::IRemoteAddrRef>& result); - - private: - class TImpl; - THolder<TImpl> I_; - }; -} diff --git a/library/cpp/coroutine/dns/coro.cpp b/library/cpp/coroutine/dns/coro.cpp deleted file mode 100644 index 9d6a89f6e75..00000000000 --- a/library/cpp/coroutine/dns/coro.cpp +++ /dev/null @@ -1,225 +0,0 @@ -#include "coro.h" - -#include "async.h" - -#include <library/cpp/coroutine/engine/events.h> -#include <library/cpp/coroutine/engine/impl.h> - -#include <util/generic/vector.h> -#include <util/memory/smallobj.h> - -using namespace NAsyncDns; - -class TContResolver::TImpl: public IPoller { - struct TPollEvent: - public NCoro::IPollEvent, - public TIntrusiveListItem<TPollEvent>, - public TObjectFromPool<TPollEvent> - { - inline TPollEvent(TImpl* parent, SOCKET s, int what) - : IPollEvent(s, what) - , P(parent) - { - P->E_->Poller()->Schedule(this); - } - - inline ~TPollEvent() override { - P->E_->Poller()->Remove(this); - } - - void OnPollEvent(int) noexcept override { - P->D_.ProcessSocket(Fd()); - } - - TImpl* P; - }; - - typedef TAutoPtr<TPollEvent> TEventRef; - - struct TOneShotEvent { - inline TOneShotEvent(TContExecutor* e) noexcept - : E(e) - , S(false) - { - } - - inline void Signal() noexcept { - S = true; - E.Signal(); - } - - inline void Wait() noexcept { - if (S) { - return; - } - - E.WaitI(); - } - - TContSimpleEvent E; - bool S; - }; - - struct TContSemaphore { - inline TContSemaphore(TContExecutor* e, size_t num) noexcept - : E(e) - , N(num) - { - } - - inline void Acquire() noexcept { - while (!N) { - E.WaitI(); - } - - --N; - } - - inline void Release() noexcept { - ++N; - E.Signal(); - } - - TContSimpleEvent E; - size_t N; - }; - - struct TRequest: public TIntrusiveListItem<TRequest>, public TOneShotEvent, public IHostResult { - inline TRequest(const TNameRequest* req, TContExecutor* e) - : TOneShotEvent(e) - , Req(req) - { - } - - void OnComplete(const TResult& res) override { - Req->CB->OnComplete(res); - Signal(); - } - - const TNameRequest* Req; - }; - -public: - inline TImpl(TContExecutor* e, const TOptions& opts) - : P_(TDefaultAllocator::Instance()) - , E_(e) - , RateLimit_(E_, opts.MaxRequests) - , D_(this, opts) - , DC_(nullptr) - { - } - - inline ~TImpl() { - Y_VERIFY(DC_ == nullptr, "shit happens"); - } - - inline void Resolve(const TNameRequest& hreq) { - TGuard<TContSemaphore> g(RateLimit_); - - if (hreq.Family == AF_UNSPEC) { - const TNameRequest hreq1(hreq.Copy(AF_INET)); - TRequest req1(&hreq1, E_); - - const TNameRequest hreq2(hreq.Copy(AF_INET6)); - TRequest req2(&hreq2, E_); - - Schedule(&req1); - Schedule(&req2); - - req1.Wait(); - req2.Wait(); - } else { - TRequest req(&hreq, E_); - - Schedule(&req); - - req.Wait(); - } - - if (A_.Empty() && DC_) { - DC_->ReSchedule(); - } - } - -private: - inline void Schedule(TRequest* req) { - D_.AsyncResolve(req->Req->Copy(req)); - A_.PushBack(req); - ResolveCont()->ReSchedule(); - } - - inline TCont* ResolveCont() { - if (!DC_) { - DC_ = E_->Create<TImpl, &TImpl::RunAsyncDns>(this, "async_dns"); - } - - return DC_; - } - - void RunAsyncDns(TCont*) { - while (!A_.Empty()) { - Y_VERIFY(!I_.Empty(), "shit happens"); - - const TDuration tout = D_.Timeout(); - - if (E_->Running()->SleepT(Max(tout, TDuration::MilliSeconds(10))) == ETIMEDOUT) { - D_.ProcessNone(); - } - - DC_->Yield(); - } - - DC_ = nullptr; - } - - void OnStateChange(SOCKET s, bool read, bool write) noexcept override { - static const ui16 WHAT[] = { - 0, CONT_POLL_READ, CONT_POLL_WRITE, CONT_POLL_READ | CONT_POLL_WRITE}; - - const int what = WHAT[((size_t)read) | (((size_t)write) << 1)]; - - TEventRef& ev = S_.Get(s); - - if (!ev) { - ev.Reset(new (&P_) TPollEvent(this, s, what)); - I_.PushBack(ev.Get()); - } else { - if (what) { - if (ev->What() != what) { - //may optimize - ev.Reset(new (&P_) TPollEvent(this, s, what)); - I_.PushBack(ev.Get()); - } - } else { - ev.Destroy(); - //auto unlink - } - } - - if (DC_) { - DC_->ReSchedule(); - } - } - -private: - TSocketMap<TEventRef> S_; - TPollEvent::TPool P_; - TContExecutor* E_; - TContSemaphore RateLimit_; - TAsyncDns D_; - TIntrusiveList<TPollEvent> I_; - TIntrusiveList<TRequest> A_; - TCont* DC_; -}; - -TContResolver::TContResolver(TContExecutor* e, const TOptions& opts) - : I_(new TImpl(e, opts)) -{ -} - -TContResolver::~TContResolver() { -} - -void TContResolver::Resolve(const TNameRequest& req) { - I_->Resolve(req); -} diff --git a/library/cpp/coroutine/dns/coro.h b/library/cpp/coroutine/dns/coro.h deleted file mode 100644 index 62a01891b8a..00000000000 --- a/library/cpp/coroutine/dns/coro.h +++ /dev/null @@ -1,22 +0,0 @@ -#pragma once - -#include "iface.h" - -#include <util/generic/ptr.h> -#include <util/generic/ylimits.h> - -class TContExecutor; - -namespace NAsyncDns { - class TContResolver { - public: - TContResolver(TContExecutor* e, const TOptions& opts = TOptions()); - ~TContResolver(); - - void Resolve(const TNameRequest& hreq); - - private: - class TImpl; - THolder<TImpl> I_; - }; -} diff --git a/library/cpp/coroutine/dns/helpers.cpp b/library/cpp/coroutine/dns/helpers.cpp deleted file mode 100644 index 21d17b5d674..00000000000 --- a/library/cpp/coroutine/dns/helpers.cpp +++ /dev/null @@ -1,148 +0,0 @@ -#include "helpers.h" -#include "coro.h" -#include "async.h" -#include "cache.h" - -#include <util/digest/city.h> -#include <util/generic/hash_set.h> - -using namespace NAddr; -using namespace NAsyncDns; - -namespace { - typedef ui64 TAddrHash; - - inline TAddrHash Hash(const IRemoteAddrRef& addr) { - return CityHash64((const char*)addr->Addr(), addr->Len()); - } - - inline IRemoteAddrRef ConstructIP4(void* data, ui16 port) { - return new TIPv4Addr(TIpAddress(*(ui32*)data, port)); - } - - inline IRemoteAddrRef ConstructIP6(void* data, ui16 port) { - sockaddr_in6 res; - - Zero(res); - - res.sin6_family = AF_INET6; - res.sin6_port = HostToInet(port); - memcpy(&res.sin6_addr.s6_addr, data, sizeof(res.sin6_addr.s6_addr)); - - return new TIPv6Addr(res); - } - - inline IRemoteAddrRef Construct(const hostent* h, void* data, ui16 port) { - switch (h->h_addrtype) { - case AF_INET: - return ConstructIP4(data, port); - - case AF_INET6: - return ConstructIP6(data, port); - } - - //real shit happens - abort(); - } - - template <class It, class T> - static bool FindByHash(It b, It e, T t) { - while (b != e) { - if (Hash(*b) == t) { - return true; - } - - ++b; - } - - return false; - } - - inline size_t LstLen(char** lst) noexcept { - size_t ret = 0; - - while (*lst) { - ++ret; - ++lst; - } - - return ret; - } -} - -void TResolveAddr::OnComplete(const TResult& r) { - const hostent* h = r.Result; - - if (!h) { - Status.push_back(r.Status); - - return; - } - - char** lst = h->h_addr_list; - - typedef THashSet<TAddrHash> THashes; - TAutoPtr<THashes> hashes; - - if ((Result.size() + LstLen(lst)) > 8) { - hashes.Reset(new THashes()); - - for (const auto& it : Result) { - hashes->insert(Hash(it)); - } - } - - while (*lst) { - IRemoteAddrRef addr = Construct(h, *lst, Port); - - if (!hashes) { - if (!FindByHash(Result.begin(), Result.end(), Hash(addr))) { - Result.push_back(addr); - } - } else { - const TAddrHash h = Hash(addr); - - if (hashes->find(h) == hashes->end()) { - hashes->insert(h); - Result.push_back(addr); - } - } - - ++lst; - } -} - -void NAsyncDns::ResolveAddr(TContResolver& resolver, const TString& host, ui16 port, TAddrs& result) { - TResolveAddr cb(port); - - resolver.Resolve(TNameRequest(host.data(), AF_UNSPEC, &cb)); - - if (cb.Result) { - for (auto status : cb.Status) { - //we have some results, so skip empty responses for aaaa requests - CheckPartialAsyncStatus(status); - } - } else { - for (auto status : cb.Status) { - CheckAsyncStatus(status); - } - } - - cb.Result.swap(result); -} - -void NAsyncDns::ResolveAddr(TContResolver& resolver, const TString& addr, TAddrs& result) { - ResolveAddr(resolver, addr, 80, result); -} - -void NAsyncDns::ResolveAddr(TContResolver& resolver, const TString& host, ui16 port, TAddrs& result, TContDnsCache* cache) { - if (cache) { - cache->LookupOrResolve(resolver, host, port, result); - } else { - ResolveAddr(resolver, host, port, result); - } -} - -void NAsyncDns::ResolveAddr(TContResolver& resolver, const TString& addr, TAddrs& result, TContDnsCache* cache) { - ResolveAddr(resolver, addr, 80, result, cache); -} diff --git a/library/cpp/coroutine/dns/helpers.h b/library/cpp/coroutine/dns/helpers.h deleted file mode 100644 index 5f3cc8676fd..00000000000 --- a/library/cpp/coroutine/dns/helpers.h +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once - -#include "iface.h" - -#include <util/network/address.h> -#include <util/generic/vector.h> - -namespace NAsyncDns { - class TContResolver; - class TContDnsCache; - using NAddr::IRemoteAddrRef; - - typedef TVector<IRemoteAddrRef> TAddrs; - - struct TResolveAddr: public IHostResult { - inline TResolveAddr(ui16 port) - : Port(port) - , Status(0) - { - } - - void OnComplete(const TResult& result) override; - - TAddrs Result; - ui16 Port; - TVector<int> Status; - }; - - //resolve addr(host + port), like TNetworkAddress - void ResolveAddr(TContResolver& resolver, const TString& addr, TAddrs& result); - void ResolveAddr(TContResolver& resolver, const TString& host, ui16 port, TAddrs& result); - void ResolveAddr(TContResolver& resolver, const TString& addr, TAddrs& result, TContDnsCache* cache); - void ResolveAddr(TContResolver& resolver, const TString& host, ui16 port, TAddrs& result, TContDnsCache* cache); -} diff --git a/library/cpp/coroutine/dns/iface.h b/library/cpp/coroutine/dns/iface.h deleted file mode 100644 index 0310f48f8fd..00000000000 --- a/library/cpp/coroutine/dns/iface.h +++ /dev/null @@ -1,75 +0,0 @@ -#pragma once - -#include <util/datetime/base.h> -#include <util/generic/yexception.h> - -struct hostent; - -namespace NAsyncDns { - struct TOptions { - inline TOptions() - : MaxRequests(Max()) - , Retries(5) - , TimeOut(TDuration::MilliSeconds(50)) - { - } - - inline TOptions& SetMaxRequests(size_t val) noexcept { - MaxRequests = val; - - return *this; - } - - inline TOptions& SetRetries(size_t val) noexcept { - Retries = val; - - return *this; - } - - inline TOptions& SetTimeOut(const TDuration& val) noexcept { - TimeOut = val; - - return *this; - } - - size_t MaxRequests; - size_t Retries; - TDuration TimeOut; - }; - - struct IHostResult { - struct TResult { - int Status; - int Timeouts; - const hostent* Result; - }; - - virtual ~IHostResult() = default; - - virtual void OnComplete(const TResult& result) = 0; - }; - - struct TNameRequest { - inline TNameRequest(const char* addr, int family, IHostResult* cb) - : Addr(addr) - , Family(family) - , CB(cb) - { - } - - inline TNameRequest Copy(IHostResult* cb) const noexcept { - return TNameRequest(Addr, Family, cb); - } - - inline TNameRequest Copy(int family) const noexcept { - return TNameRequest(Addr, family, CB); - } - - const char* Addr; - int Family; - IHostResult* CB; - }; - - struct TDnsError: public yexception { - }; -} diff --git a/library/cpp/coroutine/util/cosem.h b/library/cpp/coroutine/util/cosem.h deleted file mode 100644 index 5ed41a9669a..00000000000 --- a/library/cpp/coroutine/util/cosem.h +++ /dev/null @@ -1,433 +0,0 @@ -#pragma once - -#include <library/cpp/coroutine/engine/events.h> -#include <library/cpp/coroutine/engine/impl.h> -#include <library/cpp/coroutine/engine/mutex.h> -#include <library/cpp/coroutine/engine/network.h> - -#include <util/network/pair.h> -#include <util/network/poller.h> -#include <library/cpp/deprecated/atomic/atomic.h> -#include <util/system/yield.h> -#include <util/generic/noncopyable.h> - -/***************************************************************** - TContSemaphore - ^ - | - TCoSemaphore - ^ - | - TCoMutex - *****************************************************************/ - -/* - * Note: this semaphore does not try to be thread-safe. - * It is intended to use within coroutines in single thread only. - * Do not share it between threads. Use TContMtSpinlock instead. - */ - -class TContSemaphore { -public: - inline TContSemaphore(const unsigned int initial) - : mValue(initial) - { - } - - inline bool Acquire(TCont* cont) { - while (!mValue) { - mWaitQueue.WaitI(cont); - } - --mValue; - return true; - } - - inline bool Release(TCont*) { - ++mValue; - mWaitQueue.Signal(); - return true; - } - - inline bool TryAcquire(TCont*) { - if (!mValue) { - return false; - } - --mValue; - return true; - } - -protected: - TAtomic mValue; - TContWaitQueue mWaitQueue; -}; - -/***************************************************************** - TContMtSyncBase - ^ ^ - / \ - / \ - TContMtSpinlock TContMtSemaphore - ^ - | - TCoMtSpinlock - *****************************************************************/ -class TContMtSyncBase { -public: - static inline void Yield(TCont* cont) { - //printf("yield() called with cont=%p in thread %p\n", cont, pthread_self()); fflush(stdout); - if (cont) { - // Bugfeature workaround: coroutine Yield() in loop would make our priority too high to receive any other events - // so we probably never come out of this loop. So better use Sleep() instead. Unlike Yield(), Sleep(0) - // makes all pending I/Os have higher priority than us. - cont->SleepT(TDuration::Zero()); - } else { - ThreadYield(); - } - } - - static inline void SleepT(TCont* cont, unsigned time) { - if (cont) { - cont->SleepT(TDuration::MicroSeconds(time)); - } else { - ::usleep(time); - } - } -}; - -/* - * Thread-safe version of TContMutex - */ - -class TContMtSpinlock: private TContMtSyncBase { -public: - TContMtSpinlock() - : mToken(true) - { - } - - inline int LockI(TCont* cont) const { - // We do not want to lock the bus for a long time. However, we do not want to switch context - // immediately. Hope that 16 times is enough to finish most of the small operations. - for (int i = 0; i < 4; ++i) { - if (AtomicSwap(&mToken, false)) - return 0; - } - for (int i = 0; i < 8; ++i) { - if (AtomicSwap(&mToken, false)) - return 0; - Yield(cont); - } - for (int i = 0; i < 4; ++i) { - if (AtomicSwap(&mToken, false)) - return 0; - SleepT(cont, 1000); - } - while (!AtomicSwap(&mToken, false)) - SleepT(cont, 10000); - return 0; - } - - inline bool Acquire(TCont* cont) const { - return (LockI(cont) == 0); - } - - inline int LockD(TCont* cont, TInstant deadline) { - for (int i = 0; i < 4; ++i) { - if (AtomicSwap(&mToken, false)) - return 0; - } - for (int i = 0; i < 8; ++i) { - if (AtomicSwap(&mToken, false)) - return 0; - if (Now() > deadline) - return ETIMEDOUT; - Yield(cont); - } - for (int i = 0; i < 4; ++i) { - if (AtomicSwap(&mToken, false)) - return 0; - if (Now() > deadline) - return ETIMEDOUT; - SleepT(cont, 1000); - } - while (!AtomicSwap(&mToken, false)) { - if (Now() > deadline) - return ETIMEDOUT; - SleepT(cont, 10000); - } - return 0; - } - - inline int LockT(TCont* cont, TDuration timeout) { - return LockD(cont, Now() + timeout); - } - - inline void UnLock(TCont*) const { - mToken = true; - } - - inline bool Release(TCont* cont) const { - UnLock(cont); - return true; - } - - inline bool TryAcquire(TCont*) const { - return AtomicSwap(&mToken, false); - } - -private: - TAtomic mutable mToken; -}; - -/* Thread-safe version of TContSemaphore */ - -class TContMtSemaphore: private TContMtSyncBase { -public: - TContMtSemaphore(const unsigned int initial) - : mValue(initial) - { - } - - inline bool Acquire(TCont* cont) { - for (int i = 0; i < 1024; ++i) { - if (TryAcquire(cont)) - return true; - } - while (!TryAcquire(cont)) - Yield(cont); - return true; - } - - inline bool Release(TCont*) { - AtomicAdd(mValue, 1); - return true; - } - - inline bool TryAcquire(TCont*) { - TAtomic q = AtomicAdd(mValue, -1); - if (q < 0) { - AtomicAdd(mValue, 1); // this is safe even if some other thread accessed mValue - // since increments and decrements are always balanced - return false; - } - return true; - } - -private: - TAtomic mValue; -}; - -/***************************************************************** - TContSem - *****************************************************************/ -class TContSem { -public: - TContSem(void*) noexcept { - pfd[0] = pfd[1] = INVALID_SOCKET; - val = 0; - } - - ~TContSem() { - if (pfd[0] != INVALID_SOCKET) - closesocket(pfd[0]); - if (pfd[1] != INVALID_SOCKET) - closesocket(pfd[1]); - pfd[0] = pfd[1] = INVALID_SOCKET; - val = 0; - } - - bool Initialized() noexcept { - return pfd[0] != INVALID_SOCKET; - } - - int Init(unsigned short Val) noexcept { - assert(pfd[0] == INVALID_SOCKET && pfd[1] == INVALID_SOCKET); - - if (SocketPair(pfd) == SOCKET_ERROR) { - return errno; - } - - char ch = 0; - val = Val; - - for (unsigned short i = 0; i < val; i++) - if (send(pfd[1], &ch, 1, 0) != 1) { - int err = errno; - closesocket(pfd[0]); - closesocket(pfd[1]); - pfd[0] = pfd[1] = INVALID_SOCKET; - return err; - } -#if defined(_unix_) - if (fcntl(pfd[0], F_SETFL, O_NONBLOCK) == -1) - return errno; - if (fcntl(pfd[1], F_SETFL, O_NONBLOCK) == -1) - return errno; -#endif - - readPoller.WaitRead(pfd[0], this); - writePoller.WaitWrite(pfd[1], this); - - return 0; - } - - int DownD(TCont* cont, TInstant deadline) { - assert(pfd[0] != INVALID_SOCKET && pfd[1] != INVALID_SOCKET); - assert(cont); - --val; - - char ch = 0; - int result = downMutex.LockI(cont); - if (result) - return result; - TContIOStatus status = NCoro::ReadD(cont, pfd[0], &ch, 1, deadline); - downMutex.UnLock(); - if (status.Status()) - return status.Status(); - return 0; - } - - int DownT(TCont* cont, TDuration timeout) { - return DownD(cont, Now() + timeout); - } - - int DownI(TCont* cont) { - return DownD(cont, TInstant::Max()); - } - - int Down() { - assert(pfd[0] != INVALID_SOCKET && pfd[1] != INVALID_SOCKET); - --val; - - char ch = 0; - while (true) { - int result = recv(pfd[0], &ch, 1, 0); - if (result == INVALID_SOCKET) { - if (errno != EINTR && errno != EAGAIN) { - return errno; - } - } else { - break; - } - void* pollerResult = readPoller.WaitI(); - Y_ASSERT(pollerResult == this); - } - return 0; - } - - int Up(TCont* cont) { - assert(pfd[0] != INVALID_SOCKET && pfd[1] != INVALID_SOCKET); - assert(cont); - ++val; - - char ch = 0; - upMutex.LockI(cont); - TContIOStatus status = NCoro::WriteI(cont, pfd[1], &ch, 1); - upMutex.UnLock(); - if (status.Status()) - return status.Status(); - return 0; - } - - int Up() { - assert(pfd[0] != INVALID_SOCKET && pfd[1] != INVALID_SOCKET); - ++val; - - char ch = 0; - while (true) { - int result = send(pfd[1], &ch, 1, 0); - if (result == INVALID_SOCKET) { - if (errno != EINTR && errno != EAGAIN) { - return errno; - } - } else { - break; - } - void* pollerResult = writePoller.WaitI(); - Y_ASSERT(pollerResult == this); - } - return 0; - } - -private: - SOCKET pfd[2]; - unsigned short val; - TSocketPoller readPoller; - TSocketPoller writePoller; - TContMutex downMutex; - TContMutex upMutex; - - // forbid copying - TContSem(const TContSem&) { - } - TContSem& operator=(const TContSem&) { - return *this; - } -}; - -class TCoMtMutex { -public: - TCoMtMutex() - : Mutex(this) - { - Mutex.Init(1); - } - - void Acquire() { - Mutex.Down(); - } - - void Acquire(TCont* c) { - Mutex.DownI(c); - } - - void Release(TCont* c) { - Mutex.Up(c); - } - - void Release() { - Mutex.Up(); - } - -private: - TContSem Mutex; -}; - -/***************************************************************** - GUARDS - *****************************************************************/ -class SemaphoreGuard : TNonCopyable { -public: - inline SemaphoreGuard(TCont* cont, TContSemaphore& semaphore) - : mSemaphore(semaphore) - , mCont(cont) - { - mSemaphore.Acquire(mCont); - } - - inline ~SemaphoreGuard() { - mSemaphore.Release(mCont); - } - -private: - TContSemaphore& mSemaphore; - TCont* mCont; -}; - -class ContGuard { -public: - inline ContGuard(TCont* cont, TContMtSpinlock& lock) - : mLock(lock) - , mCont(cont) - { - mLock.LockI(mCont); - } - - inline ~ContGuard() { - mLock.UnLock(mCont); - } - -private: - TContMtSpinlock& mLock; - TCont* mCont; -}; diff --git a/library/cpp/coroutine/util/pipeevent.h b/library/cpp/coroutine/util/pipeevent.h deleted file mode 100644 index 059f2a34d9a..00000000000 --- a/library/cpp/coroutine/util/pipeevent.h +++ /dev/null @@ -1,93 +0,0 @@ -#pragma once - -#include <library/cpp/coroutine/engine/events.h> -#include <library/cpp/coroutine/engine/impl.h> -#include <library/cpp/coroutine/engine/network.h> - -#include <util/network/socket.h> -#include <library/cpp/deprecated/atomic/atomic.h> -#include <util/system/pipe.h> - -// TPipeEvent and TPipeSemaphore try to minimize number of coroutines reading from same pipe -// because its actually quite expensive with >1000 coroutines waiting on same event/semaphore - -// Using this class you can block in coroutine on waiting -// a signal from outer thread. -// Usual thread synchronization primitives are not appropriate -// because they will block all coroutines in single thread. -class TPipeEvent { -public: - explicit TPipeEvent() - : Signaled(0) - , NumWaiting(0) - { - TPipeHandle::Pipe(SignalRecvPipe, SignalSendPipe); - SetNonBlock(SignalRecvPipe); - SetNonBlock(SignalSendPipe); - } - - void Signal() { - if (AtomicCas(&Signaled, 1, 0)) { - char tmp = 1; - SignalSendPipe.Write(&tmp, 1); - } - } - - void Wait(TCont* cont) { - if (++NumWaiting > 1) { - ToWake.WaitI(cont); - } - char tmp; - NCoro::ReadI(cont, SignalRecvPipe, &tmp, 1).Checked(); - AtomicSet(Signaled, 0); - if (--NumWaiting > 0) { - ToWake.Signal(); - } - } - -private: - TAtomic Signaled; - size_t NumWaiting; - TContWaitQueue ToWake; - TPipeHandle SignalRecvPipe; - TPipeHandle SignalSendPipe; -}; - -class TPipeSemaphore { -public: - explicit TPipeSemaphore() - : Value(0) - , NumWaiting(0) - { - TPipeHandle::Pipe(SignalRecvPipe, SignalSendPipe); - SetNonBlock(SignalRecvPipe); - SetNonBlock(SignalSendPipe); - } - - void Inc() { - if (AtomicIncrement(Value) <= 0) { - char tmp = 1; - SignalSendPipe.Write(&tmp, 1); - } - } - - void Dec(TCont* cont) { - if (AtomicDecrement(Value) < 0) { - if (++NumWaiting > 1) { - ToWake.WaitI(cont); - } - char tmp; - NCoro::ReadI(cont, SignalRecvPipe, &tmp, 1).Checked(); - if (--NumWaiting > 0) { - ToWake.Signal(); - } - } - } - -private: - TAtomic Value; - size_t NumWaiting; - TContWaitQueue ToWake; - TPipeHandle SignalRecvPipe; - TPipeHandle SignalSendPipe; -}; diff --git a/library/cpp/coroutine/util/pipeque.h b/library/cpp/coroutine/util/pipeque.h deleted file mode 100644 index 8a197f5bff6..00000000000 --- a/library/cpp/coroutine/util/pipeque.h +++ /dev/null @@ -1,262 +0,0 @@ -#pragma once - -#include <util/network/sock.h> -#include <util/network/pair.h> -#include <util/generic/intrlist.h> -#include <util/system/mutex.h> -#include <util/memory/smallobj.h> - -class TConnectedStreamSocket: public TStreamSocket { -public: - TConnectedStreamSocket(SOCKET fd) - : TStreamSocket(fd) - { - } - - ssize_t Send(const void* msg, size_t len, int flags = 0) { - ssize_t ret = 0; - do { - ret = TStreamSocket::Send(msg, len, flags); - } while (ret == -EINTR); - return ret; - } - - ssize_t Recv(void* buf, size_t len, int flags = 0) { - ssize_t ret = 0; - do { - ret = TStreamSocket::Recv(buf, len, flags); - } while (ret == -EINTR); - return ret; - } -}; - -// Limited size, syncronized queue based on intrusive list. -// Reader and writer both wait on file descriptors instead -// of traditional cond vars. That allows the queue to be -// employed both in (POSIX) threads and coroutines. -template <class T> -class TBasicPipeQueue { -public: - // derive your T from TBasicPipeQueue<T>::Item - typedef TIntrusiveListItem<T> TItem; - static const size_t MaxQueueSizeDef = (1 << 6); // fits default socket buffer sizes on our Linux and FreeBSD servers - - TBasicPipeQueue(bool isPushBlocking = true, - bool isPopBlocking = true, - size_t maxQueueSize = MaxQueueSizeDef) - : CurSize(0) - , MaxQueueSize(maxQueueSize) - , PushSock(INVALID_SOCKET) - , PopSock(INVALID_SOCKET) - , ShouldContinue(true) - { - SOCKET socks[2]; - socks[0] = INVALID_SOCKET; - socks[1] = INVALID_SOCKET; - if (SocketPair(socks)) - ythrow yexception() << "TBasicPipeQueue: can't create socket pair: " << LastSystemError(); - TSocketHolder pushSock(socks[0]); - TSocketHolder popSock(socks[1]); - PushSock.Swap(pushSock); - PopSock.Swap(popSock); - if (!isPushBlocking) - SetNonBlock(PushFd(), true); - if (!isPopBlocking) - SetNonBlock(PopFd(), true); - - char b = '\0'; - for (size_t i = 0; i < MaxQueueSize; i++) { - if (PopSock.Send(&b, 1) != 1) { - Y_VERIFY(false, "TBasicPipeQueue: queue size too big (%" PRISZT "): make it less then %" PRISZT " or increase limits", MaxQueueSize, i); - } - } - } - - ~TBasicPipeQueue() { - } - - void Stop() { - ShouldContinue = false; - } - - bool Push(T* t) { - char b = '\0'; - ssize_t r = 0; - if (!ShouldContinue) - return false; - - // make sure the queue has enough space to write to (wait for reading on PushSock or block on Recv) - r = PushSock.Recv(&b, 1); - if (r == 0) { - return false; - } else if (r < 0) { - if (errno == EAGAIN) - return false; - else - ythrow yexception() << "TBasicPipeQueue: error in Push(Recv): " << LastSystemError(); - } - - Y_VERIFY(!IsFull(), "TBasicPipeQueue: no space left"); - - if (!ShouldContinue) - return false; - - { - TGuard<TMutex> guard(Mutex); - List.PushBack(t); - CurSize++; - } - - // signal that the queue has item - r = PushSock.Send(&b, 1); - if (r <= 0) - ythrow yexception() << "TBasicPipeQueue: error in Push(Send): " << LastSystemError(); - - return true; - } - - T* Pop() { - char b = '\0'; - // wait for an item to appear in the queue - ssize_t r = PopSock.Recv(&b, 1); - if (r == 0) { - return nullptr; - } else if (r < 0) { - if (errno == EAGAIN) - return nullptr; - else - ythrow yexception() << "TBasicPipeQueue: error in Pop(Recv): " << LastSystemError(); - } - - T* t = nullptr; - { - TGuard<TMutex> guard(Mutex); - t = List.PopFront(); - CurSize--; - } - - // signal that the queue has a free slot - r = PopSock.Send(&b, 1); - if (r <= 0) - ythrow yexception() << "TBasicPipeQueue: error in Pop(Send): " << LastSystemError(); - - return t; - } - - SOCKET PushFd() { - return (SOCKET)PushSock; - } - - SOCKET PopFd() { - return (SOCKET)PopSock; - } - - size_t Size() const { - TGuard<TMutex> guard(Mutex); - size_t curSize = CurSize; - return curSize; - } - - bool Empty() const { - TGuard<TMutex> guard(Mutex); - bool empty = CurSize == 0; - return empty; - } - - bool IsFull() const { - TGuard g{Mutex}; - return CurSize >= MaxQueueSize; - } - -protected: - typedef TIntrusiveList<T> TListType; - TListType List; - size_t CurSize; - size_t MaxQueueSize; - TConnectedStreamSocket PushSock; - TConnectedStreamSocket PopSock; - TMutex Mutex; - bool ShouldContinue; - /** - * --{push(write)}--> PushSock ---> PopSock --{pop(read)}--> - */ -}; - -template <class T> -class TSyncSmallObjAllocator { -public: - TSyncSmallObjAllocator(IAllocator* alloc = TDefaultAllocator::Instance()) - : Allocator(alloc) - { - } - - void Release(T* t) { - t->~T(); - { - TGuard<TMutex> guard(Mutex); - Allocator.Release(t); - } - } - - T* Alloc() { - void* t; - { - TGuard<TMutex> guard(Mutex); - t = Allocator.Allocate(); - } - return new (t) T(); - } - -protected: - TSmallObjAllocator<T> Allocator; - TMutex Mutex; -}; - -template <class T> -struct TPipeQueueItem: public TIntrusiveListItem<TPipeQueueItem<T>> { - T Val; -}; - -template <class T> -class TPipeQueue - : protected TSyncSmallObjAllocator<TPipeQueueItem<T>>, - protected TBasicPipeQueue<TPipeQueueItem<T>> { - typedef TPipeQueueItem<T> TItem; - typedef TSyncSmallObjAllocator<TItem> TAlloc; - typedef TBasicPipeQueue<TItem> TBaseQueue; - -public: - TPipeQueue(bool isPushBlocking = true, - bool isPopBlocking = true, - size_t maxQueueSize = TBaseQueue::MaxQueueSizeDef, - IAllocator* alloc = TDefaultAllocator::Instance()) - : TAlloc(alloc) - , TBaseQueue(isPushBlocking, isPopBlocking, maxQueueSize) - { - } - - bool Push(const T& t) { - TItem* item = TAlloc::Alloc(); - item->Val = t; - bool res = TBaseQueue::Push(item); - if (!res) - TAlloc::Release(item); - return res; - } - - bool Pop(T* t) { - TItem* item = TBaseQueue::Pop(); - if (item == nullptr) - return false; - - *t = item->Val; - TAlloc::Release(item); - return true; - } - - using TBaseQueue::Empty; - using TBaseQueue::PopFd; - using TBaseQueue::PushFd; - using TBaseQueue::Size; - using TBaseQueue::Stop; -}; |