aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/asio/io_service_impl.h
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
committermonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
commit06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/asio/io_service_impl.h
parent03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
downloadydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/neh/asio/io_service_impl.h')
-rw-r--r--library/cpp/neh/asio/io_service_impl.h744
1 files changed, 744 insertions, 0 deletions
diff --git a/library/cpp/neh/asio/io_service_impl.h b/library/cpp/neh/asio/io_service_impl.h
new file mode 100644
index 0000000000..46fa9f9ee1
--- /dev/null
+++ b/library/cpp/neh/asio/io_service_impl.h
@@ -0,0 +1,744 @@
+#pragma once
+
+#include "asio.h"
+#include "poll_interrupter.h"
+
+#include <library/cpp/neh/lfqueue.h>
+#include <library/cpp/neh/pipequeue.h>
+
+#include <library/cpp/dns/cache.h>
+
+#include <util/generic/hash_set.h>
+#include <util/network/iovec.h>
+#include <util/network/pollerimpl.h>
+#include <util/thread/lfqueue.h>
+#include <util/thread/factory.h>
+
+#ifdef DEBUG_ASIO
+#define DBGOUT(args) Cout << args << Endl;
+#else
+#define DBGOUT(args)
+#endif
+
+namespace NAsio {
+ //TODO: copypaste from neh, - need fix
+ template <class T>
+ class TLockFreeSequence {
+ public:
+ inline TLockFreeSequence() {
+ memset((void*)T_, 0, sizeof(T_));
+ }
+
+ inline ~TLockFreeSequence() {
+ for (size_t i = 0; i < Y_ARRAY_SIZE(T_); ++i) {
+ delete[] T_[i];
+ }
+ }
+
+ inline T& Get(size_t n) {
+ const size_t i = GetValueBitCount(n + 1) - 1;
+
+ return GetList(i)[n + 1 - (((size_t)1) << i)];
+ }
+
+ private:
+ inline T* GetList(size_t n) {
+ T* volatile* t = T_ + n;
+
+ while (!*t) {
+ TArrayHolder<T> nt(new T[((size_t)1) << n]);
+
+ if (AtomicCas(t, nt.Get(), nullptr)) {
+ return nt.Release();
+ }
+ }
+
+ return *t;
+ }
+
+ private:
+ T* volatile T_[sizeof(size_t) * 8];
+ };
+
+ struct TOperationCompare {
+ template <class T>
+ static inline bool Compare(const T& l, const T& r) noexcept {
+ return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r);
+ }
+ };
+
+ //async operation, execute in contex TIOService()::Run() thread-executor
+ //usualy used for call functors/callbacks
+ class TOperation: public TRbTreeItem<TOperation, TOperationCompare>, public IHandlingContext {
+ public:
+ TOperation(TInstant deadline = TInstant::Max())
+ : D_(deadline)
+ , Speculative_(false)
+ , RequiredRepeatExecution_(false)
+ , ND_(deadline)
+ {
+ }
+
+ //register this operation in svc.impl.
+ virtual void AddOp(TIOService::TImpl&) = 0;
+
+ //return false, if operation not completed
+ virtual bool Execute(int errorCode = 0) = 0;
+
+ void ContinueUseHandler(TDeadline deadline) override {
+ RequiredRepeatExecution_ = true;
+ ND_ = deadline;
+ }
+
+ virtual void Finalize() = 0;
+
+ inline TInstant Deadline() const noexcept {
+ return D_;
+ }
+
+ inline TInstant DeadLine() const noexcept {
+ return D_;
+ }
+
+ inline bool Speculative() const noexcept {
+ return Speculative_;
+ }
+
+ inline bool IsRequiredRepeat() const noexcept {
+ return RequiredRepeatExecution_;
+ }
+
+ inline void PrepareReExecution() noexcept {
+ RequiredRepeatExecution_ = false;
+ D_ = ND_;
+ }
+
+ protected:
+ TInstant D_;
+ bool Speculative_; //if true, operation will be runned immediately after dequeue (even without wating any event)
+ //as sample used for optimisation writing, - obviously in buffers exist space for write
+ bool RequiredRepeatExecution_; //set to true, if required re-exec operation
+ TInstant ND_; //new deadline (for re-exec operation)
+ };
+
+ typedef TAutoPtr<TOperation> TOperationPtr;
+
+ class TNoneOperation: public TOperation {
+ public:
+ TNoneOperation(TInstant deadline = TInstant::Max())
+ : TOperation(deadline)
+ {
+ }
+
+ void AddOp(TIOService::TImpl&) override {
+ Y_ASSERT(0);
+ }
+
+ void Finalize() override {
+ }
+ };
+
+ class TPollFdEventHandler;
+
+ //descriptor use operation
+ class TFdOperation: public TOperation {
+ public:
+ enum TPollType {
+ PollRead,
+ PollWrite
+ };
+
+ TFdOperation(SOCKET fd, TPollType pt, TInstant deadline = TInstant::Max())
+ : TOperation(deadline)
+ , Fd_(fd)
+ , PT_(pt)
+ , PH_(nullptr)
+ {
+ Y_ASSERT(Fd() != INVALID_SOCKET);
+ }
+
+ inline SOCKET Fd() const noexcept {
+ return Fd_;
+ }
+
+ inline bool IsPollRead() const noexcept {
+ return PT_ == PollRead;
+ }
+
+ void AddOp(TIOService::TImpl& srv) override;
+
+ void Finalize() override;
+
+ protected:
+ SOCKET Fd_;
+ TPollType PT_;
+
+ public:
+ TAutoPtr<TPollFdEventHandler>* PH_;
+ };
+
+ typedef TAutoPtr<TFdOperation> TFdOperationPtr;
+
+ class TPollFdEventHandler {
+ public:
+ TPollFdEventHandler(SOCKET fd, TIOService::TImpl& srv)
+ : Fd_(fd)
+ , HandledEvents_(0)
+ , Srv_(srv)
+ {
+ }
+
+ virtual ~TPollFdEventHandler() {
+ Y_ASSERT(ReadOperations_.size() == 0);
+ Y_ASSERT(WriteOperations_.size() == 0);
+ }
+
+ inline void AddReadOp(TFdOperationPtr op) {
+ ReadOperations_.push_back(op);
+ }
+
+ inline void AddWriteOp(TFdOperationPtr op) {
+ WriteOperations_.push_back(op);
+ }
+
+ virtual void OnFdEvent(int status, ui16 filter) {
+ DBGOUT("PollEvent(fd=" << Fd_ << ", " << status << ", " << filter << ")");
+ if (status) {
+ ExecuteOperations(ReadOperations_, status);
+ ExecuteOperations(WriteOperations_, status);
+ } else {
+ if (filter & CONT_POLL_READ) {
+ ExecuteOperations(ReadOperations_, status);
+ }
+ if (filter & CONT_POLL_WRITE) {
+ ExecuteOperations(WriteOperations_, status);
+ }
+ }
+ }
+
+ typedef TVector<TFdOperationPtr> TFdOperations;
+
+ void ExecuteOperations(TFdOperations& oprs, int errorCode);
+
+ //return true if filter handled events changed and require re-configure events poller
+ virtual bool FixHandledEvents() noexcept {
+ DBGOUT("TPollFdEventHandler::FixHandledEvents()");
+ ui16 filter = 0;
+
+ if (WriteOperations_.size()) {
+ filter |= CONT_POLL_WRITE;
+ }
+ if (ReadOperations_.size()) {
+ filter |= CONT_POLL_READ;
+ }
+
+ if (Y_LIKELY(HandledEvents_ == filter)) {
+ return false;
+ }
+
+ HandledEvents_ = filter;
+ return true;
+ }
+
+ inline bool FinishOp(TFdOperations& oprs, TFdOperation* op) noexcept {
+ for (TFdOperations::iterator it = oprs.begin(); it != oprs.end(); ++it) {
+ if (it->Get() == op) {
+ FinishedOperations_.push_back(*it);
+ oprs.erase(it);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void DelOp(TFdOperation* op);
+
+ inline SOCKET Fd() const noexcept {
+ return Fd_;
+ }
+
+ inline ui16 HandledEvents() const noexcept {
+ return HandledEvents_;
+ }
+
+ inline void AddHandlingEvent(ui16 ev) noexcept {
+ HandledEvents_ |= ev;
+ }
+
+ inline void DestroyFinishedOperations() {
+ FinishedOperations_.clear();
+ }
+
+ TIOService::TImpl& GetServiceImpl() const noexcept {
+ return Srv_;
+ }
+
+ protected:
+ SOCKET Fd_;
+ ui16 HandledEvents_;
+ TIOService::TImpl& Srv_;
+
+ private:
+ TVector<TFdOperationPtr> ReadOperations_;
+ TVector<TFdOperationPtr> WriteOperations_;
+ // we can't immediatly destroy finished operations, this can cause closing used socket descriptor Fd_
+ // (on cascade deletion operation object-handler), but later we use Fd_ for modify handled events at poller,
+ // so we collect here finished operations and destroy it only after update poller, -
+ // call FixHandledEvents(TPollFdEventHandlerPtr&)
+ TVector<TFdOperationPtr> FinishedOperations_;
+ };
+
+ //additional descriptor for poller, used for interrupt current poll wait
+ class TInterrupterHandler: public TPollFdEventHandler {
+ public:
+ TInterrupterHandler(TIOService::TImpl& srv, TPollInterrupter& pi)
+ : TPollFdEventHandler(pi.Fd(), srv)
+ , PI_(pi)
+ {
+ HandledEvents_ = CONT_POLL_READ;
+ }
+
+ ~TInterrupterHandler() override {
+ DBGOUT("~TInterrupterHandler");
+ }
+
+ void OnFdEvent(int status, ui16 filter) override;
+
+ bool FixHandledEvents() noexcept override {
+ DBGOUT("TInterrupterHandler::FixHandledEvents()");
+ return false;
+ }
+
+ private:
+ TPollInterrupter& PI_;
+ };
+
+ namespace {
+ inline TAutoPtr<IPollerFace> CreatePoller() {
+ try {
+#if defined(_linux_)
+ return IPollerFace::Construct(TStringBuf("epoll"));
+#endif
+#if defined(_freebsd_) || defined(_darwin_)
+ return IPollerFace::Construct(TStringBuf("kqueue"));
+#endif
+ } catch (...) {
+ Cdbg << CurrentExceptionMessage() << Endl;
+ }
+ return IPollerFace::Default();
+ }
+ }
+
+ //some equivalent TContExecutor
+ class TIOService::TImpl: public TNonCopyable {
+ public:
+ typedef TAutoPtr<TPollFdEventHandler> TEvh;
+ typedef TLockFreeSequence<TEvh> TEventHandlers;
+
+ class TTimer {
+ public:
+ typedef THashSet<TOperation*> TOperations;
+
+ TTimer(TIOService::TImpl& srv)
+ : Srv_(srv)
+ {
+ }
+
+ virtual ~TTimer() {
+ FailOperations(ECANCELED);
+ }
+
+ void AddOp(TOperation* op) {
+ THolder<TOperation> tmp(op);
+ Operations_.insert(op);
+ Y_UNUSED(tmp.Release());
+ Srv_.RegisterOpDeadline(op);
+ Srv_.IncTimersOp();
+ }
+
+ void DelOp(TOperation* op) {
+ TOperations::iterator it = Operations_.find(op);
+ if (it != Operations_.end()) {
+ Srv_.DecTimersOp();
+ delete op;
+ Operations_.erase(it);
+ }
+ }
+
+ inline void FailOperations(int ec) {
+ for (auto operation : Operations_) {
+ try {
+ operation->Execute(ec); //throw ?
+ } catch (...) {
+ }
+ Srv_.DecTimersOp();
+ delete operation;
+ }
+ Operations_.clear();
+ }
+
+ TIOService::TImpl& GetIOServiceImpl() const noexcept {
+ return Srv_;
+ }
+
+ protected:
+ TIOService::TImpl& Srv_;
+ THashSet<TOperation*> Operations_;
+ };
+
+ class TTimers: public THashSet<TTimer*> {
+ public:
+ ~TTimers() {
+ for (auto it : *this) {
+ delete it;
+ }
+ }
+ };
+
+ TImpl()
+ : P_(CreatePoller())
+ , DeadlinesQueue_(*this)
+ {
+ }
+
+ ~TImpl() {
+ TOperationPtr op;
+
+ while (OpQueue_.Dequeue(&op)) { //cancel all enqueued operations
+ try {
+ op->Execute(ECANCELED);
+ } catch (...) {
+ }
+ op.Destroy();
+ }
+ }
+
+ //similar TContExecutor::Execute() or io_service::run()
+ //process event loop (exit if none to do (no timers or event handlers))
+ void Run();
+
+ //enqueue functor fo call in Run() eventloop (thread safing)
+ inline void Post(TCompletionHandler h) {
+ class TFuncOperation: public TNoneOperation {
+ public:
+ TFuncOperation(TCompletionHandler completionHandler)
+ : TNoneOperation()
+ , H_(std::move(completionHandler))
+ {
+ Speculative_ = true;
+ }
+
+ private:
+ //return false, if operation not completed
+ bool Execute(int errorCode) override {
+ Y_UNUSED(errorCode);
+ H_();
+ return true;
+ }
+
+ TCompletionHandler H_;
+ };
+
+ ScheduleOp(new TFuncOperation(std::move(h)));
+ }
+
+ //cancel all current operations (handlers be called with errorCode == ECANCELED)
+ void Abort();
+ bool HasAbort() {
+ return AtomicGet(HasAbort_);
+ }
+
+ inline void ScheduleOp(TOperationPtr op) { //throw std::bad_alloc
+ Y_ASSERT(!Aborted_);
+ Y_ASSERT(!!op);
+ OpQueue_.Enqueue(op);
+ Interrupt();
+ }
+
+ inline void Interrupt() noexcept {
+ AtomicSet(NeedCheckOpQueue_, 1);
+ if (AtomicAdd(IsWaiting_, 0) == 1) {
+ I_.Interrupt();
+ }
+ }
+
+ inline void UpdateOpDeadline(TOperation* op) {
+ TInstant oldDeadline = op->Deadline();
+ op->PrepareReExecution();
+
+ if (oldDeadline == op->Deadline()) {
+ return;
+ }
+
+ if (oldDeadline != TInstant::Max()) {
+ op->UnLink();
+ }
+ if (op->Deadline() != TInstant::Max()) {
+ DeadlinesQueue_.Register(op);
+ }
+ }
+
+ void SyncRegisterTimer(TTimer* t) {
+ Timers_.insert(t);
+ }
+
+ inline void SyncUnregisterAndDestroyTimer(TTimer* t) {
+ Timers_.erase(t);
+ delete t;
+ }
+
+ inline void IncTimersOp() noexcept {
+ ++TimersOpCnt_;
+ }
+
+ inline void DecTimersOp() noexcept {
+ --TimersOpCnt_;
+ }
+
+ inline void WorkStarted() {
+ AtomicIncrement(OutstandingWork_);
+ }
+
+ inline void WorkFinished() {
+ if (AtomicDecrement(OutstandingWork_) == 0) {
+ Interrupt();
+ }
+ }
+
+ private:
+ void ProcessAbort();
+
+ inline TEvh& EnsureGetEvh(SOCKET fd) {
+ TEvh& evh = Evh_.Get(fd);
+ if (!evh) {
+ evh.Reset(new TPollFdEventHandler(fd, *this));
+ }
+ return evh;
+ }
+
+ inline void OnTimeoutOp(TOperation* op) {
+ DBGOUT("OnTimeoutOp");
+ try {
+ op->Execute(ETIMEDOUT); //throw ?
+ } catch (...) {
+ op->Finalize();
+ throw;
+ }
+
+ if (op->IsRequiredRepeat()) {
+ //operation not completed
+ UpdateOpDeadline(op);
+ } else {
+ //destroy operation structure
+ op->Finalize();
+ }
+ }
+
+ public:
+ inline void FixHandledEvents(TEvh& evh) {
+ if (!!evh) {
+ if (evh->FixHandledEvents()) {
+ if (!evh->HandledEvents()) {
+ DelEventHandler(evh);
+ evh.Destroy();
+ } else {
+ ModEventHandler(evh);
+ evh->DestroyFinishedOperations();
+ }
+ } else {
+ evh->DestroyFinishedOperations();
+ }
+ }
+ }
+
+ private:
+ inline TEvh& GetHandlerForOp(TFdOperation* op) {
+ TEvh& evh = EnsureGetEvh(op->Fd());
+ op->PH_ = &evh;
+ return evh;
+ }
+
+ void ProcessOpQueue() {
+ if (!AtomicGet(NeedCheckOpQueue_)) {
+ return;
+ }
+ AtomicSet(NeedCheckOpQueue_, 0);
+
+ TOperationPtr op;
+
+ while (OpQueue_.Dequeue(&op)) {
+ if (op->Speculative()) {
+ if (op->Execute(Y_UNLIKELY(Aborted_) ? ECANCELED : 0)) {
+ op.Destroy();
+ continue; //operation completed
+ }
+
+ if (!op->IsRequiredRepeat()) {
+ op->PrepareReExecution();
+ }
+ }
+ RegisterOpDeadline(op.Get());
+ op.Get()->AddOp(*this); // ... -> AddOp()
+ Y_UNUSED(op.Release());
+ }
+ }
+
+ inline void RegisterOpDeadline(TOperation* op) {
+ if (op->DeadLine() != TInstant::Max()) {
+ DeadlinesQueue_.Register(op);
+ }
+ }
+
+ public:
+ inline void AddOp(TFdOperation* op) {
+ DBGOUT("AddOp<Fd>(" << op->Fd() << ")");
+ TEvh& evh = GetHandlerForOp(op);
+ if (op->IsPollRead()) {
+ evh->AddReadOp(op);
+ EnsureEventHandled(evh, CONT_POLL_READ);
+ } else {
+ evh->AddWriteOp(op);
+ EnsureEventHandled(evh, CONT_POLL_WRITE);
+ }
+ }
+
+ private:
+ inline void EnsureEventHandled(TEvh& evh, ui16 ev) {
+ if (!evh->HandledEvents()) {
+ evh->AddHandlingEvent(ev);
+ AddEventHandler(evh);
+ } else {
+ if ((evh->HandledEvents() & ev) == 0) {
+ evh->AddHandlingEvent(ev);
+ ModEventHandler(evh);
+ }
+ }
+ }
+
+ public:
+ //cancel all current operations for socket
+ //method MUST be called from Run() thread-executor
+ void CancelFdOp(SOCKET fd) {
+ TEvh& evh = Evh_.Get(fd);
+ if (!evh) {
+ return;
+ }
+
+ OnFdEvent(evh, ECANCELED, CONT_POLL_READ | CONT_POLL_WRITE);
+ }
+
+ private:
+ //helper for fixing handled events even in case exception
+ struct TExceptionProofFixerHandledEvents {
+ TExceptionProofFixerHandledEvents(TIOService::TImpl& srv, TEvh& iEvh)
+ : Srv_(srv)
+ , Evh_(iEvh)
+ {
+ }
+
+ ~TExceptionProofFixerHandledEvents() {
+ Srv_.FixHandledEvents(Evh_);
+ }
+
+ TIOService::TImpl& Srv_;
+ TEvh& Evh_;
+ };
+
+ inline void OnFdEvent(TEvh& evh, int status, ui16 filter) {
+ TExceptionProofFixerHandledEvents fixer(*this, evh);
+ Y_UNUSED(fixer);
+ evh->OnFdEvent(status, filter);
+ }
+
+ inline void AddEventHandler(TEvh& evh) {
+ if (evh->Fd() > MaxFd_) {
+ MaxFd_ = evh->Fd();
+ }
+ SetEventHandler(&evh, evh->Fd(), evh->HandledEvents());
+ ++FdEventHandlersCnt_;
+ }
+
+ inline void ModEventHandler(TEvh& evh) {
+ SetEventHandler(&evh, evh->Fd(), evh->HandledEvents());
+ }
+
+ inline void DelEventHandler(TEvh& evh) {
+ SetEventHandler(&evh, evh->Fd(), 0);
+ --FdEventHandlersCnt_;
+ }
+
+ inline void SetEventHandler(void* h, int fd, ui16 flags) {
+ DBGOUT("SetEventHandler(" << fd << ", " << flags << ")");
+ P_->Set(h, fd, flags);
+ }
+
+ //exception safe call DelEventHandler
+ struct TInterrupterKeeper {
+ TInterrupterKeeper(TImpl& srv, TEvh& iEvh)
+ : Srv_(srv)
+ , Evh_(iEvh)
+ {
+ Srv_.AddEventHandler(Evh_);
+ }
+
+ ~TInterrupterKeeper() {
+ Srv_.DelEventHandler(Evh_);
+ }
+
+ TImpl& Srv_;
+ TEvh& Evh_;
+ };
+
+ TAutoPtr<IPollerFace> P_;
+ TPollInterrupter I_;
+ TAtomic IsWaiting_ = 0;
+ TAtomic NeedCheckOpQueue_ = 0;
+ TAtomic OutstandingWork_ = 0;
+
+ NNeh::TAutoLockFreeQueue<TOperation> OpQueue_;
+
+ TEventHandlers Evh_; //i/o event handlers
+ TTimers Timers_; //timeout event handlers
+
+ size_t FdEventHandlersCnt_ = 0; //i/o event handlers counter
+ size_t TimersOpCnt_ = 0; //timers op counter
+ SOCKET MaxFd_ = 0; //max used descriptor num
+ TAtomic HasAbort_ = 0;
+ bool Aborted_ = false;
+
+ class TDeadlinesQueue {
+ public:
+ TDeadlinesQueue(TIOService::TImpl& srv)
+ : Srv_(srv)
+ {
+ }
+
+ inline void Register(TOperation* op) {
+ Deadlines_.Insert(op);
+ }
+
+ TInstant NextDeadline() {
+ TDeadlines::TIterator it = Deadlines_.Begin();
+
+ while (it != Deadlines_.End()) {
+ if (it->DeadLine() > TInstant::Now()) {
+ DBGOUT("TDeadlinesQueue::NewDeadline:" << (it->DeadLine().GetValue() - TInstant::Now().GetValue()));
+ return it->DeadLine();
+ }
+
+ TOperation* op = &*(it++);
+ Srv_.OnTimeoutOp(op);
+ }
+
+ return Deadlines_.Empty() ? TInstant::Max() : Deadlines_.Begin()->DeadLine();
+ }
+
+ private:
+ typedef TRbTree<TOperation, TOperationCompare> TDeadlines;
+ TDeadlines Deadlines_;
+ TIOService::TImpl& Srv_;
+ };
+
+ TDeadlinesQueue DeadlinesQueue_;
+ };
+}