diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/dns/thread.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/dns/thread.cpp')
-rw-r--r-- | library/cpp/dns/thread.cpp | 133 |
1 files changed, 133 insertions, 0 deletions
diff --git a/library/cpp/dns/thread.cpp b/library/cpp/dns/thread.cpp new file mode 100644 index 00000000000..8b27d2d527c --- /dev/null +++ b/library/cpp/dns/thread.cpp @@ -0,0 +1,133 @@ +#include "thread.h" + +#include "magic.h" + +#include <util/network/socket.h> +#include <util/thread/factory.h> +#include <util/thread/lfqueue.h> +#include <util/system/event.h> +#include <util/generic/vector.h> +#include <util/generic/singleton.h> + +using namespace NDns; + +namespace { + class TThreadedResolver: public IThreadFactory::IThreadAble, public TNonCopyable { + struct TResolveRequest { + inline TResolveRequest(const TString& host, ui16 port) + : Host(host) + , Port(port) + { + } + + inline TNetworkAddressPtr Wait() { + E.Wait(); + + if (!Error) { + if (!Result) { + ythrow TNetworkResolutionError(EAI_AGAIN) << TStringBuf(": resolver down"); + } + + return Result; + } + + Error->Raise(); + + ythrow TNetworkResolutionError(EAI_FAIL) << TStringBuf(": shit happen"); + } + + inline void Resolve() noexcept { + try { + Result = new TNetworkAddress(Host, Port); + } catch (...) { + Error = SaveError(); + } + + Wake(); + } + + inline void Wake() noexcept { + E.Signal(); + } + + TString Host; + ui16 Port; + TManualEvent E; + TNetworkAddressPtr Result; + IErrorRef Error; + }; + + public: + inline TThreadedResolver() + : E_(TSystemEvent::rAuto) + { + T_.push_back(SystemThreadFactory()->Run(this)); + } + + inline ~TThreadedResolver() override { + Schedule(nullptr); + + for (size_t i = 0; i < T_.size(); ++i) { + T_[i]->Join(); + } + + { + TResolveRequest* rr = nullptr; + + while (Q_.Dequeue(&rr)) { + if (rr) { + rr->Wake(); + } + } + } + } + + static inline TThreadedResolver* Instance() { + return Singleton<TThreadedResolver>(); + } + + inline TNetworkAddressPtr Resolve(const TString& host, ui16 port) { + TResolveRequest rr(host, port); + + Schedule(&rr); + + return rr.Wait(); + } + + private: + inline void Schedule(TResolveRequest* rr) { + Q_.Enqueue(rr); + E_.Signal(); + } + + void DoExecute() override { + while (true) { + TResolveRequest* rr = nullptr; + + while (!Q_.Dequeue(&rr)) { + E_.Wait(); + } + + if (rr) { + rr->Resolve(); + } else { + break; + } + } + + Schedule(nullptr); + } + + private: + TLockFreeQueue<TResolveRequest*> Q_; + TSystemEvent E_; + typedef TAutoPtr<IThreadFactory::IThread> IThreadRef; + TVector<IThreadRef> T_; + }; +} + +namespace NDns { + TNetworkAddressPtr ThreadedResolve(const TString& host, ui16 port) { + return TThreadedResolver::Instance()->Resolve(host, port); + } +} |