summaryrefslogtreecommitdiffstats
path: root/library/cpp/coroutine
diff options
context:
space:
mode:
authorqrort <[email protected]>2022-12-02 11:31:25 +0300
committerqrort <[email protected]>2022-12-02 11:31:25 +0300
commitb1f4ffc9c8abff3ba58dc1ec9a9f92d2f0de6806 (patch)
tree2a23209faf0fea5586a6d4b9cee60d1b318d29fe /library/cpp/coroutine
parent559174a9144de40d6bb3997ea4073c82289b4974 (diff)
remove kikimr/driver DEPENDS
Diffstat (limited to 'library/cpp/coroutine')
-rw-r--r--library/cpp/coroutine/dns/async.cpp149
-rw-r--r--library/cpp/coroutine/dns/async.h32
-rw-r--r--library/cpp/coroutine/dns/cache.cpp131
-rw-r--r--library/cpp/coroutine/dns/cache.h54
-rw-r--r--library/cpp/coroutine/dns/coro.cpp225
-rw-r--r--library/cpp/coroutine/dns/coro.h22
-rw-r--r--library/cpp/coroutine/dns/helpers.cpp148
-rw-r--r--library/cpp/coroutine/dns/helpers.h34
-rw-r--r--library/cpp/coroutine/dns/iface.h75
-rw-r--r--library/cpp/coroutine/util/cosem.h433
-rw-r--r--library/cpp/coroutine/util/pipeevent.h93
-rw-r--r--library/cpp/coroutine/util/pipeque.h262
12 files changed, 0 insertions, 1658 deletions
diff --git a/library/cpp/coroutine/dns/async.cpp b/library/cpp/coroutine/dns/async.cpp
deleted file mode 100644
index be35135828c..00000000000
--- a/library/cpp/coroutine/dns/async.cpp
+++ /dev/null
@@ -1,149 +0,0 @@
-#include "async.h"
-
-#include <util/generic/singleton.h>
-#include <util/generic/vector.h>
-
-#include <contrib/libs/c-ares/include/ares.h>
-
-using namespace NAsyncDns;
-
-namespace {
- struct TAresError: public TDnsError {
- inline TAresError(int code) {
- (*this) << ares_strerror(code);
- }
- };
-
- struct TAresInit {
- inline TAresInit() {
- const int code = ares_library_init(ARES_LIB_INIT_ALL);
-
- if (code) {
- ythrow TAresError(code) << "can not init ares engine";
- }
- }
-
- inline ~TAresInit() {
- ares_library_cleanup();
- }
-
- static inline void InitOnce() {
- Singleton<TAresInit>();
- }
- };
-}
-
-class TAsyncDns::TImpl {
-public:
- inline TImpl(IPoller* poller, const TOptions& o)
- : P_(poller)
- {
- TAresInit::InitOnce();
-
- ares_options opts;
-
- Zero(opts);
-
- int optflags = 0;
-
- optflags |= ARES_OPT_FLAGS;
- opts.flags = ARES_FLAG_STAYOPEN;
-
- optflags |= ARES_OPT_TIMEOUTMS;
- opts.timeout = o.TimeOut.MilliSeconds();
-
- optflags |= ARES_OPT_TRIES;
- opts.tries = o.Retries;
-
- optflags |= ARES_OPT_SOCK_STATE_CB;
- opts.sock_state_cb = (decltype(opts.sock_state_cb))StateCb;
- static_assert(sizeof(opts.sock_state_cb) == sizeof(&StateCb), "Inconsistent socket state size");
- opts.sock_state_cb_data = this;
-
- const int code = ares_init_options(&H_, &opts, optflags);
-
- if (code) {
- ythrow TAresError(code) << "can not init ares channel";
- }
- }
-
- inline ~TImpl() {
- ares_destroy(H_);
- }
-
- inline TDuration Timeout() {
- struct timeval tv;
- Zero(tv);
-
- ares_timeout(H_, nullptr, &tv);
-
- return TDuration(tv);
- }
-
- template <class T>
- inline void ProcessSocket(T s) {
- ares_process_fd(H_, s, s);
- }
-
- inline void ProcessNone() {
- ProcessSocket(ARES_SOCKET_BAD);
- }
-
- inline void AsyncResolve(const TNameRequest& req) {
- ares_gethostbyname(H_, req.Addr, req.Family, AsyncResolveHostCb, req.CB);
- }
-
-private:
- static void StateCb(void* arg, int s, int read, int write) {
- ((TImpl*)arg)->P_->OnStateChange((SOCKET)s, (bool)read, (bool)write);
- }
-
- static void AsyncResolveHostCb(void* arg, int status, int timeouts, hostent* he) {
- const IHostResult::TResult res = {
- status, timeouts, he};
-
- ((IHostResult*)arg)->OnComplete(res);
- }
-
-private:
- IPoller* P_;
- ares_channel H_;
-};
-
-void NAsyncDns::CheckAsyncStatus(int status) {
- if (status) {
- ythrow TAresError(status);
- }
-}
-
-void NAsyncDns::CheckPartialAsyncStatus(int status) {
- if (status == ARES_ENODATA) {
- return;
- }
-
- CheckAsyncStatus(status);
-}
-
-TAsyncDns::TAsyncDns(IPoller* poller, const TOptions& opts)
- : I_(new TImpl(poller, opts))
-{
-}
-
-TAsyncDns::~TAsyncDns() {
-}
-
-void TAsyncDns::AsyncResolve(const TNameRequest& req) {
- I_->AsyncResolve(req);
-}
-
-TDuration TAsyncDns::Timeout() {
- return I_->Timeout();
-}
-
-void TAsyncDns::ProcessSocket(SOCKET s) {
- I_->ProcessSocket(s);
-}
-
-void TAsyncDns::ProcessNone() {
- I_->ProcessNone();
-}
diff --git a/library/cpp/coroutine/dns/async.h b/library/cpp/coroutine/dns/async.h
deleted file mode 100644
index 1ec26bc91d7..00000000000
--- a/library/cpp/coroutine/dns/async.h
+++ /dev/null
@@ -1,32 +0,0 @@
-#pragma once
-
-#include "iface.h"
-
-#include <util/network/socket.h>
-#include <util/datetime/base.h>
-#include <util/generic/ptr.h>
-
-namespace NAsyncDns {
- struct IPoller {
- virtual void OnStateChange(SOCKET s, bool read, bool write) = 0;
- };
-
- class TAsyncDns {
- public:
- TAsyncDns(IPoller* poller, const TOptions& opts = TOptions());
- ~TAsyncDns();
-
- void AsyncResolve(const TNameRequest& req);
-
- TDuration Timeout();
- void ProcessSocket(SOCKET s);
- void ProcessNone();
-
- private:
- class TImpl;
- THolder<TImpl> I_;
- };
-
- void CheckAsyncStatus(int status);
- void CheckPartialAsyncStatus(int status);
-}
diff --git a/library/cpp/coroutine/dns/cache.cpp b/library/cpp/coroutine/dns/cache.cpp
deleted file mode 100644
index 69910a79601..00000000000
--- a/library/cpp/coroutine/dns/cache.cpp
+++ /dev/null
@@ -1,131 +0,0 @@
-#include "cache.h"
-
-#include "async.h"
-#include "coro.h"
-#include "helpers.h"
-
-#include <library/cpp/cache/cache.h>
-
-#include <library/cpp/coroutine/engine/impl.h>
-#include <library/cpp/coroutine/engine/events.h>
-
-using namespace NAddr;
-using namespace NAsyncDns;
-
-class TContDnsCache::TImpl {
- using TKey = std::pair<TString, ui16>;
-
- enum EResolveState {
- RS_NOT_RESOLVED,
- RS_IN_PROGRESS,
- RS_RESOLVED,
- RS_FAILED,
- RS_EXPIRED
- };
-
- struct TEntry: public TSimpleRefCount<TEntry> {
- TAddrs Addrs_;
- TInstant ResolveTimestamp_;
- EResolveState State_;
- TContSimpleEvent ResolvedEvent_;
- TString LastResolveError_;
-
- TEntry(TContExecutor* e)
- : State_(RS_NOT_RESOLVED)
- , ResolvedEvent_(e)
- {
- }
- };
-
- using TEntryRef = TIntrusivePtr<TEntry>;
-
-public:
- inline TImpl(TContExecutor* e, const TCacheOptions& opts)
- : Executor_(e)
- , Opts_(opts)
- , Cache_(opts.MaxSize)
- {
- }
-
- inline ~TImpl() {
- }
-
- void LookupOrResolve(TContResolver& resolver, const TString& addr, ui16 port, TAddrs& result) {
- TKey cacheKey(addr, port);
-
- TEntryRef e;
- auto iter = Cache_.Find(cacheKey);
- if (iter == Cache_.End()) {
- e = MakeIntrusive<TEntry>(Executor_);
- Cache_.Insert(cacheKey, e);
- } else {
- e = iter.Value();
- }
-
- while (true) {
- switch (e->State_) {
- case RS_NOT_RESOLVED:
- case RS_EXPIRED:
- e->State_ = RS_IN_PROGRESS;
- try {
- ResolveAddr(resolver, addr, port, result);
- } catch (TDnsError& err) {
- e->ResolveTimestamp_ = TInstant::Now();
- e->LastResolveError_ = err.AsStrBuf();
- e->State_ = RS_FAILED;
- e->ResolvedEvent_.BroadCast();
- throw;
- } catch (...) {
- // errors not related to DNS
- e->State_ = RS_NOT_RESOLVED;
- e->ResolvedEvent_.BroadCast();
- throw;
- }
- e->ResolveTimestamp_ = TInstant::Now();
- e->Addrs_ = result;
- e->State_ = RS_RESOLVED;
- e->ResolvedEvent_.BroadCast();
- return;
- case RS_IN_PROGRESS:
- e->ResolvedEvent_.WaitI();
- continue;
- case RS_RESOLVED:
- if (e->ResolveTimestamp_ + Opts_.EntryLifetime < TInstant::Now()) {
- e->State_ = RS_EXPIRED;
- continue; // try and resolve again
- }
- result = e->Addrs_;
- return;
- case RS_FAILED:
- if (e->ResolveTimestamp_ + Opts_.NotFoundLifetime < TInstant::Now()) {
- e->State_ = RS_EXPIRED;
- continue; // try and resolve again
- }
- ythrow TDnsError() << e->LastResolveError_;
- default:
- Y_FAIL("Bad state, shoult not get here");
- };
- }
- }
-
-private:
- TContExecutor* Executor_;
- const TCacheOptions Opts_;
- TLRUCache<TKey, TEntryRef> Cache_;
-};
-
-TContDnsCache::TContDnsCache(TContExecutor* e, const NAsyncDns::TCacheOptions& opts)
- : I_(MakeHolder<TImpl>(e, opts))
-{
-}
-
-TContDnsCache::~TContDnsCache() {
-}
-
-void TContDnsCache::LookupOrResolve(TContResolver& resolver, const TString& addr, ui16 port, TVector<NAddr::IRemoteAddrRef>& result) {
- return I_->LookupOrResolve(resolver, addr, port, result);
-}
-
-void TContDnsCache::LookupOrResolve(TContResolver& resolver, const TString& addr, TVector<NAddr::IRemoteAddrRef>& result) {
- LookupOrResolve(resolver, addr, 80, result);
-}
diff --git a/library/cpp/coroutine/dns/cache.h b/library/cpp/coroutine/dns/cache.h
deleted file mode 100644
index 7c358ce591f..00000000000
--- a/library/cpp/coroutine/dns/cache.h
+++ /dev/null
@@ -1,54 +0,0 @@
-#pragma once
-
-#include "iface.h"
-
-#include <util/generic/ptr.h>
-#include <util/generic/vector.h>
-#include <util/network/address.h>
-
-class TContExecutor;
-
-namespace NAsyncDns {
- class TContResolver;
-
- struct TCacheOptions {
- inline TCacheOptions() {
- }
-
- inline TCacheOptions& SetEntryLifetime(const TDuration& val) noexcept {
- EntryLifetime = val;
-
- return *this;
- }
-
- inline TCacheOptions& SetNotFoundLifetime(const TDuration& val) noexcept {
- NotFoundLifetime = val;
-
- return *this;
- }
-
- inline TCacheOptions& SetMaxSize(size_t val) noexcept {
- MaxSize = val;
-
- return *this;
- }
-
- size_t MaxSize = 512;
- TDuration EntryLifetime = TDuration::Seconds(1800);
- TDuration NotFoundLifetime = TDuration::Seconds(1);
- };
-
- // MT-unsafe LRU DNS cache
- class TContDnsCache {
- public:
- TContDnsCache(TContExecutor* e, const TCacheOptions& opts = {});
- ~TContDnsCache();
-
- void LookupOrResolve(TContResolver& resolver, const TString& addr, ui16 port, TVector<NAddr::IRemoteAddrRef>& result);
- void LookupOrResolve(TContResolver& resolver, const TString& addr, TVector<NAddr::IRemoteAddrRef>& result);
-
- private:
- class TImpl;
- THolder<TImpl> I_;
- };
-}
diff --git a/library/cpp/coroutine/dns/coro.cpp b/library/cpp/coroutine/dns/coro.cpp
deleted file mode 100644
index 9d6a89f6e75..00000000000
--- a/library/cpp/coroutine/dns/coro.cpp
+++ /dev/null
@@ -1,225 +0,0 @@
-#include "coro.h"
-
-#include "async.h"
-
-#include <library/cpp/coroutine/engine/events.h>
-#include <library/cpp/coroutine/engine/impl.h>
-
-#include <util/generic/vector.h>
-#include <util/memory/smallobj.h>
-
-using namespace NAsyncDns;
-
-class TContResolver::TImpl: public IPoller {
- struct TPollEvent:
- public NCoro::IPollEvent,
- public TIntrusiveListItem<TPollEvent>,
- public TObjectFromPool<TPollEvent>
- {
- inline TPollEvent(TImpl* parent, SOCKET s, int what)
- : IPollEvent(s, what)
- , P(parent)
- {
- P->E_->Poller()->Schedule(this);
- }
-
- inline ~TPollEvent() override {
- P->E_->Poller()->Remove(this);
- }
-
- void OnPollEvent(int) noexcept override {
- P->D_.ProcessSocket(Fd());
- }
-
- TImpl* P;
- };
-
- typedef TAutoPtr<TPollEvent> TEventRef;
-
- struct TOneShotEvent {
- inline TOneShotEvent(TContExecutor* e) noexcept
- : E(e)
- , S(false)
- {
- }
-
- inline void Signal() noexcept {
- S = true;
- E.Signal();
- }
-
- inline void Wait() noexcept {
- if (S) {
- return;
- }
-
- E.WaitI();
- }
-
- TContSimpleEvent E;
- bool S;
- };
-
- struct TContSemaphore {
- inline TContSemaphore(TContExecutor* e, size_t num) noexcept
- : E(e)
- , N(num)
- {
- }
-
- inline void Acquire() noexcept {
- while (!N) {
- E.WaitI();
- }
-
- --N;
- }
-
- inline void Release() noexcept {
- ++N;
- E.Signal();
- }
-
- TContSimpleEvent E;
- size_t N;
- };
-
- struct TRequest: public TIntrusiveListItem<TRequest>, public TOneShotEvent, public IHostResult {
- inline TRequest(const TNameRequest* req, TContExecutor* e)
- : TOneShotEvent(e)
- , Req(req)
- {
- }
-
- void OnComplete(const TResult& res) override {
- Req->CB->OnComplete(res);
- Signal();
- }
-
- const TNameRequest* Req;
- };
-
-public:
- inline TImpl(TContExecutor* e, const TOptions& opts)
- : P_(TDefaultAllocator::Instance())
- , E_(e)
- , RateLimit_(E_, opts.MaxRequests)
- , D_(this, opts)
- , DC_(nullptr)
- {
- }
-
- inline ~TImpl() {
- Y_VERIFY(DC_ == nullptr, "shit happens");
- }
-
- inline void Resolve(const TNameRequest& hreq) {
- TGuard<TContSemaphore> g(RateLimit_);
-
- if (hreq.Family == AF_UNSPEC) {
- const TNameRequest hreq1(hreq.Copy(AF_INET));
- TRequest req1(&hreq1, E_);
-
- const TNameRequest hreq2(hreq.Copy(AF_INET6));
- TRequest req2(&hreq2, E_);
-
- Schedule(&req1);
- Schedule(&req2);
-
- req1.Wait();
- req2.Wait();
- } else {
- TRequest req(&hreq, E_);
-
- Schedule(&req);
-
- req.Wait();
- }
-
- if (A_.Empty() && DC_) {
- DC_->ReSchedule();
- }
- }
-
-private:
- inline void Schedule(TRequest* req) {
- D_.AsyncResolve(req->Req->Copy(req));
- A_.PushBack(req);
- ResolveCont()->ReSchedule();
- }
-
- inline TCont* ResolveCont() {
- if (!DC_) {
- DC_ = E_->Create<TImpl, &TImpl::RunAsyncDns>(this, "async_dns");
- }
-
- return DC_;
- }
-
- void RunAsyncDns(TCont*) {
- while (!A_.Empty()) {
- Y_VERIFY(!I_.Empty(), "shit happens");
-
- const TDuration tout = D_.Timeout();
-
- if (E_->Running()->SleepT(Max(tout, TDuration::MilliSeconds(10))) == ETIMEDOUT) {
- D_.ProcessNone();
- }
-
- DC_->Yield();
- }
-
- DC_ = nullptr;
- }
-
- void OnStateChange(SOCKET s, bool read, bool write) noexcept override {
- static const ui16 WHAT[] = {
- 0, CONT_POLL_READ, CONT_POLL_WRITE, CONT_POLL_READ | CONT_POLL_WRITE};
-
- const int what = WHAT[((size_t)read) | (((size_t)write) << 1)];
-
- TEventRef& ev = S_.Get(s);
-
- if (!ev) {
- ev.Reset(new (&P_) TPollEvent(this, s, what));
- I_.PushBack(ev.Get());
- } else {
- if (what) {
- if (ev->What() != what) {
- //may optimize
- ev.Reset(new (&P_) TPollEvent(this, s, what));
- I_.PushBack(ev.Get());
- }
- } else {
- ev.Destroy();
- //auto unlink
- }
- }
-
- if (DC_) {
- DC_->ReSchedule();
- }
- }
-
-private:
- TSocketMap<TEventRef> S_;
- TPollEvent::TPool P_;
- TContExecutor* E_;
- TContSemaphore RateLimit_;
- TAsyncDns D_;
- TIntrusiveList<TPollEvent> I_;
- TIntrusiveList<TRequest> A_;
- TCont* DC_;
-};
-
-TContResolver::TContResolver(TContExecutor* e, const TOptions& opts)
- : I_(new TImpl(e, opts))
-{
-}
-
-TContResolver::~TContResolver() {
-}
-
-void TContResolver::Resolve(const TNameRequest& req) {
- I_->Resolve(req);
-}
diff --git a/library/cpp/coroutine/dns/coro.h b/library/cpp/coroutine/dns/coro.h
deleted file mode 100644
index 62a01891b8a..00000000000
--- a/library/cpp/coroutine/dns/coro.h
+++ /dev/null
@@ -1,22 +0,0 @@
-#pragma once
-
-#include "iface.h"
-
-#include <util/generic/ptr.h>
-#include <util/generic/ylimits.h>
-
-class TContExecutor;
-
-namespace NAsyncDns {
- class TContResolver {
- public:
- TContResolver(TContExecutor* e, const TOptions& opts = TOptions());
- ~TContResolver();
-
- void Resolve(const TNameRequest& hreq);
-
- private:
- class TImpl;
- THolder<TImpl> I_;
- };
-}
diff --git a/library/cpp/coroutine/dns/helpers.cpp b/library/cpp/coroutine/dns/helpers.cpp
deleted file mode 100644
index 21d17b5d674..00000000000
--- a/library/cpp/coroutine/dns/helpers.cpp
+++ /dev/null
@@ -1,148 +0,0 @@
-#include "helpers.h"
-#include "coro.h"
-#include "async.h"
-#include "cache.h"
-
-#include <util/digest/city.h>
-#include <util/generic/hash_set.h>
-
-using namespace NAddr;
-using namespace NAsyncDns;
-
-namespace {
- typedef ui64 TAddrHash;
-
- inline TAddrHash Hash(const IRemoteAddrRef& addr) {
- return CityHash64((const char*)addr->Addr(), addr->Len());
- }
-
- inline IRemoteAddrRef ConstructIP4(void* data, ui16 port) {
- return new TIPv4Addr(TIpAddress(*(ui32*)data, port));
- }
-
- inline IRemoteAddrRef ConstructIP6(void* data, ui16 port) {
- sockaddr_in6 res;
-
- Zero(res);
-
- res.sin6_family = AF_INET6;
- res.sin6_port = HostToInet(port);
- memcpy(&res.sin6_addr.s6_addr, data, sizeof(res.sin6_addr.s6_addr));
-
- return new TIPv6Addr(res);
- }
-
- inline IRemoteAddrRef Construct(const hostent* h, void* data, ui16 port) {
- switch (h->h_addrtype) {
- case AF_INET:
- return ConstructIP4(data, port);
-
- case AF_INET6:
- return ConstructIP6(data, port);
- }
-
- //real shit happens
- abort();
- }
-
- template <class It, class T>
- static bool FindByHash(It b, It e, T t) {
- while (b != e) {
- if (Hash(*b) == t) {
- return true;
- }
-
- ++b;
- }
-
- return false;
- }
-
- inline size_t LstLen(char** lst) noexcept {
- size_t ret = 0;
-
- while (*lst) {
- ++ret;
- ++lst;
- }
-
- return ret;
- }
-}
-
-void TResolveAddr::OnComplete(const TResult& r) {
- const hostent* h = r.Result;
-
- if (!h) {
- Status.push_back(r.Status);
-
- return;
- }
-
- char** lst = h->h_addr_list;
-
- typedef THashSet<TAddrHash> THashes;
- TAutoPtr<THashes> hashes;
-
- if ((Result.size() + LstLen(lst)) > 8) {
- hashes.Reset(new THashes());
-
- for (const auto& it : Result) {
- hashes->insert(Hash(it));
- }
- }
-
- while (*lst) {
- IRemoteAddrRef addr = Construct(h, *lst, Port);
-
- if (!hashes) {
- if (!FindByHash(Result.begin(), Result.end(), Hash(addr))) {
- Result.push_back(addr);
- }
- } else {
- const TAddrHash h = Hash(addr);
-
- if (hashes->find(h) == hashes->end()) {
- hashes->insert(h);
- Result.push_back(addr);
- }
- }
-
- ++lst;
- }
-}
-
-void NAsyncDns::ResolveAddr(TContResolver& resolver, const TString& host, ui16 port, TAddrs& result) {
- TResolveAddr cb(port);
-
- resolver.Resolve(TNameRequest(host.data(), AF_UNSPEC, &cb));
-
- if (cb.Result) {
- for (auto status : cb.Status) {
- //we have some results, so skip empty responses for aaaa requests
- CheckPartialAsyncStatus(status);
- }
- } else {
- for (auto status : cb.Status) {
- CheckAsyncStatus(status);
- }
- }
-
- cb.Result.swap(result);
-}
-
-void NAsyncDns::ResolveAddr(TContResolver& resolver, const TString& addr, TAddrs& result) {
- ResolveAddr(resolver, addr, 80, result);
-}
-
-void NAsyncDns::ResolveAddr(TContResolver& resolver, const TString& host, ui16 port, TAddrs& result, TContDnsCache* cache) {
- if (cache) {
- cache->LookupOrResolve(resolver, host, port, result);
- } else {
- ResolveAddr(resolver, host, port, result);
- }
-}
-
-void NAsyncDns::ResolveAddr(TContResolver& resolver, const TString& addr, TAddrs& result, TContDnsCache* cache) {
- ResolveAddr(resolver, addr, 80, result, cache);
-}
diff --git a/library/cpp/coroutine/dns/helpers.h b/library/cpp/coroutine/dns/helpers.h
deleted file mode 100644
index 5f3cc8676fd..00000000000
--- a/library/cpp/coroutine/dns/helpers.h
+++ /dev/null
@@ -1,34 +0,0 @@
-#pragma once
-
-#include "iface.h"
-
-#include <util/network/address.h>
-#include <util/generic/vector.h>
-
-namespace NAsyncDns {
- class TContResolver;
- class TContDnsCache;
- using NAddr::IRemoteAddrRef;
-
- typedef TVector<IRemoteAddrRef> TAddrs;
-
- struct TResolveAddr: public IHostResult {
- inline TResolveAddr(ui16 port)
- : Port(port)
- , Status(0)
- {
- }
-
- void OnComplete(const TResult& result) override;
-
- TAddrs Result;
- ui16 Port;
- TVector<int> Status;
- };
-
- //resolve addr(host + port), like TNetworkAddress
- void ResolveAddr(TContResolver& resolver, const TString& addr, TAddrs& result);
- void ResolveAddr(TContResolver& resolver, const TString& host, ui16 port, TAddrs& result);
- void ResolveAddr(TContResolver& resolver, const TString& addr, TAddrs& result, TContDnsCache* cache);
- void ResolveAddr(TContResolver& resolver, const TString& host, ui16 port, TAddrs& result, TContDnsCache* cache);
-}
diff --git a/library/cpp/coroutine/dns/iface.h b/library/cpp/coroutine/dns/iface.h
deleted file mode 100644
index 0310f48f8fd..00000000000
--- a/library/cpp/coroutine/dns/iface.h
+++ /dev/null
@@ -1,75 +0,0 @@
-#pragma once
-
-#include <util/datetime/base.h>
-#include <util/generic/yexception.h>
-
-struct hostent;
-
-namespace NAsyncDns {
- struct TOptions {
- inline TOptions()
- : MaxRequests(Max())
- , Retries(5)
- , TimeOut(TDuration::MilliSeconds(50))
- {
- }
-
- inline TOptions& SetMaxRequests(size_t val) noexcept {
- MaxRequests = val;
-
- return *this;
- }
-
- inline TOptions& SetRetries(size_t val) noexcept {
- Retries = val;
-
- return *this;
- }
-
- inline TOptions& SetTimeOut(const TDuration& val) noexcept {
- TimeOut = val;
-
- return *this;
- }
-
- size_t MaxRequests;
- size_t Retries;
- TDuration TimeOut;
- };
-
- struct IHostResult {
- struct TResult {
- int Status;
- int Timeouts;
- const hostent* Result;
- };
-
- virtual ~IHostResult() = default;
-
- virtual void OnComplete(const TResult& result) = 0;
- };
-
- struct TNameRequest {
- inline TNameRequest(const char* addr, int family, IHostResult* cb)
- : Addr(addr)
- , Family(family)
- , CB(cb)
- {
- }
-
- inline TNameRequest Copy(IHostResult* cb) const noexcept {
- return TNameRequest(Addr, Family, cb);
- }
-
- inline TNameRequest Copy(int family) const noexcept {
- return TNameRequest(Addr, family, CB);
- }
-
- const char* Addr;
- int Family;
- IHostResult* CB;
- };
-
- struct TDnsError: public yexception {
- };
-}
diff --git a/library/cpp/coroutine/util/cosem.h b/library/cpp/coroutine/util/cosem.h
deleted file mode 100644
index 5ed41a9669a..00000000000
--- a/library/cpp/coroutine/util/cosem.h
+++ /dev/null
@@ -1,433 +0,0 @@
-#pragma once
-
-#include <library/cpp/coroutine/engine/events.h>
-#include <library/cpp/coroutine/engine/impl.h>
-#include <library/cpp/coroutine/engine/mutex.h>
-#include <library/cpp/coroutine/engine/network.h>
-
-#include <util/network/pair.h>
-#include <util/network/poller.h>
-#include <library/cpp/deprecated/atomic/atomic.h>
-#include <util/system/yield.h>
-#include <util/generic/noncopyable.h>
-
-/*****************************************************************
- TContSemaphore
- ^
- |
- TCoSemaphore
- ^
- |
- TCoMutex
- *****************************************************************/
-
-/*
- * Note: this semaphore does not try to be thread-safe.
- * It is intended to use within coroutines in single thread only.
- * Do not share it between threads. Use TContMtSpinlock instead.
- */
-
-class TContSemaphore {
-public:
- inline TContSemaphore(const unsigned int initial)
- : mValue(initial)
- {
- }
-
- inline bool Acquire(TCont* cont) {
- while (!mValue) {
- mWaitQueue.WaitI(cont);
- }
- --mValue;
- return true;
- }
-
- inline bool Release(TCont*) {
- ++mValue;
- mWaitQueue.Signal();
- return true;
- }
-
- inline bool TryAcquire(TCont*) {
- if (!mValue) {
- return false;
- }
- --mValue;
- return true;
- }
-
-protected:
- TAtomic mValue;
- TContWaitQueue mWaitQueue;
-};
-
-/*****************************************************************
- TContMtSyncBase
- ^ ^
- / \
- / \
- TContMtSpinlock TContMtSemaphore
- ^
- |
- TCoMtSpinlock
- *****************************************************************/
-class TContMtSyncBase {
-public:
- static inline void Yield(TCont* cont) {
- //printf("yield() called with cont=%p in thread %p\n", cont, pthread_self()); fflush(stdout);
- if (cont) {
- // Bugfeature workaround: coroutine Yield() in loop would make our priority too high to receive any other events
- // so we probably never come out of this loop. So better use Sleep() instead. Unlike Yield(), Sleep(0)
- // makes all pending I/Os have higher priority than us.
- cont->SleepT(TDuration::Zero());
- } else {
- ThreadYield();
- }
- }
-
- static inline void SleepT(TCont* cont, unsigned time) {
- if (cont) {
- cont->SleepT(TDuration::MicroSeconds(time));
- } else {
- ::usleep(time);
- }
- }
-};
-
-/*
- * Thread-safe version of TContMutex
- */
-
-class TContMtSpinlock: private TContMtSyncBase {
-public:
- TContMtSpinlock()
- : mToken(true)
- {
- }
-
- inline int LockI(TCont* cont) const {
- // We do not want to lock the bus for a long time. However, we do not want to switch context
- // immediately. Hope that 16 times is enough to finish most of the small operations.
- for (int i = 0; i < 4; ++i) {
- if (AtomicSwap(&mToken, false))
- return 0;
- }
- for (int i = 0; i < 8; ++i) {
- if (AtomicSwap(&mToken, false))
- return 0;
- Yield(cont);
- }
- for (int i = 0; i < 4; ++i) {
- if (AtomicSwap(&mToken, false))
- return 0;
- SleepT(cont, 1000);
- }
- while (!AtomicSwap(&mToken, false))
- SleepT(cont, 10000);
- return 0;
- }
-
- inline bool Acquire(TCont* cont) const {
- return (LockI(cont) == 0);
- }
-
- inline int LockD(TCont* cont, TInstant deadline) {
- for (int i = 0; i < 4; ++i) {
- if (AtomicSwap(&mToken, false))
- return 0;
- }
- for (int i = 0; i < 8; ++i) {
- if (AtomicSwap(&mToken, false))
- return 0;
- if (Now() > deadline)
- return ETIMEDOUT;
- Yield(cont);
- }
- for (int i = 0; i < 4; ++i) {
- if (AtomicSwap(&mToken, false))
- return 0;
- if (Now() > deadline)
- return ETIMEDOUT;
- SleepT(cont, 1000);
- }
- while (!AtomicSwap(&mToken, false)) {
- if (Now() > deadline)
- return ETIMEDOUT;
- SleepT(cont, 10000);
- }
- return 0;
- }
-
- inline int LockT(TCont* cont, TDuration timeout) {
- return LockD(cont, Now() + timeout);
- }
-
- inline void UnLock(TCont*) const {
- mToken = true;
- }
-
- inline bool Release(TCont* cont) const {
- UnLock(cont);
- return true;
- }
-
- inline bool TryAcquire(TCont*) const {
- return AtomicSwap(&mToken, false);
- }
-
-private:
- TAtomic mutable mToken;
-};
-
-/* Thread-safe version of TContSemaphore */
-
-class TContMtSemaphore: private TContMtSyncBase {
-public:
- TContMtSemaphore(const unsigned int initial)
- : mValue(initial)
- {
- }
-
- inline bool Acquire(TCont* cont) {
- for (int i = 0; i < 1024; ++i) {
- if (TryAcquire(cont))
- return true;
- }
- while (!TryAcquire(cont))
- Yield(cont);
- return true;
- }
-
- inline bool Release(TCont*) {
- AtomicAdd(mValue, 1);
- return true;
- }
-
- inline bool TryAcquire(TCont*) {
- TAtomic q = AtomicAdd(mValue, -1);
- if (q < 0) {
- AtomicAdd(mValue, 1); // this is safe even if some other thread accessed mValue
- // since increments and decrements are always balanced
- return false;
- }
- return true;
- }
-
-private:
- TAtomic mValue;
-};
-
-/*****************************************************************
- TContSem
- *****************************************************************/
-class TContSem {
-public:
- TContSem(void*) noexcept {
- pfd[0] = pfd[1] = INVALID_SOCKET;
- val = 0;
- }
-
- ~TContSem() {
- if (pfd[0] != INVALID_SOCKET)
- closesocket(pfd[0]);
- if (pfd[1] != INVALID_SOCKET)
- closesocket(pfd[1]);
- pfd[0] = pfd[1] = INVALID_SOCKET;
- val = 0;
- }
-
- bool Initialized() noexcept {
- return pfd[0] != INVALID_SOCKET;
- }
-
- int Init(unsigned short Val) noexcept {
- assert(pfd[0] == INVALID_SOCKET && pfd[1] == INVALID_SOCKET);
-
- if (SocketPair(pfd) == SOCKET_ERROR) {
- return errno;
- }
-
- char ch = 0;
- val = Val;
-
- for (unsigned short i = 0; i < val; i++)
- if (send(pfd[1], &ch, 1, 0) != 1) {
- int err = errno;
- closesocket(pfd[0]);
- closesocket(pfd[1]);
- pfd[0] = pfd[1] = INVALID_SOCKET;
- return err;
- }
-#if defined(_unix_)
- if (fcntl(pfd[0], F_SETFL, O_NONBLOCK) == -1)
- return errno;
- if (fcntl(pfd[1], F_SETFL, O_NONBLOCK) == -1)
- return errno;
-#endif
-
- readPoller.WaitRead(pfd[0], this);
- writePoller.WaitWrite(pfd[1], this);
-
- return 0;
- }
-
- int DownD(TCont* cont, TInstant deadline) {
- assert(pfd[0] != INVALID_SOCKET && pfd[1] != INVALID_SOCKET);
- assert(cont);
- --val;
-
- char ch = 0;
- int result = downMutex.LockI(cont);
- if (result)
- return result;
- TContIOStatus status = NCoro::ReadD(cont, pfd[0], &ch, 1, deadline);
- downMutex.UnLock();
- if (status.Status())
- return status.Status();
- return 0;
- }
-
- int DownT(TCont* cont, TDuration timeout) {
- return DownD(cont, Now() + timeout);
- }
-
- int DownI(TCont* cont) {
- return DownD(cont, TInstant::Max());
- }
-
- int Down() {
- assert(pfd[0] != INVALID_SOCKET && pfd[1] != INVALID_SOCKET);
- --val;
-
- char ch = 0;
- while (true) {
- int result = recv(pfd[0], &ch, 1, 0);
- if (result == INVALID_SOCKET) {
- if (errno != EINTR && errno != EAGAIN) {
- return errno;
- }
- } else {
- break;
- }
- void* pollerResult = readPoller.WaitI();
- Y_ASSERT(pollerResult == this);
- }
- return 0;
- }
-
- int Up(TCont* cont) {
- assert(pfd[0] != INVALID_SOCKET && pfd[1] != INVALID_SOCKET);
- assert(cont);
- ++val;
-
- char ch = 0;
- upMutex.LockI(cont);
- TContIOStatus status = NCoro::WriteI(cont, pfd[1], &ch, 1);
- upMutex.UnLock();
- if (status.Status())
- return status.Status();
- return 0;
- }
-
- int Up() {
- assert(pfd[0] != INVALID_SOCKET && pfd[1] != INVALID_SOCKET);
- ++val;
-
- char ch = 0;
- while (true) {
- int result = send(pfd[1], &ch, 1, 0);
- if (result == INVALID_SOCKET) {
- if (errno != EINTR && errno != EAGAIN) {
- return errno;
- }
- } else {
- break;
- }
- void* pollerResult = writePoller.WaitI();
- Y_ASSERT(pollerResult == this);
- }
- return 0;
- }
-
-private:
- SOCKET pfd[2];
- unsigned short val;
- TSocketPoller readPoller;
- TSocketPoller writePoller;
- TContMutex downMutex;
- TContMutex upMutex;
-
- // forbid copying
- TContSem(const TContSem&) {
- }
- TContSem& operator=(const TContSem&) {
- return *this;
- }
-};
-
-class TCoMtMutex {
-public:
- TCoMtMutex()
- : Mutex(this)
- {
- Mutex.Init(1);
- }
-
- void Acquire() {
- Mutex.Down();
- }
-
- void Acquire(TCont* c) {
- Mutex.DownI(c);
- }
-
- void Release(TCont* c) {
- Mutex.Up(c);
- }
-
- void Release() {
- Mutex.Up();
- }
-
-private:
- TContSem Mutex;
-};
-
-/*****************************************************************
- GUARDS
- *****************************************************************/
-class SemaphoreGuard : TNonCopyable {
-public:
- inline SemaphoreGuard(TCont* cont, TContSemaphore& semaphore)
- : mSemaphore(semaphore)
- , mCont(cont)
- {
- mSemaphore.Acquire(mCont);
- }
-
- inline ~SemaphoreGuard() {
- mSemaphore.Release(mCont);
- }
-
-private:
- TContSemaphore& mSemaphore;
- TCont* mCont;
-};
-
-class ContGuard {
-public:
- inline ContGuard(TCont* cont, TContMtSpinlock& lock)
- : mLock(lock)
- , mCont(cont)
- {
- mLock.LockI(mCont);
- }
-
- inline ~ContGuard() {
- mLock.UnLock(mCont);
- }
-
-private:
- TContMtSpinlock& mLock;
- TCont* mCont;
-};
diff --git a/library/cpp/coroutine/util/pipeevent.h b/library/cpp/coroutine/util/pipeevent.h
deleted file mode 100644
index 059f2a34d9a..00000000000
--- a/library/cpp/coroutine/util/pipeevent.h
+++ /dev/null
@@ -1,93 +0,0 @@
-#pragma once
-
-#include <library/cpp/coroutine/engine/events.h>
-#include <library/cpp/coroutine/engine/impl.h>
-#include <library/cpp/coroutine/engine/network.h>
-
-#include <util/network/socket.h>
-#include <library/cpp/deprecated/atomic/atomic.h>
-#include <util/system/pipe.h>
-
-// TPipeEvent and TPipeSemaphore try to minimize number of coroutines reading from same pipe
-// because its actually quite expensive with >1000 coroutines waiting on same event/semaphore
-
-// Using this class you can block in coroutine on waiting
-// a signal from outer thread.
-// Usual thread synchronization primitives are not appropriate
-// because they will block all coroutines in single thread.
-class TPipeEvent {
-public:
- explicit TPipeEvent()
- : Signaled(0)
- , NumWaiting(0)
- {
- TPipeHandle::Pipe(SignalRecvPipe, SignalSendPipe);
- SetNonBlock(SignalRecvPipe);
- SetNonBlock(SignalSendPipe);
- }
-
- void Signal() {
- if (AtomicCas(&Signaled, 1, 0)) {
- char tmp = 1;
- SignalSendPipe.Write(&tmp, 1);
- }
- }
-
- void Wait(TCont* cont) {
- if (++NumWaiting > 1) {
- ToWake.WaitI(cont);
- }
- char tmp;
- NCoro::ReadI(cont, SignalRecvPipe, &tmp, 1).Checked();
- AtomicSet(Signaled, 0);
- if (--NumWaiting > 0) {
- ToWake.Signal();
- }
- }
-
-private:
- TAtomic Signaled;
- size_t NumWaiting;
- TContWaitQueue ToWake;
- TPipeHandle SignalRecvPipe;
- TPipeHandle SignalSendPipe;
-};
-
-class TPipeSemaphore {
-public:
- explicit TPipeSemaphore()
- : Value(0)
- , NumWaiting(0)
- {
- TPipeHandle::Pipe(SignalRecvPipe, SignalSendPipe);
- SetNonBlock(SignalRecvPipe);
- SetNonBlock(SignalSendPipe);
- }
-
- void Inc() {
- if (AtomicIncrement(Value) <= 0) {
- char tmp = 1;
- SignalSendPipe.Write(&tmp, 1);
- }
- }
-
- void Dec(TCont* cont) {
- if (AtomicDecrement(Value) < 0) {
- if (++NumWaiting > 1) {
- ToWake.WaitI(cont);
- }
- char tmp;
- NCoro::ReadI(cont, SignalRecvPipe, &tmp, 1).Checked();
- if (--NumWaiting > 0) {
- ToWake.Signal();
- }
- }
- }
-
-private:
- TAtomic Value;
- size_t NumWaiting;
- TContWaitQueue ToWake;
- TPipeHandle SignalRecvPipe;
- TPipeHandle SignalSendPipe;
-};
diff --git a/library/cpp/coroutine/util/pipeque.h b/library/cpp/coroutine/util/pipeque.h
deleted file mode 100644
index 8a197f5bff6..00000000000
--- a/library/cpp/coroutine/util/pipeque.h
+++ /dev/null
@@ -1,262 +0,0 @@
-#pragma once
-
-#include <util/network/sock.h>
-#include <util/network/pair.h>
-#include <util/generic/intrlist.h>
-#include <util/system/mutex.h>
-#include <util/memory/smallobj.h>
-
-class TConnectedStreamSocket: public TStreamSocket {
-public:
- TConnectedStreamSocket(SOCKET fd)
- : TStreamSocket(fd)
- {
- }
-
- ssize_t Send(const void* msg, size_t len, int flags = 0) {
- ssize_t ret = 0;
- do {
- ret = TStreamSocket::Send(msg, len, flags);
- } while (ret == -EINTR);
- return ret;
- }
-
- ssize_t Recv(void* buf, size_t len, int flags = 0) {
- ssize_t ret = 0;
- do {
- ret = TStreamSocket::Recv(buf, len, flags);
- } while (ret == -EINTR);
- return ret;
- }
-};
-
-// Limited size, syncronized queue based on intrusive list.
-// Reader and writer both wait on file descriptors instead
-// of traditional cond vars. That allows the queue to be
-// employed both in (POSIX) threads and coroutines.
-template <class T>
-class TBasicPipeQueue {
-public:
- // derive your T from TBasicPipeQueue<T>::Item
- typedef TIntrusiveListItem<T> TItem;
- static const size_t MaxQueueSizeDef = (1 << 6); // fits default socket buffer sizes on our Linux and FreeBSD servers
-
- TBasicPipeQueue(bool isPushBlocking = true,
- bool isPopBlocking = true,
- size_t maxQueueSize = MaxQueueSizeDef)
- : CurSize(0)
- , MaxQueueSize(maxQueueSize)
- , PushSock(INVALID_SOCKET)
- , PopSock(INVALID_SOCKET)
- , ShouldContinue(true)
- {
- SOCKET socks[2];
- socks[0] = INVALID_SOCKET;
- socks[1] = INVALID_SOCKET;
- if (SocketPair(socks))
- ythrow yexception() << "TBasicPipeQueue: can't create socket pair: " << LastSystemError();
- TSocketHolder pushSock(socks[0]);
- TSocketHolder popSock(socks[1]);
- PushSock.Swap(pushSock);
- PopSock.Swap(popSock);
- if (!isPushBlocking)
- SetNonBlock(PushFd(), true);
- if (!isPopBlocking)
- SetNonBlock(PopFd(), true);
-
- char b = '\0';
- for (size_t i = 0; i < MaxQueueSize; i++) {
- if (PopSock.Send(&b, 1) != 1) {
- Y_VERIFY(false, "TBasicPipeQueue: queue size too big (%" PRISZT "): make it less then %" PRISZT " or increase limits", MaxQueueSize, i);
- }
- }
- }
-
- ~TBasicPipeQueue() {
- }
-
- void Stop() {
- ShouldContinue = false;
- }
-
- bool Push(T* t) {
- char b = '\0';
- ssize_t r = 0;
- if (!ShouldContinue)
- return false;
-
- // make sure the queue has enough space to write to (wait for reading on PushSock or block on Recv)
- r = PushSock.Recv(&b, 1);
- if (r == 0) {
- return false;
- } else if (r < 0) {
- if (errno == EAGAIN)
- return false;
- else
- ythrow yexception() << "TBasicPipeQueue: error in Push(Recv): " << LastSystemError();
- }
-
- Y_VERIFY(!IsFull(), "TBasicPipeQueue: no space left");
-
- if (!ShouldContinue)
- return false;
-
- {
- TGuard<TMutex> guard(Mutex);
- List.PushBack(t);
- CurSize++;
- }
-
- // signal that the queue has item
- r = PushSock.Send(&b, 1);
- if (r <= 0)
- ythrow yexception() << "TBasicPipeQueue: error in Push(Send): " << LastSystemError();
-
- return true;
- }
-
- T* Pop() {
- char b = '\0';
- // wait for an item to appear in the queue
- ssize_t r = PopSock.Recv(&b, 1);
- if (r == 0) {
- return nullptr;
- } else if (r < 0) {
- if (errno == EAGAIN)
- return nullptr;
- else
- ythrow yexception() << "TBasicPipeQueue: error in Pop(Recv): " << LastSystemError();
- }
-
- T* t = nullptr;
- {
- TGuard<TMutex> guard(Mutex);
- t = List.PopFront();
- CurSize--;
- }
-
- // signal that the queue has a free slot
- r = PopSock.Send(&b, 1);
- if (r <= 0)
- ythrow yexception() << "TBasicPipeQueue: error in Pop(Send): " << LastSystemError();
-
- return t;
- }
-
- SOCKET PushFd() {
- return (SOCKET)PushSock;
- }
-
- SOCKET PopFd() {
- return (SOCKET)PopSock;
- }
-
- size_t Size() const {
- TGuard<TMutex> guard(Mutex);
- size_t curSize = CurSize;
- return curSize;
- }
-
- bool Empty() const {
- TGuard<TMutex> guard(Mutex);
- bool empty = CurSize == 0;
- return empty;
- }
-
- bool IsFull() const {
- TGuard g{Mutex};
- return CurSize >= MaxQueueSize;
- }
-
-protected:
- typedef TIntrusiveList<T> TListType;
- TListType List;
- size_t CurSize;
- size_t MaxQueueSize;
- TConnectedStreamSocket PushSock;
- TConnectedStreamSocket PopSock;
- TMutex Mutex;
- bool ShouldContinue;
- /**
- * --{push(write)}--> PushSock ---> PopSock --{pop(read)}-->
- */
-};
-
-template <class T>
-class TSyncSmallObjAllocator {
-public:
- TSyncSmallObjAllocator(IAllocator* alloc = TDefaultAllocator::Instance())
- : Allocator(alloc)
- {
- }
-
- void Release(T* t) {
- t->~T();
- {
- TGuard<TMutex> guard(Mutex);
- Allocator.Release(t);
- }
- }
-
- T* Alloc() {
- void* t;
- {
- TGuard<TMutex> guard(Mutex);
- t = Allocator.Allocate();
- }
- return new (t) T();
- }
-
-protected:
- TSmallObjAllocator<T> Allocator;
- TMutex Mutex;
-};
-
-template <class T>
-struct TPipeQueueItem: public TIntrusiveListItem<TPipeQueueItem<T>> {
- T Val;
-};
-
-template <class T>
-class TPipeQueue
- : protected TSyncSmallObjAllocator<TPipeQueueItem<T>>,
- protected TBasicPipeQueue<TPipeQueueItem<T>> {
- typedef TPipeQueueItem<T> TItem;
- typedef TSyncSmallObjAllocator<TItem> TAlloc;
- typedef TBasicPipeQueue<TItem> TBaseQueue;
-
-public:
- TPipeQueue(bool isPushBlocking = true,
- bool isPopBlocking = true,
- size_t maxQueueSize = TBaseQueue::MaxQueueSizeDef,
- IAllocator* alloc = TDefaultAllocator::Instance())
- : TAlloc(alloc)
- , TBaseQueue(isPushBlocking, isPopBlocking, maxQueueSize)
- {
- }
-
- bool Push(const T& t) {
- TItem* item = TAlloc::Alloc();
- item->Val = t;
- bool res = TBaseQueue::Push(item);
- if (!res)
- TAlloc::Release(item);
- return res;
- }
-
- bool Pop(T* t) {
- TItem* item = TBaseQueue::Pop();
- if (item == nullptr)
- return false;
-
- *t = item->Val;
- TAlloc::Release(item);
- return true;
- }
-
- using TBaseQueue::Empty;
- using TBaseQueue::PopFd;
- using TBaseQueue::PushFd;
- using TBaseQueue::Size;
- using TBaseQueue::Stop;
-};