diff options
author | qrort <qrort@yandex-team.com> | 2022-11-30 23:47:12 +0300 |
---|---|---|
committer | qrort <qrort@yandex-team.com> | 2022-11-30 23:47:12 +0300 |
commit | 22f8ae0e3f5d68b92aecccdf96c1d841a0334311 (patch) | |
tree | bffa27765faf54126ad44bcafa89fadecb7a73d7 /library/cpp/coroutine | |
parent | 332b99e2173f0425444abb759eebcb2fafaa9209 (diff) | |
download | ydb-22f8ae0e3f5d68b92aecccdf96c1d841a0334311.tar.gz |
validate canons without yatest_common
Diffstat (limited to 'library/cpp/coroutine')
-rw-r--r-- | library/cpp/coroutine/dns/async.cpp | 149 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/async.h | 32 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/cache.cpp | 131 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/cache.h | 54 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/coro.cpp | 225 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/coro.h | 22 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/helpers.cpp | 148 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/helpers.h | 34 | ||||
-rw-r--r-- | library/cpp/coroutine/dns/iface.h | 75 | ||||
-rw-r--r-- | library/cpp/coroutine/util/cosem.h | 433 | ||||
-rw-r--r-- | library/cpp/coroutine/util/pipeevent.h | 93 | ||||
-rw-r--r-- | library/cpp/coroutine/util/pipeque.h | 262 |
12 files changed, 1658 insertions, 0 deletions
diff --git a/library/cpp/coroutine/dns/async.cpp b/library/cpp/coroutine/dns/async.cpp new file mode 100644 index 0000000000..be35135828 --- /dev/null +++ b/library/cpp/coroutine/dns/async.cpp @@ -0,0 +1,149 @@ +#include "async.h" + +#include <util/generic/singleton.h> +#include <util/generic/vector.h> + +#include <contrib/libs/c-ares/include/ares.h> + +using namespace NAsyncDns; + +namespace { + struct TAresError: public TDnsError { + inline TAresError(int code) { + (*this) << ares_strerror(code); + } + }; + + struct TAresInit { + inline TAresInit() { + const int code = ares_library_init(ARES_LIB_INIT_ALL); + + if (code) { + ythrow TAresError(code) << "can not init ares engine"; + } + } + + inline ~TAresInit() { + ares_library_cleanup(); + } + + static inline void InitOnce() { + Singleton<TAresInit>(); + } + }; +} + +class TAsyncDns::TImpl { +public: + inline TImpl(IPoller* poller, const TOptions& o) + : P_(poller) + { + TAresInit::InitOnce(); + + ares_options opts; + + Zero(opts); + + int optflags = 0; + + optflags |= ARES_OPT_FLAGS; + opts.flags = ARES_FLAG_STAYOPEN; + + optflags |= ARES_OPT_TIMEOUTMS; + opts.timeout = o.TimeOut.MilliSeconds(); + + optflags |= ARES_OPT_TRIES; + opts.tries = o.Retries; + + optflags |= ARES_OPT_SOCK_STATE_CB; + opts.sock_state_cb = (decltype(opts.sock_state_cb))StateCb; + static_assert(sizeof(opts.sock_state_cb) == sizeof(&StateCb), "Inconsistent socket state size"); + opts.sock_state_cb_data = this; + + const int code = ares_init_options(&H_, &opts, optflags); + + if (code) { + ythrow TAresError(code) << "can not init ares channel"; + } + } + + inline ~TImpl() { + ares_destroy(H_); + } + + inline TDuration Timeout() { + struct timeval tv; + Zero(tv); + + ares_timeout(H_, nullptr, &tv); + + return TDuration(tv); + } + + template <class T> + inline void ProcessSocket(T s) { + ares_process_fd(H_, s, s); + } + + inline void ProcessNone() { + ProcessSocket(ARES_SOCKET_BAD); + } + + inline void AsyncResolve(const TNameRequest& req) { + ares_gethostbyname(H_, req.Addr, req.Family, AsyncResolveHostCb, req.CB); + } + +private: + static void StateCb(void* arg, int s, int read, int write) { + ((TImpl*)arg)->P_->OnStateChange((SOCKET)s, (bool)read, (bool)write); + } + + static void AsyncResolveHostCb(void* arg, int status, int timeouts, hostent* he) { + const IHostResult::TResult res = { + status, timeouts, he}; + + ((IHostResult*)arg)->OnComplete(res); + } + +private: + IPoller* P_; + ares_channel H_; +}; + +void NAsyncDns::CheckAsyncStatus(int status) { + if (status) { + ythrow TAresError(status); + } +} + +void NAsyncDns::CheckPartialAsyncStatus(int status) { + if (status == ARES_ENODATA) { + return; + } + + CheckAsyncStatus(status); +} + +TAsyncDns::TAsyncDns(IPoller* poller, const TOptions& opts) + : I_(new TImpl(poller, opts)) +{ +} + +TAsyncDns::~TAsyncDns() { +} + +void TAsyncDns::AsyncResolve(const TNameRequest& req) { + I_->AsyncResolve(req); +} + +TDuration TAsyncDns::Timeout() { + return I_->Timeout(); +} + +void TAsyncDns::ProcessSocket(SOCKET s) { + I_->ProcessSocket(s); +} + +void TAsyncDns::ProcessNone() { + I_->ProcessNone(); +} diff --git a/library/cpp/coroutine/dns/async.h b/library/cpp/coroutine/dns/async.h new file mode 100644 index 0000000000..1ec26bc91d --- /dev/null +++ b/library/cpp/coroutine/dns/async.h @@ -0,0 +1,32 @@ +#pragma once + +#include "iface.h" + +#include <util/network/socket.h> +#include <util/datetime/base.h> +#include <util/generic/ptr.h> + +namespace NAsyncDns { + struct IPoller { + virtual void OnStateChange(SOCKET s, bool read, bool write) = 0; + }; + + class TAsyncDns { + public: + TAsyncDns(IPoller* poller, const TOptions& opts = TOptions()); + ~TAsyncDns(); + + void AsyncResolve(const TNameRequest& req); + + TDuration Timeout(); + void ProcessSocket(SOCKET s); + void ProcessNone(); + + private: + class TImpl; + THolder<TImpl> I_; + }; + + void CheckAsyncStatus(int status); + void CheckPartialAsyncStatus(int status); +} diff --git a/library/cpp/coroutine/dns/cache.cpp b/library/cpp/coroutine/dns/cache.cpp new file mode 100644 index 0000000000..69910a7960 --- /dev/null +++ b/library/cpp/coroutine/dns/cache.cpp @@ -0,0 +1,131 @@ +#include "cache.h" + +#include "async.h" +#include "coro.h" +#include "helpers.h" + +#include <library/cpp/cache/cache.h> + +#include <library/cpp/coroutine/engine/impl.h> +#include <library/cpp/coroutine/engine/events.h> + +using namespace NAddr; +using namespace NAsyncDns; + +class TContDnsCache::TImpl { + using TKey = std::pair<TString, ui16>; + + enum EResolveState { + RS_NOT_RESOLVED, + RS_IN_PROGRESS, + RS_RESOLVED, + RS_FAILED, + RS_EXPIRED + }; + + struct TEntry: public TSimpleRefCount<TEntry> { + TAddrs Addrs_; + TInstant ResolveTimestamp_; + EResolveState State_; + TContSimpleEvent ResolvedEvent_; + TString LastResolveError_; + + TEntry(TContExecutor* e) + : State_(RS_NOT_RESOLVED) + , ResolvedEvent_(e) + { + } + }; + + using TEntryRef = TIntrusivePtr<TEntry>; + +public: + inline TImpl(TContExecutor* e, const TCacheOptions& opts) + : Executor_(e) + , Opts_(opts) + , Cache_(opts.MaxSize) + { + } + + inline ~TImpl() { + } + + void LookupOrResolve(TContResolver& resolver, const TString& addr, ui16 port, TAddrs& result) { + TKey cacheKey(addr, port); + + TEntryRef e; + auto iter = Cache_.Find(cacheKey); + if (iter == Cache_.End()) { + e = MakeIntrusive<TEntry>(Executor_); + Cache_.Insert(cacheKey, e); + } else { + e = iter.Value(); + } + + while (true) { + switch (e->State_) { + case RS_NOT_RESOLVED: + case RS_EXPIRED: + e->State_ = RS_IN_PROGRESS; + try { + ResolveAddr(resolver, addr, port, result); + } catch (TDnsError& err) { + e->ResolveTimestamp_ = TInstant::Now(); + e->LastResolveError_ = err.AsStrBuf(); + e->State_ = RS_FAILED; + e->ResolvedEvent_.BroadCast(); + throw; + } catch (...) { + // errors not related to DNS + e->State_ = RS_NOT_RESOLVED; + e->ResolvedEvent_.BroadCast(); + throw; + } + e->ResolveTimestamp_ = TInstant::Now(); + e->Addrs_ = result; + e->State_ = RS_RESOLVED; + e->ResolvedEvent_.BroadCast(); + return; + case RS_IN_PROGRESS: + e->ResolvedEvent_.WaitI(); + continue; + case RS_RESOLVED: + if (e->ResolveTimestamp_ + Opts_.EntryLifetime < TInstant::Now()) { + e->State_ = RS_EXPIRED; + continue; // try and resolve again + } + result = e->Addrs_; + return; + case RS_FAILED: + if (e->ResolveTimestamp_ + Opts_.NotFoundLifetime < TInstant::Now()) { + e->State_ = RS_EXPIRED; + continue; // try and resolve again + } + ythrow TDnsError() << e->LastResolveError_; + default: + Y_FAIL("Bad state, shoult not get here"); + }; + } + } + +private: + TContExecutor* Executor_; + const TCacheOptions Opts_; + TLRUCache<TKey, TEntryRef> Cache_; +}; + +TContDnsCache::TContDnsCache(TContExecutor* e, const NAsyncDns::TCacheOptions& opts) + : I_(MakeHolder<TImpl>(e, opts)) +{ +} + +TContDnsCache::~TContDnsCache() { +} + +void TContDnsCache::LookupOrResolve(TContResolver& resolver, const TString& addr, ui16 port, TVector<NAddr::IRemoteAddrRef>& result) { + return I_->LookupOrResolve(resolver, addr, port, result); +} + +void TContDnsCache::LookupOrResolve(TContResolver& resolver, const TString& addr, TVector<NAddr::IRemoteAddrRef>& result) { + LookupOrResolve(resolver, addr, 80, result); +} diff --git a/library/cpp/coroutine/dns/cache.h b/library/cpp/coroutine/dns/cache.h new file mode 100644 index 0000000000..7c358ce591 --- /dev/null +++ b/library/cpp/coroutine/dns/cache.h @@ -0,0 +1,54 @@ +#pragma once + +#include "iface.h" + +#include <util/generic/ptr.h> +#include <util/generic/vector.h> +#include <util/network/address.h> + +class TContExecutor; + +namespace NAsyncDns { + class TContResolver; + + struct TCacheOptions { + inline TCacheOptions() { + } + + inline TCacheOptions& SetEntryLifetime(const TDuration& val) noexcept { + EntryLifetime = val; + + return *this; + } + + inline TCacheOptions& SetNotFoundLifetime(const TDuration& val) noexcept { + NotFoundLifetime = val; + + return *this; + } + + inline TCacheOptions& SetMaxSize(size_t val) noexcept { + MaxSize = val; + + return *this; + } + + size_t MaxSize = 512; + TDuration EntryLifetime = TDuration::Seconds(1800); + TDuration NotFoundLifetime = TDuration::Seconds(1); + }; + + // MT-unsafe LRU DNS cache + class TContDnsCache { + public: + TContDnsCache(TContExecutor* e, const TCacheOptions& opts = {}); + ~TContDnsCache(); + + void LookupOrResolve(TContResolver& resolver, const TString& addr, ui16 port, TVector<NAddr::IRemoteAddrRef>& result); + void LookupOrResolve(TContResolver& resolver, const TString& addr, TVector<NAddr::IRemoteAddrRef>& result); + + private: + class TImpl; + THolder<TImpl> I_; + }; +} diff --git a/library/cpp/coroutine/dns/coro.cpp b/library/cpp/coroutine/dns/coro.cpp new file mode 100644 index 0000000000..9d6a89f6e7 --- /dev/null +++ b/library/cpp/coroutine/dns/coro.cpp @@ -0,0 +1,225 @@ +#include "coro.h" + +#include "async.h" + +#include <library/cpp/coroutine/engine/events.h> +#include <library/cpp/coroutine/engine/impl.h> + +#include <util/generic/vector.h> +#include <util/memory/smallobj.h> + +using namespace NAsyncDns; + +class TContResolver::TImpl: public IPoller { + struct TPollEvent: + public NCoro::IPollEvent, + public TIntrusiveListItem<TPollEvent>, + public TObjectFromPool<TPollEvent> + { + inline TPollEvent(TImpl* parent, SOCKET s, int what) + : IPollEvent(s, what) + , P(parent) + { + P->E_->Poller()->Schedule(this); + } + + inline ~TPollEvent() override { + P->E_->Poller()->Remove(this); + } + + void OnPollEvent(int) noexcept override { + P->D_.ProcessSocket(Fd()); + } + + TImpl* P; + }; + + typedef TAutoPtr<TPollEvent> TEventRef; + + struct TOneShotEvent { + inline TOneShotEvent(TContExecutor* e) noexcept + : E(e) + , S(false) + { + } + + inline void Signal() noexcept { + S = true; + E.Signal(); + } + + inline void Wait() noexcept { + if (S) { + return; + } + + E.WaitI(); + } + + TContSimpleEvent E; + bool S; + }; + + struct TContSemaphore { + inline TContSemaphore(TContExecutor* e, size_t num) noexcept + : E(e) + , N(num) + { + } + + inline void Acquire() noexcept { + while (!N) { + E.WaitI(); + } + + --N; + } + + inline void Release() noexcept { + ++N; + E.Signal(); + } + + TContSimpleEvent E; + size_t N; + }; + + struct TRequest: public TIntrusiveListItem<TRequest>, public TOneShotEvent, public IHostResult { + inline TRequest(const TNameRequest* req, TContExecutor* e) + : TOneShotEvent(e) + , Req(req) + { + } + + void OnComplete(const TResult& res) override { + Req->CB->OnComplete(res); + Signal(); + } + + const TNameRequest* Req; + }; + +public: + inline TImpl(TContExecutor* e, const TOptions& opts) + : P_(TDefaultAllocator::Instance()) + , E_(e) + , RateLimit_(E_, opts.MaxRequests) + , D_(this, opts) + , DC_(nullptr) + { + } + + inline ~TImpl() { + Y_VERIFY(DC_ == nullptr, "shit happens"); + } + + inline void Resolve(const TNameRequest& hreq) { + TGuard<TContSemaphore> g(RateLimit_); + + if (hreq.Family == AF_UNSPEC) { + const TNameRequest hreq1(hreq.Copy(AF_INET)); + TRequest req1(&hreq1, E_); + + const TNameRequest hreq2(hreq.Copy(AF_INET6)); + TRequest req2(&hreq2, E_); + + Schedule(&req1); + Schedule(&req2); + + req1.Wait(); + req2.Wait(); + } else { + TRequest req(&hreq, E_); + + Schedule(&req); + + req.Wait(); + } + + if (A_.Empty() && DC_) { + DC_->ReSchedule(); + } + } + +private: + inline void Schedule(TRequest* req) { + D_.AsyncResolve(req->Req->Copy(req)); + A_.PushBack(req); + ResolveCont()->ReSchedule(); + } + + inline TCont* ResolveCont() { + if (!DC_) { + DC_ = E_->Create<TImpl, &TImpl::RunAsyncDns>(this, "async_dns"); + } + + return DC_; + } + + void RunAsyncDns(TCont*) { + while (!A_.Empty()) { + Y_VERIFY(!I_.Empty(), "shit happens"); + + const TDuration tout = D_.Timeout(); + + if (E_->Running()->SleepT(Max(tout, TDuration::MilliSeconds(10))) == ETIMEDOUT) { + D_.ProcessNone(); + } + + DC_->Yield(); + } + + DC_ = nullptr; + } + + void OnStateChange(SOCKET s, bool read, bool write) noexcept override { + static const ui16 WHAT[] = { + 0, CONT_POLL_READ, CONT_POLL_WRITE, CONT_POLL_READ | CONT_POLL_WRITE}; + + const int what = WHAT[((size_t)read) | (((size_t)write) << 1)]; + + TEventRef& ev = S_.Get(s); + + if (!ev) { + ev.Reset(new (&P_) TPollEvent(this, s, what)); + I_.PushBack(ev.Get()); + } else { + if (what) { + if (ev->What() != what) { + //may optimize + ev.Reset(new (&P_) TPollEvent(this, s, what)); + I_.PushBack(ev.Get()); + } + } else { + ev.Destroy(); + //auto unlink + } + } + + if (DC_) { + DC_->ReSchedule(); + } + } + +private: + TSocketMap<TEventRef> S_; + TPollEvent::TPool P_; + TContExecutor* E_; + TContSemaphore RateLimit_; + TAsyncDns D_; + TIntrusiveList<TPollEvent> I_; + TIntrusiveList<TRequest> A_; + TCont* DC_; +}; + +TContResolver::TContResolver(TContExecutor* e, const TOptions& opts) + : I_(new TImpl(e, opts)) +{ +} + +TContResolver::~TContResolver() { +} + +void TContResolver::Resolve(const TNameRequest& req) { + I_->Resolve(req); +} diff --git a/library/cpp/coroutine/dns/coro.h b/library/cpp/coroutine/dns/coro.h new file mode 100644 index 0000000000..62a01891b8 --- /dev/null +++ b/library/cpp/coroutine/dns/coro.h @@ -0,0 +1,22 @@ +#pragma once + +#include "iface.h" + +#include <util/generic/ptr.h> +#include <util/generic/ylimits.h> + +class TContExecutor; + +namespace NAsyncDns { + class TContResolver { + public: + TContResolver(TContExecutor* e, const TOptions& opts = TOptions()); + ~TContResolver(); + + void Resolve(const TNameRequest& hreq); + + private: + class TImpl; + THolder<TImpl> I_; + }; +} diff --git a/library/cpp/coroutine/dns/helpers.cpp b/library/cpp/coroutine/dns/helpers.cpp new file mode 100644 index 0000000000..21d17b5d67 --- /dev/null +++ b/library/cpp/coroutine/dns/helpers.cpp @@ -0,0 +1,148 @@ +#include "helpers.h" +#include "coro.h" +#include "async.h" +#include "cache.h" + +#include <util/digest/city.h> +#include <util/generic/hash_set.h> + +using namespace NAddr; +using namespace NAsyncDns; + +namespace { + typedef ui64 TAddrHash; + + inline TAddrHash Hash(const IRemoteAddrRef& addr) { + return CityHash64((const char*)addr->Addr(), addr->Len()); + } + + inline IRemoteAddrRef ConstructIP4(void* data, ui16 port) { + return new TIPv4Addr(TIpAddress(*(ui32*)data, port)); + } + + inline IRemoteAddrRef ConstructIP6(void* data, ui16 port) { + sockaddr_in6 res; + + Zero(res); + + res.sin6_family = AF_INET6; + res.sin6_port = HostToInet(port); + memcpy(&res.sin6_addr.s6_addr, data, sizeof(res.sin6_addr.s6_addr)); + + return new TIPv6Addr(res); + } + + inline IRemoteAddrRef Construct(const hostent* h, void* data, ui16 port) { + switch (h->h_addrtype) { + case AF_INET: + return ConstructIP4(data, port); + + case AF_INET6: + return ConstructIP6(data, port); + } + + //real shit happens + abort(); + } + + template <class It, class T> + static bool FindByHash(It b, It e, T t) { + while (b != e) { + if (Hash(*b) == t) { + return true; + } + + ++b; + } + + return false; + } + + inline size_t LstLen(char** lst) noexcept { + size_t ret = 0; + + while (*lst) { + ++ret; + ++lst; + } + + return ret; + } +} + +void TResolveAddr::OnComplete(const TResult& r) { + const hostent* h = r.Result; + + if (!h) { + Status.push_back(r.Status); + + return; + } + + char** lst = h->h_addr_list; + + typedef THashSet<TAddrHash> THashes; + TAutoPtr<THashes> hashes; + + if ((Result.size() + LstLen(lst)) > 8) { + hashes.Reset(new THashes()); + + for (const auto& it : Result) { + hashes->insert(Hash(it)); + } + } + + while (*lst) { + IRemoteAddrRef addr = Construct(h, *lst, Port); + + if (!hashes) { + if (!FindByHash(Result.begin(), Result.end(), Hash(addr))) { + Result.push_back(addr); + } + } else { + const TAddrHash h = Hash(addr); + + if (hashes->find(h) == hashes->end()) { + hashes->insert(h); + Result.push_back(addr); + } + } + + ++lst; + } +} + +void NAsyncDns::ResolveAddr(TContResolver& resolver, const TString& host, ui16 port, TAddrs& result) { + TResolveAddr cb(port); + + resolver.Resolve(TNameRequest(host.data(), AF_UNSPEC, &cb)); + + if (cb.Result) { + for (auto status : cb.Status) { + //we have some results, so skip empty responses for aaaa requests + CheckPartialAsyncStatus(status); + } + } else { + for (auto status : cb.Status) { + CheckAsyncStatus(status); + } + } + + cb.Result.swap(result); +} + +void NAsyncDns::ResolveAddr(TContResolver& resolver, const TString& addr, TAddrs& result) { + ResolveAddr(resolver, addr, 80, result); +} + +void NAsyncDns::ResolveAddr(TContResolver& resolver, const TString& host, ui16 port, TAddrs& result, TContDnsCache* cache) { + if (cache) { + cache->LookupOrResolve(resolver, host, port, result); + } else { + ResolveAddr(resolver, host, port, result); + } +} + +void NAsyncDns::ResolveAddr(TContResolver& resolver, const TString& addr, TAddrs& result, TContDnsCache* cache) { + ResolveAddr(resolver, addr, 80, result, cache); +} diff --git a/library/cpp/coroutine/dns/helpers.h b/library/cpp/coroutine/dns/helpers.h new file mode 100644 index 0000000000..5f3cc8676f --- /dev/null +++ b/library/cpp/coroutine/dns/helpers.h @@ -0,0 +1,34 @@ +#pragma once + +#include "iface.h" + +#include <util/network/address.h> +#include <util/generic/vector.h> + +namespace NAsyncDns { + class TContResolver; + class TContDnsCache; + using NAddr::IRemoteAddrRef; + + typedef TVector<IRemoteAddrRef> TAddrs; + + struct TResolveAddr: public IHostResult { + inline TResolveAddr(ui16 port) + : Port(port) + , Status(0) + { + } + + void OnComplete(const TResult& result) override; + + TAddrs Result; + ui16 Port; + TVector<int> Status; + }; + + //resolve addr(host + port), like TNetworkAddress + void ResolveAddr(TContResolver& resolver, const TString& addr, TAddrs& result); + void ResolveAddr(TContResolver& resolver, const TString& host, ui16 port, TAddrs& result); + void ResolveAddr(TContResolver& resolver, const TString& addr, TAddrs& result, TContDnsCache* cache); + void ResolveAddr(TContResolver& resolver, const TString& host, ui16 port, TAddrs& result, TContDnsCache* cache); +} diff --git a/library/cpp/coroutine/dns/iface.h b/library/cpp/coroutine/dns/iface.h new file mode 100644 index 0000000000..0310f48f8f --- /dev/null +++ b/library/cpp/coroutine/dns/iface.h @@ -0,0 +1,75 @@ +#pragma once + +#include <util/datetime/base.h> +#include <util/generic/yexception.h> + +struct hostent; + +namespace NAsyncDns { + struct TOptions { + inline TOptions() + : MaxRequests(Max()) + , Retries(5) + , TimeOut(TDuration::MilliSeconds(50)) + { + } + + inline TOptions& SetMaxRequests(size_t val) noexcept { + MaxRequests = val; + + return *this; + } + + inline TOptions& SetRetries(size_t val) noexcept { + Retries = val; + + return *this; + } + + inline TOptions& SetTimeOut(const TDuration& val) noexcept { + TimeOut = val; + + return *this; + } + + size_t MaxRequests; + size_t Retries; + TDuration TimeOut; + }; + + struct IHostResult { + struct TResult { + int Status; + int Timeouts; + const hostent* Result; + }; + + virtual ~IHostResult() = default; + + virtual void OnComplete(const TResult& result) = 0; + }; + + struct TNameRequest { + inline TNameRequest(const char* addr, int family, IHostResult* cb) + : Addr(addr) + , Family(family) + , CB(cb) + { + } + + inline TNameRequest Copy(IHostResult* cb) const noexcept { + return TNameRequest(Addr, Family, cb); + } + + inline TNameRequest Copy(int family) const noexcept { + return TNameRequest(Addr, family, CB); + } + + const char* Addr; + int Family; + IHostResult* CB; + }; + + struct TDnsError: public yexception { + }; +} diff --git a/library/cpp/coroutine/util/cosem.h b/library/cpp/coroutine/util/cosem.h new file mode 100644 index 0000000000..5ed41a9669 --- /dev/null +++ b/library/cpp/coroutine/util/cosem.h @@ -0,0 +1,433 @@ +#pragma once + +#include <library/cpp/coroutine/engine/events.h> +#include <library/cpp/coroutine/engine/impl.h> +#include <library/cpp/coroutine/engine/mutex.h> +#include <library/cpp/coroutine/engine/network.h> + +#include <util/network/pair.h> +#include <util/network/poller.h> +#include <library/cpp/deprecated/atomic/atomic.h> +#include <util/system/yield.h> +#include <util/generic/noncopyable.h> + +/***************************************************************** + TContSemaphore + ^ + | + TCoSemaphore + ^ + | + TCoMutex + *****************************************************************/ + +/* + * Note: this semaphore does not try to be thread-safe. + * It is intended to use within coroutines in single thread only. + * Do not share it between threads. Use TContMtSpinlock instead. + */ + +class TContSemaphore { +public: + inline TContSemaphore(const unsigned int initial) + : mValue(initial) + { + } + + inline bool Acquire(TCont* cont) { + while (!mValue) { + mWaitQueue.WaitI(cont); + } + --mValue; + return true; + } + + inline bool Release(TCont*) { + ++mValue; + mWaitQueue.Signal(); + return true; + } + + inline bool TryAcquire(TCont*) { + if (!mValue) { + return false; + } + --mValue; + return true; + } + +protected: + TAtomic mValue; + TContWaitQueue mWaitQueue; +}; + +/***************************************************************** + TContMtSyncBase + ^ ^ + / \ + / \ + TContMtSpinlock TContMtSemaphore + ^ + | + TCoMtSpinlock + *****************************************************************/ +class TContMtSyncBase { +public: + static inline void Yield(TCont* cont) { + //printf("yield() called with cont=%p in thread %p\n", cont, pthread_self()); fflush(stdout); + if (cont) { + // Bugfeature workaround: coroutine Yield() in loop would make our priority too high to receive any other events + // so we probably never come out of this loop. So better use Sleep() instead. Unlike Yield(), Sleep(0) + // makes all pending I/Os have higher priority than us. + cont->SleepT(TDuration::Zero()); + } else { + ThreadYield(); + } + } + + static inline void SleepT(TCont* cont, unsigned time) { + if (cont) { + cont->SleepT(TDuration::MicroSeconds(time)); + } else { + ::usleep(time); + } + } +}; + +/* + * Thread-safe version of TContMutex + */ + +class TContMtSpinlock: private TContMtSyncBase { +public: + TContMtSpinlock() + : mToken(true) + { + } + + inline int LockI(TCont* cont) const { + // We do not want to lock the bus for a long time. However, we do not want to switch context + // immediately. Hope that 16 times is enough to finish most of the small operations. + for (int i = 0; i < 4; ++i) { + if (AtomicSwap(&mToken, false)) + return 0; + } + for (int i = 0; i < 8; ++i) { + if (AtomicSwap(&mToken, false)) + return 0; + Yield(cont); + } + for (int i = 0; i < 4; ++i) { + if (AtomicSwap(&mToken, false)) + return 0; + SleepT(cont, 1000); + } + while (!AtomicSwap(&mToken, false)) + SleepT(cont, 10000); + return 0; + } + + inline bool Acquire(TCont* cont) const { + return (LockI(cont) == 0); + } + + inline int LockD(TCont* cont, TInstant deadline) { + for (int i = 0; i < 4; ++i) { + if (AtomicSwap(&mToken, false)) + return 0; + } + for (int i = 0; i < 8; ++i) { + if (AtomicSwap(&mToken, false)) + return 0; + if (Now() > deadline) + return ETIMEDOUT; + Yield(cont); + } + for (int i = 0; i < 4; ++i) { + if (AtomicSwap(&mToken, false)) + return 0; + if (Now() > deadline) + return ETIMEDOUT; + SleepT(cont, 1000); + } + while (!AtomicSwap(&mToken, false)) { + if (Now() > deadline) + return ETIMEDOUT; + SleepT(cont, 10000); + } + return 0; + } + + inline int LockT(TCont* cont, TDuration timeout) { + return LockD(cont, Now() + timeout); + } + + inline void UnLock(TCont*) const { + mToken = true; + } + + inline bool Release(TCont* cont) const { + UnLock(cont); + return true; + } + + inline bool TryAcquire(TCont*) const { + return AtomicSwap(&mToken, false); + } + +private: + TAtomic mutable mToken; +}; + +/* Thread-safe version of TContSemaphore */ + +class TContMtSemaphore: private TContMtSyncBase { +public: + TContMtSemaphore(const unsigned int initial) + : mValue(initial) + { + } + + inline bool Acquire(TCont* cont) { + for (int i = 0; i < 1024; ++i) { + if (TryAcquire(cont)) + return true; + } + while (!TryAcquire(cont)) + Yield(cont); + return true; + } + + inline bool Release(TCont*) { + AtomicAdd(mValue, 1); + return true; + } + + inline bool TryAcquire(TCont*) { + TAtomic q = AtomicAdd(mValue, -1); + if (q < 0) { + AtomicAdd(mValue, 1); // this is safe even if some other thread accessed mValue + // since increments and decrements are always balanced + return false; + } + return true; + } + +private: + TAtomic mValue; +}; + +/***************************************************************** + TContSem + *****************************************************************/ +class TContSem { +public: + TContSem(void*) noexcept { + pfd[0] = pfd[1] = INVALID_SOCKET; + val = 0; + } + + ~TContSem() { + if (pfd[0] != INVALID_SOCKET) + closesocket(pfd[0]); + if (pfd[1] != INVALID_SOCKET) + closesocket(pfd[1]); + pfd[0] = pfd[1] = INVALID_SOCKET; + val = 0; + } + + bool Initialized() noexcept { + return pfd[0] != INVALID_SOCKET; + } + + int Init(unsigned short Val) noexcept { + assert(pfd[0] == INVALID_SOCKET && pfd[1] == INVALID_SOCKET); + + if (SocketPair(pfd) == SOCKET_ERROR) { + return errno; + } + + char ch = 0; + val = Val; + + for (unsigned short i = 0; i < val; i++) + if (send(pfd[1], &ch, 1, 0) != 1) { + int err = errno; + closesocket(pfd[0]); + closesocket(pfd[1]); + pfd[0] = pfd[1] = INVALID_SOCKET; + return err; + } +#if defined(_unix_) + if (fcntl(pfd[0], F_SETFL, O_NONBLOCK) == -1) + return errno; + if (fcntl(pfd[1], F_SETFL, O_NONBLOCK) == -1) + return errno; +#endif + + readPoller.WaitRead(pfd[0], this); + writePoller.WaitWrite(pfd[1], this); + + return 0; + } + + int DownD(TCont* cont, TInstant deadline) { + assert(pfd[0] != INVALID_SOCKET && pfd[1] != INVALID_SOCKET); + assert(cont); + --val; + + char ch = 0; + int result = downMutex.LockI(cont); + if (result) + return result; + TContIOStatus status = NCoro::ReadD(cont, pfd[0], &ch, 1, deadline); + downMutex.UnLock(); + if (status.Status()) + return status.Status(); + return 0; + } + + int DownT(TCont* cont, TDuration timeout) { + return DownD(cont, Now() + timeout); + } + + int DownI(TCont* cont) { + return DownD(cont, TInstant::Max()); + } + + int Down() { + assert(pfd[0] != INVALID_SOCKET && pfd[1] != INVALID_SOCKET); + --val; + + char ch = 0; + while (true) { + int result = recv(pfd[0], &ch, 1, 0); + if (result == INVALID_SOCKET) { + if (errno != EINTR && errno != EAGAIN) { + return errno; + } + } else { + break; + } + void* pollerResult = readPoller.WaitI(); + Y_ASSERT(pollerResult == this); + } + return 0; + } + + int Up(TCont* cont) { + assert(pfd[0] != INVALID_SOCKET && pfd[1] != INVALID_SOCKET); + assert(cont); + ++val; + + char ch = 0; + upMutex.LockI(cont); + TContIOStatus status = NCoro::WriteI(cont, pfd[1], &ch, 1); + upMutex.UnLock(); + if (status.Status()) + return status.Status(); + return 0; + } + + int Up() { + assert(pfd[0] != INVALID_SOCKET && pfd[1] != INVALID_SOCKET); + ++val; + + char ch = 0; + while (true) { + int result = send(pfd[1], &ch, 1, 0); + if (result == INVALID_SOCKET) { + if (errno != EINTR && errno != EAGAIN) { + return errno; + } + } else { + break; + } + void* pollerResult = writePoller.WaitI(); + Y_ASSERT(pollerResult == this); + } + return 0; + } + +private: + SOCKET pfd[2]; + unsigned short val; + TSocketPoller readPoller; + TSocketPoller writePoller; + TContMutex downMutex; + TContMutex upMutex; + + // forbid copying + TContSem(const TContSem&) { + } + TContSem& operator=(const TContSem&) { + return *this; + } +}; + +class TCoMtMutex { +public: + TCoMtMutex() + : Mutex(this) + { + Mutex.Init(1); + } + + void Acquire() { + Mutex.Down(); + } + + void Acquire(TCont* c) { + Mutex.DownI(c); + } + + void Release(TCont* c) { + Mutex.Up(c); + } + + void Release() { + Mutex.Up(); + } + +private: + TContSem Mutex; +}; + +/***************************************************************** + GUARDS + *****************************************************************/ +class SemaphoreGuard : TNonCopyable { +public: + inline SemaphoreGuard(TCont* cont, TContSemaphore& semaphore) + : mSemaphore(semaphore) + , mCont(cont) + { + mSemaphore.Acquire(mCont); + } + + inline ~SemaphoreGuard() { + mSemaphore.Release(mCont); + } + +private: + TContSemaphore& mSemaphore; + TCont* mCont; +}; + +class ContGuard { +public: + inline ContGuard(TCont* cont, TContMtSpinlock& lock) + : mLock(lock) + , mCont(cont) + { + mLock.LockI(mCont); + } + + inline ~ContGuard() { + mLock.UnLock(mCont); + } + +private: + TContMtSpinlock& mLock; + TCont* mCont; +}; diff --git a/library/cpp/coroutine/util/pipeevent.h b/library/cpp/coroutine/util/pipeevent.h new file mode 100644 index 0000000000..059f2a34d9 --- /dev/null +++ b/library/cpp/coroutine/util/pipeevent.h @@ -0,0 +1,93 @@ +#pragma once + +#include <library/cpp/coroutine/engine/events.h> +#include <library/cpp/coroutine/engine/impl.h> +#include <library/cpp/coroutine/engine/network.h> + +#include <util/network/socket.h> +#include <library/cpp/deprecated/atomic/atomic.h> +#include <util/system/pipe.h> + +// TPipeEvent and TPipeSemaphore try to minimize number of coroutines reading from same pipe +// because its actually quite expensive with >1000 coroutines waiting on same event/semaphore + +// Using this class you can block in coroutine on waiting +// a signal from outer thread. +// Usual thread synchronization primitives are not appropriate +// because they will block all coroutines in single thread. +class TPipeEvent { +public: + explicit TPipeEvent() + : Signaled(0) + , NumWaiting(0) + { + TPipeHandle::Pipe(SignalRecvPipe, SignalSendPipe); + SetNonBlock(SignalRecvPipe); + SetNonBlock(SignalSendPipe); + } + + void Signal() { + if (AtomicCas(&Signaled, 1, 0)) { + char tmp = 1; + SignalSendPipe.Write(&tmp, 1); + } + } + + void Wait(TCont* cont) { + if (++NumWaiting > 1) { + ToWake.WaitI(cont); + } + char tmp; + NCoro::ReadI(cont, SignalRecvPipe, &tmp, 1).Checked(); + AtomicSet(Signaled, 0); + if (--NumWaiting > 0) { + ToWake.Signal(); + } + } + +private: + TAtomic Signaled; + size_t NumWaiting; + TContWaitQueue ToWake; + TPipeHandle SignalRecvPipe; + TPipeHandle SignalSendPipe; +}; + +class TPipeSemaphore { +public: + explicit TPipeSemaphore() + : Value(0) + , NumWaiting(0) + { + TPipeHandle::Pipe(SignalRecvPipe, SignalSendPipe); + SetNonBlock(SignalRecvPipe); + SetNonBlock(SignalSendPipe); + } + + void Inc() { + if (AtomicIncrement(Value) <= 0) { + char tmp = 1; + SignalSendPipe.Write(&tmp, 1); + } + } + + void Dec(TCont* cont) { + if (AtomicDecrement(Value) < 0) { + if (++NumWaiting > 1) { + ToWake.WaitI(cont); + } + char tmp; + NCoro::ReadI(cont, SignalRecvPipe, &tmp, 1).Checked(); + if (--NumWaiting > 0) { + ToWake.Signal(); + } + } + } + +private: + TAtomic Value; + size_t NumWaiting; + TContWaitQueue ToWake; + TPipeHandle SignalRecvPipe; + TPipeHandle SignalSendPipe; +}; diff --git a/library/cpp/coroutine/util/pipeque.h b/library/cpp/coroutine/util/pipeque.h new file mode 100644 index 0000000000..8a197f5bff --- /dev/null +++ b/library/cpp/coroutine/util/pipeque.h @@ -0,0 +1,262 @@ +#pragma once + +#include <util/network/sock.h> +#include <util/network/pair.h> +#include <util/generic/intrlist.h> +#include <util/system/mutex.h> +#include <util/memory/smallobj.h> + +class TConnectedStreamSocket: public TStreamSocket { +public: + TConnectedStreamSocket(SOCKET fd) + : TStreamSocket(fd) + { + } + + ssize_t Send(const void* msg, size_t len, int flags = 0) { + ssize_t ret = 0; + do { + ret = TStreamSocket::Send(msg, len, flags); + } while (ret == -EINTR); + return ret; + } + + ssize_t Recv(void* buf, size_t len, int flags = 0) { + ssize_t ret = 0; + do { + ret = TStreamSocket::Recv(buf, len, flags); + } while (ret == -EINTR); + return ret; + } +}; + +// Limited size, syncronized queue based on intrusive list. +// Reader and writer both wait on file descriptors instead +// of traditional cond vars. That allows the queue to be +// employed both in (POSIX) threads and coroutines. +template <class T> +class TBasicPipeQueue { +public: + // derive your T from TBasicPipeQueue<T>::Item + typedef TIntrusiveListItem<T> TItem; + static const size_t MaxQueueSizeDef = (1 << 6); // fits default socket buffer sizes on our Linux and FreeBSD servers + + TBasicPipeQueue(bool isPushBlocking = true, + bool isPopBlocking = true, + size_t maxQueueSize = MaxQueueSizeDef) + : CurSize(0) + , MaxQueueSize(maxQueueSize) + , PushSock(INVALID_SOCKET) + , PopSock(INVALID_SOCKET) + , ShouldContinue(true) + { + SOCKET socks[2]; + socks[0] = INVALID_SOCKET; + socks[1] = INVALID_SOCKET; + if (SocketPair(socks)) + ythrow yexception() << "TBasicPipeQueue: can't create socket pair: " << LastSystemError(); + TSocketHolder pushSock(socks[0]); + TSocketHolder popSock(socks[1]); + PushSock.Swap(pushSock); + PopSock.Swap(popSock); + if (!isPushBlocking) + SetNonBlock(PushFd(), true); + if (!isPopBlocking) + SetNonBlock(PopFd(), true); + + char b = '\0'; + for (size_t i = 0; i < MaxQueueSize; i++) { + if (PopSock.Send(&b, 1) != 1) { + Y_VERIFY(false, "TBasicPipeQueue: queue size too big (%" PRISZT "): make it less then %" PRISZT " or increase limits", MaxQueueSize, i); + } + } + } + + ~TBasicPipeQueue() { + } + + void Stop() { + ShouldContinue = false; + } + + bool Push(T* t) { + char b = '\0'; + ssize_t r = 0; + if (!ShouldContinue) + return false; + + // make sure the queue has enough space to write to (wait for reading on PushSock or block on Recv) + r = PushSock.Recv(&b, 1); + if (r == 0) { + return false; + } else if (r < 0) { + if (errno == EAGAIN) + return false; + else + ythrow yexception() << "TBasicPipeQueue: error in Push(Recv): " << LastSystemError(); + } + + Y_VERIFY(!IsFull(), "TBasicPipeQueue: no space left"); + + if (!ShouldContinue) + return false; + + { + TGuard<TMutex> guard(Mutex); + List.PushBack(t); + CurSize++; + } + + // signal that the queue has item + r = PushSock.Send(&b, 1); + if (r <= 0) + ythrow yexception() << "TBasicPipeQueue: error in Push(Send): " << LastSystemError(); + + return true; + } + + T* Pop() { + char b = '\0'; + // wait for an item to appear in the queue + ssize_t r = PopSock.Recv(&b, 1); + if (r == 0) { + return nullptr; + } else if (r < 0) { + if (errno == EAGAIN) + return nullptr; + else + ythrow yexception() << "TBasicPipeQueue: error in Pop(Recv): " << LastSystemError(); + } + + T* t = nullptr; + { + TGuard<TMutex> guard(Mutex); + t = List.PopFront(); + CurSize--; + } + + // signal that the queue has a free slot + r = PopSock.Send(&b, 1); + if (r <= 0) + ythrow yexception() << "TBasicPipeQueue: error in Pop(Send): " << LastSystemError(); + + return t; + } + + SOCKET PushFd() { + return (SOCKET)PushSock; + } + + SOCKET PopFd() { + return (SOCKET)PopSock; + } + + size_t Size() const { + TGuard<TMutex> guard(Mutex); + size_t curSize = CurSize; + return curSize; + } + + bool Empty() const { + TGuard<TMutex> guard(Mutex); + bool empty = CurSize == 0; + return empty; + } + + bool IsFull() const { + TGuard g{Mutex}; + return CurSize >= MaxQueueSize; + } + +protected: + typedef TIntrusiveList<T> TListType; + TListType List; + size_t CurSize; + size_t MaxQueueSize; + TConnectedStreamSocket PushSock; + TConnectedStreamSocket PopSock; + TMutex Mutex; + bool ShouldContinue; + /** + * --{push(write)}--> PushSock ---> PopSock --{pop(read)}--> + */ +}; + +template <class T> +class TSyncSmallObjAllocator { +public: + TSyncSmallObjAllocator(IAllocator* alloc = TDefaultAllocator::Instance()) + : Allocator(alloc) + { + } + + void Release(T* t) { + t->~T(); + { + TGuard<TMutex> guard(Mutex); + Allocator.Release(t); + } + } + + T* Alloc() { + void* t; + { + TGuard<TMutex> guard(Mutex); + t = Allocator.Allocate(); + } + return new (t) T(); + } + +protected: + TSmallObjAllocator<T> Allocator; + TMutex Mutex; +}; + +template <class T> +struct TPipeQueueItem: public TIntrusiveListItem<TPipeQueueItem<T>> { + T Val; +}; + +template <class T> +class TPipeQueue + : protected TSyncSmallObjAllocator<TPipeQueueItem<T>>, + protected TBasicPipeQueue<TPipeQueueItem<T>> { + typedef TPipeQueueItem<T> TItem; + typedef TSyncSmallObjAllocator<TItem> TAlloc; + typedef TBasicPipeQueue<TItem> TBaseQueue; + +public: + TPipeQueue(bool isPushBlocking = true, + bool isPopBlocking = true, + size_t maxQueueSize = TBaseQueue::MaxQueueSizeDef, + IAllocator* alloc = TDefaultAllocator::Instance()) + : TAlloc(alloc) + , TBaseQueue(isPushBlocking, isPopBlocking, maxQueueSize) + { + } + + bool Push(const T& t) { + TItem* item = TAlloc::Alloc(); + item->Val = t; + bool res = TBaseQueue::Push(item); + if (!res) + TAlloc::Release(item); + return res; + } + + bool Pop(T* t) { + TItem* item = TBaseQueue::Pop(); + if (item == nullptr) + return false; + + *t = item->Val; + TAlloc::Release(item); + return true; + } + + using TBaseQueue::Empty; + using TBaseQueue::PopFd; + using TBaseQueue::PushFd; + using TBaseQueue::Size; + using TBaseQueue::Stop; +}; |