path: root/library/cpp/neh/asio
author    monster <monster@ydb.tech>  2022-07-07 14:41:37 +0300
committer monster <monster@ydb.tech>  2022-07-07 14:41:37 +0300
commit    06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree      75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/asio
parent    03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
download  ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/neh/asio')
-rw-r--r--  library/cpp/neh/asio/asio.cpp                 187
-rw-r--r--  library/cpp/neh/asio/asio.h                   280
-rw-r--r--  library/cpp/neh/asio/deadline_timer_impl.cpp    1
-rw-r--r--  library/cpp/neh/asio/deadline_timer_impl.h    110
-rw-r--r--  library/cpp/neh/asio/executor.cpp               1
-rw-r--r--  library/cpp/neh/asio/executor.h                76
-rw-r--r--  library/cpp/neh/asio/io_service_impl.cpp      161
-rw-r--r--  library/cpp/neh/asio/io_service_impl.h        744
-rw-r--r--  library/cpp/neh/asio/poll_interrupter.cpp       1
-rw-r--r--  library/cpp/neh/asio/poll_interrupter.h       107
-rw-r--r--  library/cpp/neh/asio/tcp_acceptor_impl.cpp     25
-rw-r--r--  library/cpp/neh/asio/tcp_acceptor_impl.h       76
-rw-r--r--  library/cpp/neh/asio/tcp_socket_impl.cpp      117
-rw-r--r--  library/cpp/neh/asio/tcp_socket_impl.h        332
14 files changed, 2218 insertions, 0 deletions
diff --git a/library/cpp/neh/asio/asio.cpp b/library/cpp/neh/asio/asio.cpp
new file mode 100644
index 0000000000..8b6cf383ea
--- /dev/null
+++ b/library/cpp/neh/asio/asio.cpp
@@ -0,0 +1,187 @@
+#include "io_service_impl.h"
+#include "deadline_timer_impl.h"
+#include "tcp_socket_impl.h"
+#include "tcp_acceptor_impl.h"
+
+using namespace NDns;
+using namespace NAsio;
+
+namespace NAsio {
+ TIOService::TWork::TWork(TWork& w)
+ : Srv_(w.Srv_)
+ {
+ Srv_.GetImpl().WorkStarted();
+ }
+
+ TIOService::TWork::TWork(TIOService& srv)
+ : Srv_(srv)
+ {
+ Srv_.GetImpl().WorkStarted();
+ }
+
+ TIOService::TWork::~TWork() {
+ Srv_.GetImpl().WorkFinished();
+ }
+
+ TIOService::TIOService()
+ : Impl_(new TImpl())
+ {
+ }
+
+ TIOService::~TIOService() {
+ }
+
+ void TIOService::Run() {
+ Impl_->Run();
+ }
+
+ void TIOService::Post(TCompletionHandler h) {
+ Impl_->Post(std::move(h));
+ }
+
+ void TIOService::Abort() {
+ Impl_->Abort();
+ }
+
+ TDeadlineTimer::TDeadlineTimer(TIOService& srv) noexcept
+ : Srv_(srv)
+ , Impl_(nullptr)
+ {
+ }
+
+ TDeadlineTimer::~TDeadlineTimer() {
+ if (Impl_) {
+ Srv_.GetImpl().ScheduleOp(new TUnregisterTimerOperation(Impl_));
+ }
+ }
+
+ void TDeadlineTimer::AsyncWaitExpireAt(TDeadline deadline, THandler h) {
+ if (!Impl_) {
+ Impl_ = new TDeadlineTimer::TImpl(Srv_.GetImpl());
+ Srv_.GetImpl().ScheduleOp(new TRegisterTimerOperation(Impl_));
+ }
+ Impl_->AsyncWaitExpireAt(deadline, h);
+ }
+
+ void TDeadlineTimer::Cancel() {
+ Impl_->Cancel();
+ }
+
+ TTcpSocket::TTcpSocket(TIOService& srv) noexcept
+ : Srv_(srv)
+ , Impl_(new TImpl(srv.GetImpl()))
+ {
+ }
+
+ TTcpSocket::~TTcpSocket() {
+ }
+
+ void TTcpSocket::AsyncConnect(const TEndpoint& ep, TTcpSocket::TConnectHandler h, TDeadline deadline) {
+ Impl_->AsyncConnect(ep, h, deadline);
+ }
+
+ void TTcpSocket::AsyncWrite(TSendedData& d, TTcpSocket::TWriteHandler h, TDeadline deadline) {
+ Impl_->AsyncWrite(d, h, deadline);
+ }
+
+ void TTcpSocket::AsyncWrite(TContIOVector* vec, TWriteHandler h, TDeadline deadline) {
+ Impl_->AsyncWrite(vec, h, deadline);
+ }
+
+ void TTcpSocket::AsyncWrite(const void* data, size_t size, TWriteHandler h, TDeadline deadline) {
+ class TBuffers: public IBuffers {
+ public:
+ TBuffers(const void* theData, size_t theSize)
+ : Part(theData, theSize)
+ , IOVec(&Part, 1)
+ {
+ }
+
+ TContIOVector* GetIOvec() override {
+ return &IOVec;
+ }
+
+ IOutputStream::TPart Part;
+ TContIOVector IOVec;
+ };
+
+ TSendedData d(new TBuffers(data, size));
+ Impl_->AsyncWrite(d, h, deadline);
+ }
+
+ void TTcpSocket::AsyncRead(void* buff, size_t size, TTcpSocket::TReadHandler h, TDeadline deadline) {
+ Impl_->AsyncRead(buff, size, h, deadline);
+ }
+
+ void TTcpSocket::AsyncReadSome(void* buff, size_t size, TTcpSocket::TReadHandler h, TDeadline deadline) {
+ Impl_->AsyncReadSome(buff, size, h, deadline);
+ }
+
+ void TTcpSocket::AsyncPollRead(TTcpSocket::TPollHandler h, TDeadline deadline) {
+ Impl_->AsyncPollRead(h, deadline);
+ }
+
+ void TTcpSocket::AsyncPollWrite(TTcpSocket::TPollHandler h, TDeadline deadline) {
+ Impl_->AsyncPollWrite(h, deadline);
+ }
+
+ void TTcpSocket::AsyncCancel() {
+ return Impl_->AsyncCancel();
+ }
+
+ size_t TTcpSocket::WriteSome(TContIOVector& d, TErrorCode& ec) noexcept {
+ return Impl_->WriteSome(d, ec);
+ }
+
+ size_t TTcpSocket::WriteSome(const void* buff, size_t size, TErrorCode& ec) noexcept {
+ return Impl_->WriteSome(buff, size, ec);
+ }
+
+ size_t TTcpSocket::ReadSome(void* buff, size_t size, TErrorCode& ec) noexcept {
+ return Impl_->ReadSome(buff, size, ec);
+ }
+
+ bool TTcpSocket::IsOpen() const noexcept {
+ return Native() != INVALID_SOCKET;
+ }
+
+ void TTcpSocket::Shutdown(TShutdownMode what, TErrorCode& ec) {
+ return Impl_->Shutdown(what, ec);
+ }
+
+ SOCKET TTcpSocket::Native() const noexcept {
+ return Impl_->Fd();
+ }
+
+ TEndpoint TTcpSocket::RemoteEndpoint() const {
+ return Impl_->RemoteEndpoint();
+ }
+
+ //////////////////////////////////
+
+ TTcpAcceptor::TTcpAcceptor(TIOService& srv) noexcept
+ : Srv_(srv)
+ , Impl_(new TImpl(srv.GetImpl()))
+ {
+ }
+
+ TTcpAcceptor::~TTcpAcceptor() {
+ }
+
+ void TTcpAcceptor::Bind(TEndpoint& ep, TErrorCode& ec) noexcept {
+ return Impl_->Bind(ep, ec);
+ }
+
+ void TTcpAcceptor::Listen(int backlog, TErrorCode& ec) noexcept {
+ return Impl_->Listen(backlog, ec);
+ }
+
+ void TTcpAcceptor::AsyncAccept(TTcpSocket& s, TTcpAcceptor::TAcceptHandler h, TDeadline deadline) {
+ return Impl_->AsyncAccept(s, h, deadline);
+ }
+
+ void TTcpAcceptor::AsyncCancel() {
+ Impl_->AsyncCancel();
+ }
+
+}
diff --git a/library/cpp/neh/asio/asio.h b/library/cpp/neh/asio/asio.h
new file mode 100644
index 0000000000..a902d663cf
--- /dev/null
+++ b/library/cpp/neh/asio/asio.h
@@ -0,0 +1,280 @@
+#pragma once
+
+//
+//primary header for working with asio
+//
+
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+#include <util/network/socket.h>
+#include <util/network/endpoint.h>
+#include <util/system/error.h>
+#include <util/stream/output.h>
+#include <functional>
+
+#include <library/cpp/dns/cache.h>
+
+//#define DEBUG_ASIO
+
+class TContIOVector;
+
+namespace NAsio {
+ class TErrorCode {
+ public:
+ inline TErrorCode(int val = 0) noexcept
+ : Val_(val)
+ {
+ }
+
+ typedef void (*TUnspecifiedBoolType)();
+
+ static void UnspecifiedBoolTrue() {
+ }
+
+ //safe cast to bool value
+ operator TUnspecifiedBoolType() const noexcept { // true if error
+ return Val_ == 0 ? nullptr : UnspecifiedBoolTrue;
+ }
+
+ bool operator!() const noexcept {
+ return Val_ == 0;
+ }
+
+ void Assign(int val) noexcept {
+ Val_ = val;
+ }
+
+ int Value() const noexcept {
+ return Val_;
+ }
+
+ TString Text() const {
+ if (!Val_) {
+ return TString();
+ }
+ return LastSystemErrorText(Val_);
+ }
+
+ void Check() {
+ if (Val_) {
+ throw TSystemError(Val_);
+ }
+ }
+
+ private:
+ int Val_;
+ };
+
+    //wrapper around TInstant that allows using TDuration (+ TInstant::Now()) as a deadline
+ class TDeadline: public TInstant {
+ public:
+ TDeadline()
+ : TInstant(TInstant::Max())
+ {
+ }
+
+ TDeadline(const TInstant& t)
+ : TInstant(t)
+ {
+ }
+
+ TDeadline(const TDuration& d)
+ : TInstant(TInstant::Now() + d)
+ {
+ }
+ };
+
+ class IHandlingContext {
+ public:
+ virtual ~IHandlingContext() {
+ }
+
+        //if the handler throws an exception, the call to this function is ignored
+ virtual void ContinueUseHandler(TDeadline deadline = TDeadline()) = 0;
+ };
+
+ typedef std::function<void()> TCompletionHandler;
+
+ class TIOService: public TNonCopyable {
+ public:
+ TIOService();
+ ~TIOService();
+
+ void Run();
+        void Post(TCompletionHandler); //call the handler in the Run() thread-executor
+        void Abort(); //all existing async i/o operations and timers receive error = ECANCELED, then Run() exits
+
+        //counterpart of boost::asio::io_service::work
+ class TWork {
+ public:
+ TWork(TWork&);
+ TWork(TIOService&);
+ ~TWork();
+
+ private:
+ void operator=(const TWork&); //disable
+
+ TIOService& Srv_;
+ };
+
+ class TImpl;
+
+ TImpl& GetImpl() noexcept {
+ return *Impl_;
+ }
+
+ private:
+ THolder<TImpl> Impl_;
+ };
+
+ class TDeadlineTimer: public TNonCopyable {
+ public:
+ typedef std::function<void(const TErrorCode& err, IHandlingContext&)> THandler;
+
+ TDeadlineTimer(TIOService&) noexcept;
+ ~TDeadlineTimer();
+
+ void AsyncWaitExpireAt(TDeadline, THandler);
+ void Cancel();
+
+ TIOService& GetIOService() const noexcept {
+ return Srv_;
+ }
+
+ class TImpl;
+
+ private:
+ TIOService& Srv_;
+ TImpl* Impl_;
+ };
+
+ class TTcpSocket: public TNonCopyable {
+ public:
+ class IBuffers {
+ public:
+ virtual ~IBuffers() {
+ }
+ virtual TContIOVector* GetIOvec() = 0;
+ };
+ typedef TAutoPtr<IBuffers> TSendedData;
+
+ typedef std::function<void(const TErrorCode& err, IHandlingContext&)> THandler;
+ typedef THandler TConnectHandler;
+ typedef std::function<void(const TErrorCode& err, size_t amount, IHandlingContext&)> TWriteHandler;
+ typedef std::function<void(const TErrorCode& err, size_t amount, IHandlingContext&)> TReadHandler;
+ typedef THandler TPollHandler;
+
+ enum TShutdownMode {
+ ShutdownReceive = SHUT_RD,
+ ShutdownSend = SHUT_WR,
+ ShutdownBoth = SHUT_RDWR
+ };
+
+ TTcpSocket(TIOService&) noexcept;
+ ~TTcpSocket();
+
+ void AsyncConnect(const TEndpoint& ep, TConnectHandler, TDeadline deadline = TDeadline());
+ void AsyncWrite(TSendedData&, TWriteHandler, TDeadline deadline = TDeadline());
+ void AsyncWrite(TContIOVector* buff, TWriteHandler, TDeadline deadline = TDeadline());
+ void AsyncWrite(const void* buff, size_t size, TWriteHandler, TDeadline deadline = TDeadline());
+ void AsyncRead(void* buff, size_t size, TReadHandler, TDeadline deadline = TDeadline());
+ void AsyncReadSome(void* buff, size_t size, TReadHandler, TDeadline deadline = TDeadline());
+ void AsyncPollWrite(TPollHandler, TDeadline deadline = TDeadline());
+ void AsyncPollRead(TPollHandler, TDeadline deadline = TDeadline());
+ void AsyncCancel();
+
+        //synchronous, but non-blocking methods
+ size_t WriteSome(TContIOVector&, TErrorCode&) noexcept;
+ size_t WriteSome(const void* buff, size_t size, TErrorCode&) noexcept;
+ size_t ReadSome(void* buff, size_t size, TErrorCode&) noexcept;
+
+ bool IsOpen() const noexcept;
+ void Shutdown(TShutdownMode mode, TErrorCode& ec);
+
+ TIOService& GetIOService() const noexcept {
+ return Srv_;
+ }
+
+ SOCKET Native() const noexcept;
+
+ TEndpoint RemoteEndpoint() const;
+
+ inline size_t WriteSome(TContIOVector& v) {
+ TErrorCode ec;
+ size_t n = WriteSome(v, ec);
+ ec.Check();
+ return n;
+ }
+
+ inline size_t WriteSome(const void* buff, size_t size) {
+ TErrorCode ec;
+ size_t n = WriteSome(buff, size, ec);
+ ec.Check();
+ return n;
+ }
+
+ inline size_t ReadSome(void* buff, size_t size) {
+ TErrorCode ec;
+ size_t n = ReadSome(buff, size, ec);
+ ec.Check();
+ return n;
+ }
+
+ void Shutdown(TShutdownMode mode) {
+ TErrorCode ec;
+ Shutdown(mode, ec);
+ ec.Check();
+ }
+
+ class TImpl;
+
+ TImpl& GetImpl() const noexcept {
+ return *Impl_;
+ }
+
+ private:
+ TIOService& Srv_;
+ TIntrusivePtr<TImpl> Impl_;
+ };
+
+ class TTcpAcceptor: public TNonCopyable {
+ public:
+ typedef std::function<void(const TErrorCode& err, IHandlingContext&)> TAcceptHandler;
+
+ TTcpAcceptor(TIOService&) noexcept;
+ ~TTcpAcceptor();
+
+ void Bind(TEndpoint&, TErrorCode&) noexcept;
+ void Listen(int backlog, TErrorCode&) noexcept;
+
+ void AsyncAccept(TTcpSocket&, TAcceptHandler, TDeadline deadline = TDeadline());
+
+ void AsyncCancel();
+
+ inline void Bind(TEndpoint& ep) {
+ TErrorCode ec;
+ Bind(ep, ec);
+ ec.Check();
+ }
+ inline void Listen(int backlog) {
+ TErrorCode ec;
+ Listen(backlog, ec);
+ ec.Check();
+ }
+
+ TIOService& GetIOService() const noexcept {
+ return Srv_;
+ }
+
+ class TImpl;
+
+ TImpl& GetImpl() const noexcept {
+ return *Impl_;
+ }
+
+ private:
+ TIOService& Srv_;
+ TIntrusivePtr<TImpl> Impl_;
+ };
+}
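Usage sketch (illustrative only, not part of this commit): the public API above combines an event loop (TIOService), timers (TDeadlineTimer) and posted functors. A minimal single-threaded example, assuming only the headers added by this commit:

    #include <library/cpp/neh/asio/asio.h>

    #include <util/stream/output.h>

    using namespace NAsio;

    int main() {
        TIOService srv;
        TDeadlineTimer timer(srv);

        // Fire once after ~100ms; errorCode == 0 means the deadline expired,
        // ECANCELED means the timer (or the whole service) was aborted.
        timer.AsyncWaitExpireAt(TDuration::MilliSeconds(100), [](const TErrorCode& err, IHandlingContext&) {
            Cout << "timer: " << (err ? err.Text() : TString("expired")) << Endl;
        });

        // Post() enqueues a functor to be executed inside Run().
        srv.Post([]() {
            Cout << "posted handler" << Endl;
        });

        srv.Run(); // returns once no timers or i/o handlers remain
        return 0;
    }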
diff --git a/library/cpp/neh/asio/deadline_timer_impl.cpp b/library/cpp/neh/asio/deadline_timer_impl.cpp
new file mode 100644
index 0000000000..399a4338fb
--- /dev/null
+++ b/library/cpp/neh/asio/deadline_timer_impl.cpp
@@ -0,0 +1 @@
+#include "deadline_timer_impl.h"
diff --git a/library/cpp/neh/asio/deadline_timer_impl.h b/library/cpp/neh/asio/deadline_timer_impl.h
new file mode 100644
index 0000000000..d9db625c94
--- /dev/null
+++ b/library/cpp/neh/asio/deadline_timer_impl.h
@@ -0,0 +1,110 @@
+#pragma once
+
+#include "io_service_impl.h"
+
+namespace NAsio {
+ class TTimerOperation: public TOperation {
+ public:
+ TTimerOperation(TIOService::TImpl::TTimer* t, TInstant deadline)
+ : TOperation(deadline)
+ , T_(t)
+ {
+ }
+
+ void AddOp(TIOService::TImpl&) override {
+ Y_ASSERT(0);
+ }
+
+ void Finalize() override {
+ DBGOUT("TTimerDeadlineOperation::Finalize()");
+ T_->DelOp(this);
+ }
+
+ protected:
+ TIOService::TImpl::TTimer* T_;
+ };
+
+ class TRegisterTimerOperation: public TTimerOperation {
+ public:
+ TRegisterTimerOperation(TIOService::TImpl::TTimer* t, TInstant deadline = TInstant::Max())
+ : TTimerOperation(t, deadline)
+ {
+ Speculative_ = true;
+ }
+
+ bool Execute(int errorCode) override {
+ Y_UNUSED(errorCode);
+ T_->GetIOServiceImpl().SyncRegisterTimer(T_);
+ return true;
+ }
+ };
+
+ class TTimerDeadlineOperation: public TTimerOperation {
+ public:
+ TTimerDeadlineOperation(TIOService::TImpl::TTimer* t, TDeadlineTimer::THandler h, TInstant deadline)
+ : TTimerOperation(t, deadline)
+ , H_(h)
+ {
+ }
+
+ void AddOp(TIOService::TImpl&) override {
+ T_->AddOp(this);
+ }
+
+ bool Execute(int errorCode) override {
+ DBGOUT("TTimerDeadlineOperation::Execute(" << errorCode << ")");
+ H_(errorCode == ETIMEDOUT ? 0 : errorCode, *this);
+ return true;
+ }
+
+ private:
+ TDeadlineTimer::THandler H_;
+ };
+
+ class TCancelTimerOperation: public TTimerOperation {
+ public:
+ TCancelTimerOperation(TIOService::TImpl::TTimer* t)
+ : TTimerOperation(t, TInstant::Max())
+ {
+ Speculative_ = true;
+ }
+
+ bool Execute(int errorCode) override {
+ Y_UNUSED(errorCode);
+ T_->FailOperations(ECANCELED);
+ return true;
+ }
+ };
+
+ class TUnregisterTimerOperation: public TTimerOperation {
+ public:
+ TUnregisterTimerOperation(TIOService::TImpl::TTimer* t, TInstant deadline = TInstant::Max())
+ : TTimerOperation(t, deadline)
+ {
+ Speculative_ = true;
+ }
+
+ bool Execute(int errorCode) override {
+ Y_UNUSED(errorCode);
+ DBGOUT("TUnregisterTimerOperation::Execute(" << errorCode << ")");
+ T_->GetIOServiceImpl().SyncUnregisterAndDestroyTimer(T_);
+ return true;
+ }
+ };
+
+ class TDeadlineTimer::TImpl: public TIOService::TImpl::TTimer {
+ public:
+ TImpl(TIOService::TImpl& srv)
+ : TIOService::TImpl::TTimer(srv)
+ {
+ }
+
+ void AsyncWaitExpireAt(TDeadline d, TDeadlineTimer::THandler h) {
+ Srv_.ScheduleOp(new TTimerDeadlineOperation(this, h, d));
+ }
+
+ void Cancel() {
+ Srv_.ScheduleOp(new TCancelTimerOperation(this));
+ }
+ };
+}
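Cancellation sketch (illustrative only, not part of this commit): Cancel() schedules a TCancelTimerOperation, so a pending wait handler is invoked with ECANCELED instead of 0. Using the public API from asio.h:

    #include <library/cpp/neh/asio/asio.h>

    #include <util/stream/output.h>

    using namespace NAsio;

    void CancelDemo() {
        TIOService srv;
        TDeadlineTimer timer(srv);

        timer.AsyncWaitExpireAt(TDuration::Hours(1), [](const TErrorCode& err, IHandlingContext&) {
            Cout << "wait finished, error code: " << err.Value() << Endl; // ECANCELED here
        });
        timer.Cancel(); // both operations are processed inside Run()

        srv.Run();
    }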
diff --git a/library/cpp/neh/asio/executor.cpp b/library/cpp/neh/asio/executor.cpp
new file mode 100644
index 0000000000..03b26bf847
--- /dev/null
+++ b/library/cpp/neh/asio/executor.cpp
@@ -0,0 +1 @@
+#include "executor.h"
diff --git a/library/cpp/neh/asio/executor.h b/library/cpp/neh/asio/executor.h
new file mode 100644
index 0000000000..4f6549044d
--- /dev/null
+++ b/library/cpp/neh/asio/executor.h
@@ -0,0 +1,76 @@
+#pragma once
+
+#include "asio.h"
+
+#include <library/cpp/deprecated/atomic/atomic.h>
+
+#include <util/thread/factory.h>
+#include <util/system/thread.h>
+
+namespace NAsio {
+ class TIOServiceExecutor: public IThreadFactory::IThreadAble {
+ public:
+ TIOServiceExecutor()
+ : Work_(new TIOService::TWork(Srv_))
+ {
+ T_ = SystemThreadFactory()->Run(this);
+ }
+
+ ~TIOServiceExecutor() override {
+ SyncShutdown();
+ }
+
+ void DoExecute() override {
+ TThread::SetCurrentThreadName("NehAsioExecutor");
+ Srv_.Run();
+ }
+
+ inline TIOService& GetIOService() noexcept {
+ return Srv_;
+ }
+
+ void SyncShutdown() {
+ if (Work_) {
+ Work_.Destroy();
+                Srv_.Abort(); //cancel all async operations and break Run() execution
+ T_->Join();
+ }
+ }
+
+ private:
+ TIOService Srv_;
+ TAutoPtr<TIOService::TWork> Work_;
+ typedef TAutoPtr<IThreadFactory::IThread> IThreadRef;
+ IThreadRef T_;
+ };
+
+ class TExecutorsPool {
+ public:
+ TExecutorsPool(size_t executors)
+ : C_(0)
+ {
+ for (size_t i = 0; i < executors; ++i) {
+ E_.push_back(new TIOServiceExecutor());
+ }
+ }
+
+ inline size_t Size() const noexcept {
+ return E_.size();
+ }
+
+ inline TIOServiceExecutor& GetExecutor() noexcept {
+ TAtomicBase next = AtomicIncrement(C_);
+ return *E_[next % E_.size()];
+ }
+
+ void SyncShutdown() {
+ for (size_t i = 0; i < E_.size(); ++i) {
+ E_[i]->SyncShutdown();
+ }
+ }
+
+ private:
+ TAtomic C_;
+ TVector<TAutoPtr<TIOServiceExecutor>> E_;
+ };
+}
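Pool sketch (illustrative only, not part of this commit): TExecutorsPool starts one TIOServiceExecutor thread per executor and round-robins GetExecutor() calls across them. A minimal use, assuming the headers above:

    #include <library/cpp/neh/asio/executor.h>

    #include <util/stream/output.h>

    using namespace NAsio;

    void RunOnPool() {
        TExecutorsPool pool(2); // two executor threads, each running its own TIOService
        TAtomic done = 0;

        for (int i = 0; i < 4; ++i) {
            pool.GetExecutor().GetIOService().Post([&done]() {
                AtomicIncrement(done);
            });
        }

        pool.SyncShutdown(); // drops the TWork guards, aborts the services and joins the threads
        Cout << "completed: " << AtomicGet(done) << Endl;
    }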
diff --git a/library/cpp/neh/asio/io_service_impl.cpp b/library/cpp/neh/asio/io_service_impl.cpp
new file mode 100644
index 0000000000..d49b3fb03e
--- /dev/null
+++ b/library/cpp/neh/asio/io_service_impl.cpp
@@ -0,0 +1,161 @@
+#include "io_service_impl.h"
+
+#include <library/cpp/coroutine/engine/poller.h>
+
+using namespace NAsio;
+
+void TFdOperation::AddOp(TIOService::TImpl& srv) {
+ srv.AddOp(this);
+}
+
+void TFdOperation::Finalize() {
+ (*PH_)->DelOp(this);
+}
+
+void TPollFdEventHandler::ExecuteOperations(TFdOperations& oprs, int errorCode) {
+ TFdOperations::iterator it = oprs.begin();
+
+ try {
+ while (it != oprs.end()) {
+ TFdOperation* op = it->Get();
+
+ if (op->Execute(errorCode)) { // throw ?
+ if (op->IsRequiredRepeat()) {
+ Srv_.UpdateOpDeadline(op);
+                    ++it; //operation completed, but wants to be repeated
+ } else {
+ FinishedOperations_.push_back(*it);
+ it = oprs.erase(it);
+ }
+ } else {
+ ++it; //operation not completed
+ }
+ }
+ } catch (...) {
+ if (it != oprs.end()) {
+ FinishedOperations_.push_back(*it);
+ oprs.erase(it);
+ }
+ throw;
+ }
+}
+
+void TPollFdEventHandler::DelOp(TFdOperation* op) {
+ TAutoPtr<TPollFdEventHandler>& evh = *op->PH_;
+
+ if (op->IsPollRead()) {
+ Y_ASSERT(FinishOp(ReadOperations_, op));
+ } else {
+ Y_ASSERT(FinishOp(WriteOperations_, op));
+ }
+    Srv_.FixHandledEvents(evh); //beware: 'this' can be destroyed here!
+}
+
+void TInterrupterHandler::OnFdEvent(int status, ui16 filter) {
+ if (!status && (filter & CONT_POLL_READ)) {
+ PI_.Reset();
+ }
+}
+
+void TIOService::TImpl::Run() {
+ TEvh& iEvh = Evh_.Get(I_.Fd());
+ iEvh.Reset(new TInterrupterHandler(*this, I_));
+
+ TInterrupterKeeper ik(*this, iEvh);
+ Y_UNUSED(ik);
+ IPollerFace::TEvents evs;
+ AtomicSet(NeedCheckOpQueue_, 1);
+ TInstant deadline;
+
+ while (Y_LIKELY(!Aborted_ && (AtomicGet(OutstandingWork_) || FdEventHandlersCnt_ > 1 || TimersOpCnt_ || AtomicGet(NeedCheckOpQueue_)))) {
+        //loop while:
+        //  there is expected work (external flag),
+        //  or there are event handlers (excluding the interrupter),
+        //  or there are uncompleted timer operations,
+        //  or there are operations in the queues
+
+ AtomicIncrement(IsWaiting_);
+ if (!AtomicGet(NeedCheckOpQueue_)) {
+ P_->Wait(evs, deadline);
+ }
+ AtomicDecrement(IsWaiting_);
+
+ if (evs.size()) {
+ for (IPollerFace::TEvents::const_iterator iev = evs.begin(); iev != evs.end() && !Aborted_; ++iev) {
+ const IPollerFace::TEvent& ev = *iev;
+ TEvh& evh = *(TEvh*)ev.Data;
+
+ if (!evh) {
+ continue; //op. cancel (see ProcessOpQueue) can destroy evh
+ }
+
+ int status = ev.Status;
+ if (ev.Status == EIO) {
+ int error = status;
+ if (GetSockOpt(evh->Fd(), SOL_SOCKET, SO_ERROR, error) == 0) {
+ status = error;
+ }
+ }
+
+                OnFdEvent(evh, status, ev.Filter); //handle fd events here
+                //immediately after handling events for one descriptor, check the op. queue:
+                //it often contains another operation for the same fd (e.g. the next async read),
+                //so we can avoid redundant epoll_ctl (or similar) calls
+ ProcessOpQueue();
+ }
+
+ evs.clear();
+ } else {
+ ProcessOpQueue();
+ }
+
+ deadline = DeadlinesQueue_.NextDeadline(); //here handle timeouts/process timers
+ }
+}
+
+void TIOService::TImpl::Abort() {
+ class TAbortOperation: public TNoneOperation {
+ public:
+ TAbortOperation(TIOService::TImpl& srv)
+ : TNoneOperation()
+ , Srv_(srv)
+ {
+ Speculative_ = true;
+ }
+
+ private:
+ bool Execute(int errorCode) override {
+ Y_UNUSED(errorCode);
+ Srv_.ProcessAbort();
+ return true;
+ }
+
+ TIOService::TImpl& Srv_;
+ };
+ AtomicSet(HasAbort_, 1);
+ ScheduleOp(new TAbortOperation(*this));
+}
+
+void TIOService::TImpl::ProcessAbort() {
+ Aborted_ = true;
+
+ for (int fd = 0; fd <= MaxFd_; ++fd) {
+ TEvh& evh = Evh_.Get(fd);
+ if (!!evh && evh->Fd() != I_.Fd()) {
+ OnFdEvent(evh, ECANCELED, CONT_POLL_READ | CONT_POLL_WRITE);
+ }
+ }
+
+ for (auto t : Timers_) {
+ t->FailOperations(ECANCELED);
+ }
+
+ TOperationPtr op;
+ while (OpQueue_.Dequeue(&op)) { //cancel all enqueued operations
+ try {
+ op->Execute(ECANCELED);
+ } catch (...) {
+ }
+ op.Destroy();
+ }
+}
diff --git a/library/cpp/neh/asio/io_service_impl.h b/library/cpp/neh/asio/io_service_impl.h
new file mode 100644
index 0000000000..46fa9f9ee1
--- /dev/null
+++ b/library/cpp/neh/asio/io_service_impl.h
@@ -0,0 +1,744 @@
+#pragma once
+
+#include "asio.h"
+#include "poll_interrupter.h"
+
+#include <library/cpp/neh/lfqueue.h>
+#include <library/cpp/neh/pipequeue.h>
+
+#include <library/cpp/dns/cache.h>
+
+#include <util/generic/hash_set.h>
+#include <util/network/iovec.h>
+#include <util/network/pollerimpl.h>
+#include <util/thread/lfqueue.h>
+#include <util/thread/factory.h>
+
+#ifdef DEBUG_ASIO
+#define DBGOUT(args) Cout << args << Endl;
+#else
+#define DBGOUT(args)
+#endif
+
+namespace NAsio {
+    //TODO: copy-pasted from neh, needs fixing
+ template <class T>
+ class TLockFreeSequence {
+ public:
+ inline TLockFreeSequence() {
+ memset((void*)T_, 0, sizeof(T_));
+ }
+
+ inline ~TLockFreeSequence() {
+ for (size_t i = 0; i < Y_ARRAY_SIZE(T_); ++i) {
+ delete[] T_[i];
+ }
+ }
+
+ inline T& Get(size_t n) {
+ const size_t i = GetValueBitCount(n + 1) - 1;
+
+ return GetList(i)[n + 1 - (((size_t)1) << i)];
+ }
+
+ private:
+ inline T* GetList(size_t n) {
+ T* volatile* t = T_ + n;
+
+ while (!*t) {
+ TArrayHolder<T> nt(new T[((size_t)1) << n]);
+
+ if (AtomicCas(t, nt.Get(), nullptr)) {
+ return nt.Release();
+ }
+ }
+
+ return *t;
+ }
+
+ private:
+ T* volatile T_[sizeof(size_t) * 8];
+ };
+
+ struct TOperationCompare {
+ template <class T>
+ static inline bool Compare(const T& l, const T& r) noexcept {
+ return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r);
+ }
+ };
+
+    //async operation, executed in the context of the TIOService::Run() thread-executor
+    //usually used for calling functors/callbacks
+ class TOperation: public TRbTreeItem<TOperation, TOperationCompare>, public IHandlingContext {
+ public:
+ TOperation(TInstant deadline = TInstant::Max())
+ : D_(deadline)
+ , Speculative_(false)
+ , RequiredRepeatExecution_(false)
+ , ND_(deadline)
+ {
+ }
+
+ //register this operation in svc.impl.
+ virtual void AddOp(TIOService::TImpl&) = 0;
+
+ //return false, if operation not completed
+ virtual bool Execute(int errorCode = 0) = 0;
+
+ void ContinueUseHandler(TDeadline deadline) override {
+ RequiredRepeatExecution_ = true;
+ ND_ = deadline;
+ }
+
+ virtual void Finalize() = 0;
+
+ inline TInstant Deadline() const noexcept {
+ return D_;
+ }
+
+ inline TInstant DeadLine() const noexcept {
+ return D_;
+ }
+
+ inline bool Speculative() const noexcept {
+ return Speculative_;
+ }
+
+ inline bool IsRequiredRepeat() const noexcept {
+ return RequiredRepeatExecution_;
+ }
+
+ inline void PrepareReExecution() noexcept {
+ RequiredRepeatExecution_ = false;
+ D_ = ND_;
+ }
+
+ protected:
+ TInstant D_;
+        bool Speculative_; //if true, the operation is run immediately after dequeue (without waiting for any event);
+                           //used e.g. to optimize writes when the buffers obviously have free space
+        bool RequiredRepeatExecution_; //set to true if the operation must be re-executed
+        TInstant ND_;                  //new deadline (for re-executed operations)
+ };
+
+ typedef TAutoPtr<TOperation> TOperationPtr;
+
+ class TNoneOperation: public TOperation {
+ public:
+ TNoneOperation(TInstant deadline = TInstant::Max())
+ : TOperation(deadline)
+ {
+ }
+
+ void AddOp(TIOService::TImpl&) override {
+ Y_ASSERT(0);
+ }
+
+ void Finalize() override {
+ }
+ };
+
+ class TPollFdEventHandler;
+
+    //operation on a file descriptor
+ class TFdOperation: public TOperation {
+ public:
+ enum TPollType {
+ PollRead,
+ PollWrite
+ };
+
+ TFdOperation(SOCKET fd, TPollType pt, TInstant deadline = TInstant::Max())
+ : TOperation(deadline)
+ , Fd_(fd)
+ , PT_(pt)
+ , PH_(nullptr)
+ {
+ Y_ASSERT(Fd() != INVALID_SOCKET);
+ }
+
+ inline SOCKET Fd() const noexcept {
+ return Fd_;
+ }
+
+ inline bool IsPollRead() const noexcept {
+ return PT_ == PollRead;
+ }
+
+ void AddOp(TIOService::TImpl& srv) override;
+
+ void Finalize() override;
+
+ protected:
+ SOCKET Fd_;
+ TPollType PT_;
+
+ public:
+ TAutoPtr<TPollFdEventHandler>* PH_;
+ };
+
+ typedef TAutoPtr<TFdOperation> TFdOperationPtr;
+
+ class TPollFdEventHandler {
+ public:
+ TPollFdEventHandler(SOCKET fd, TIOService::TImpl& srv)
+ : Fd_(fd)
+ , HandledEvents_(0)
+ , Srv_(srv)
+ {
+ }
+
+ virtual ~TPollFdEventHandler() {
+ Y_ASSERT(ReadOperations_.size() == 0);
+ Y_ASSERT(WriteOperations_.size() == 0);
+ }
+
+ inline void AddReadOp(TFdOperationPtr op) {
+ ReadOperations_.push_back(op);
+ }
+
+ inline void AddWriteOp(TFdOperationPtr op) {
+ WriteOperations_.push_back(op);
+ }
+
+ virtual void OnFdEvent(int status, ui16 filter) {
+ DBGOUT("PollEvent(fd=" << Fd_ << ", " << status << ", " << filter << ")");
+ if (status) {
+ ExecuteOperations(ReadOperations_, status);
+ ExecuteOperations(WriteOperations_, status);
+ } else {
+ if (filter & CONT_POLL_READ) {
+ ExecuteOperations(ReadOperations_, status);
+ }
+ if (filter & CONT_POLL_WRITE) {
+ ExecuteOperations(WriteOperations_, status);
+ }
+ }
+ }
+
+ typedef TVector<TFdOperationPtr> TFdOperations;
+
+ void ExecuteOperations(TFdOperations& oprs, int errorCode);
+
+        //return true if the set of handled events changed and the event poller needs to be reconfigured
+ virtual bool FixHandledEvents() noexcept {
+ DBGOUT("TPollFdEventHandler::FixHandledEvents()");
+ ui16 filter = 0;
+
+ if (WriteOperations_.size()) {
+ filter |= CONT_POLL_WRITE;
+ }
+ if (ReadOperations_.size()) {
+ filter |= CONT_POLL_READ;
+ }
+
+ if (Y_LIKELY(HandledEvents_ == filter)) {
+ return false;
+ }
+
+ HandledEvents_ = filter;
+ return true;
+ }
+
+ inline bool FinishOp(TFdOperations& oprs, TFdOperation* op) noexcept {
+ for (TFdOperations::iterator it = oprs.begin(); it != oprs.end(); ++it) {
+ if (it->Get() == op) {
+ FinishedOperations_.push_back(*it);
+ oprs.erase(it);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void DelOp(TFdOperation* op);
+
+ inline SOCKET Fd() const noexcept {
+ return Fd_;
+ }
+
+ inline ui16 HandledEvents() const noexcept {
+ return HandledEvents_;
+ }
+
+ inline void AddHandlingEvent(ui16 ev) noexcept {
+ HandledEvents_ |= ev;
+ }
+
+ inline void DestroyFinishedOperations() {
+ FinishedOperations_.clear();
+ }
+
+ TIOService::TImpl& GetServiceImpl() const noexcept {
+ return Srv_;
+ }
+
+ protected:
+ SOCKET Fd_;
+ ui16 HandledEvents_;
+ TIOService::TImpl& Srv_;
+
+ private:
+ TVector<TFdOperationPtr> ReadOperations_;
+ TVector<TFdOperationPtr> WriteOperations_;
+        // we can't destroy finished operations immediately: that could close the still-used socket descriptor Fd_
+        // (via cascading deletion of the operation's handler object), but we later use Fd_ to modify the handled
+        // events in the poller, so we collect finished operations here and destroy them only after the poller is
+        // updated - see FixHandledEvents(TPollFdEventHandlerPtr&)
+ TVector<TFdOperationPtr> FinishedOperations_;
+ };
+
+    //additional descriptor for the poller, used to interrupt the current poll wait
+ class TInterrupterHandler: public TPollFdEventHandler {
+ public:
+ TInterrupterHandler(TIOService::TImpl& srv, TPollInterrupter& pi)
+ : TPollFdEventHandler(pi.Fd(), srv)
+ , PI_(pi)
+ {
+ HandledEvents_ = CONT_POLL_READ;
+ }
+
+ ~TInterrupterHandler() override {
+ DBGOUT("~TInterrupterHandler");
+ }
+
+ void OnFdEvent(int status, ui16 filter) override;
+
+ bool FixHandledEvents() noexcept override {
+ DBGOUT("TInterrupterHandler::FixHandledEvents()");
+ return false;
+ }
+
+ private:
+ TPollInterrupter& PI_;
+ };
+
+ namespace {
+ inline TAutoPtr<IPollerFace> CreatePoller() {
+ try {
+#if defined(_linux_)
+ return IPollerFace::Construct(TStringBuf("epoll"));
+#endif
+#if defined(_freebsd_) || defined(_darwin_)
+ return IPollerFace::Construct(TStringBuf("kqueue"));
+#endif
+ } catch (...) {
+ Cdbg << CurrentExceptionMessage() << Endl;
+ }
+ return IPollerFace::Default();
+ }
+ }
+
+    //rough equivalent of TContExecutor
+ class TIOService::TImpl: public TNonCopyable {
+ public:
+ typedef TAutoPtr<TPollFdEventHandler> TEvh;
+ typedef TLockFreeSequence<TEvh> TEventHandlers;
+
+ class TTimer {
+ public:
+ typedef THashSet<TOperation*> TOperations;
+
+ TTimer(TIOService::TImpl& srv)
+ : Srv_(srv)
+ {
+ }
+
+ virtual ~TTimer() {
+ FailOperations(ECANCELED);
+ }
+
+ void AddOp(TOperation* op) {
+ THolder<TOperation> tmp(op);
+ Operations_.insert(op);
+ Y_UNUSED(tmp.Release());
+ Srv_.RegisterOpDeadline(op);
+ Srv_.IncTimersOp();
+ }
+
+ void DelOp(TOperation* op) {
+ TOperations::iterator it = Operations_.find(op);
+ if (it != Operations_.end()) {
+ Srv_.DecTimersOp();
+ delete op;
+ Operations_.erase(it);
+ }
+ }
+
+ inline void FailOperations(int ec) {
+ for (auto operation : Operations_) {
+ try {
+ operation->Execute(ec); //throw ?
+ } catch (...) {
+ }
+ Srv_.DecTimersOp();
+ delete operation;
+ }
+ Operations_.clear();
+ }
+
+ TIOService::TImpl& GetIOServiceImpl() const noexcept {
+ return Srv_;
+ }
+
+ protected:
+ TIOService::TImpl& Srv_;
+ THashSet<TOperation*> Operations_;
+ };
+
+ class TTimers: public THashSet<TTimer*> {
+ public:
+ ~TTimers() {
+ for (auto it : *this) {
+ delete it;
+ }
+ }
+ };
+
+ TImpl()
+ : P_(CreatePoller())
+ , DeadlinesQueue_(*this)
+ {
+ }
+
+ ~TImpl() {
+ TOperationPtr op;
+
+ while (OpQueue_.Dequeue(&op)) { //cancel all enqueued operations
+ try {
+ op->Execute(ECANCELED);
+ } catch (...) {
+ }
+ op.Destroy();
+ }
+ }
+
+        //similar to TContExecutor::Execute() or io_service::run()
+        //processes the event loop (exits when there is nothing left to do: no timers or event handlers)
+ void Run();
+
+        //enqueue a functor for execution in the Run() event loop (thread-safe)
+ inline void Post(TCompletionHandler h) {
+ class TFuncOperation: public TNoneOperation {
+ public:
+ TFuncOperation(TCompletionHandler completionHandler)
+ : TNoneOperation()
+ , H_(std::move(completionHandler))
+ {
+ Speculative_ = true;
+ }
+
+ private:
+ //return false, if operation not completed
+ bool Execute(int errorCode) override {
+ Y_UNUSED(errorCode);
+ H_();
+ return true;
+ }
+
+ TCompletionHandler H_;
+ };
+
+ ScheduleOp(new TFuncOperation(std::move(h)));
+ }
+
+        //cancel all current operations (handlers are called with errorCode == ECANCELED)
+ void Abort();
+ bool HasAbort() {
+ return AtomicGet(HasAbort_);
+ }
+
+ inline void ScheduleOp(TOperationPtr op) { //throw std::bad_alloc
+ Y_ASSERT(!Aborted_);
+ Y_ASSERT(!!op);
+ OpQueue_.Enqueue(op);
+ Interrupt();
+ }
+
+ inline void Interrupt() noexcept {
+ AtomicSet(NeedCheckOpQueue_, 1);
+ if (AtomicAdd(IsWaiting_, 0) == 1) {
+ I_.Interrupt();
+ }
+ }
+
+ inline void UpdateOpDeadline(TOperation* op) {
+ TInstant oldDeadline = op->Deadline();
+ op->PrepareReExecution();
+
+ if (oldDeadline == op->Deadline()) {
+ return;
+ }
+
+ if (oldDeadline != TInstant::Max()) {
+ op->UnLink();
+ }
+ if (op->Deadline() != TInstant::Max()) {
+ DeadlinesQueue_.Register(op);
+ }
+ }
+
+ void SyncRegisterTimer(TTimer* t) {
+ Timers_.insert(t);
+ }
+
+ inline void SyncUnregisterAndDestroyTimer(TTimer* t) {
+ Timers_.erase(t);
+ delete t;
+ }
+
+ inline void IncTimersOp() noexcept {
+ ++TimersOpCnt_;
+ }
+
+ inline void DecTimersOp() noexcept {
+ --TimersOpCnt_;
+ }
+
+ inline void WorkStarted() {
+ AtomicIncrement(OutstandingWork_);
+ }
+
+ inline void WorkFinished() {
+ if (AtomicDecrement(OutstandingWork_) == 0) {
+ Interrupt();
+ }
+ }
+
+ private:
+ void ProcessAbort();
+
+ inline TEvh& EnsureGetEvh(SOCKET fd) {
+ TEvh& evh = Evh_.Get(fd);
+ if (!evh) {
+ evh.Reset(new TPollFdEventHandler(fd, *this));
+ }
+ return evh;
+ }
+
+ inline void OnTimeoutOp(TOperation* op) {
+ DBGOUT("OnTimeoutOp");
+ try {
+ op->Execute(ETIMEDOUT); //throw ?
+ } catch (...) {
+ op->Finalize();
+ throw;
+ }
+
+ if (op->IsRequiredRepeat()) {
+ //operation not completed
+ UpdateOpDeadline(op);
+ } else {
+ //destroy operation structure
+ op->Finalize();
+ }
+ }
+
+ public:
+ inline void FixHandledEvents(TEvh& evh) {
+ if (!!evh) {
+ if (evh->FixHandledEvents()) {
+ if (!evh->HandledEvents()) {
+ DelEventHandler(evh);
+ evh.Destroy();
+ } else {
+ ModEventHandler(evh);
+ evh->DestroyFinishedOperations();
+ }
+ } else {
+ evh->DestroyFinishedOperations();
+ }
+ }
+ }
+
+ private:
+ inline TEvh& GetHandlerForOp(TFdOperation* op) {
+ TEvh& evh = EnsureGetEvh(op->Fd());
+ op->PH_ = &evh;
+ return evh;
+ }
+
+ void ProcessOpQueue() {
+ if (!AtomicGet(NeedCheckOpQueue_)) {
+ return;
+ }
+ AtomicSet(NeedCheckOpQueue_, 0);
+
+ TOperationPtr op;
+
+ while (OpQueue_.Dequeue(&op)) {
+ if (op->Speculative()) {
+ if (op->Execute(Y_UNLIKELY(Aborted_) ? ECANCELED : 0)) {
+ op.Destroy();
+ continue; //operation completed
+ }
+
+ if (!op->IsRequiredRepeat()) {
+ op->PrepareReExecution();
+ }
+ }
+ RegisterOpDeadline(op.Get());
+ op.Get()->AddOp(*this); // ... -> AddOp()
+ Y_UNUSED(op.Release());
+ }
+ }
+
+ inline void RegisterOpDeadline(TOperation* op) {
+ if (op->DeadLine() != TInstant::Max()) {
+ DeadlinesQueue_.Register(op);
+ }
+ }
+
+ public:
+ inline void AddOp(TFdOperation* op) {
+ DBGOUT("AddOp<Fd>(" << op->Fd() << ")");
+ TEvh& evh = GetHandlerForOp(op);
+ if (op->IsPollRead()) {
+ evh->AddReadOp(op);
+ EnsureEventHandled(evh, CONT_POLL_READ);
+ } else {
+ evh->AddWriteOp(op);
+ EnsureEventHandled(evh, CONT_POLL_WRITE);
+ }
+ }
+
+ private:
+ inline void EnsureEventHandled(TEvh& evh, ui16 ev) {
+ if (!evh->HandledEvents()) {
+ evh->AddHandlingEvent(ev);
+ AddEventHandler(evh);
+ } else {
+ if ((evh->HandledEvents() & ev) == 0) {
+ evh->AddHandlingEvent(ev);
+ ModEventHandler(evh);
+ }
+ }
+ }
+
+ public:
+        //cancel all current operations for the socket
+        //this method MUST be called from the Run() thread-executor
+ void CancelFdOp(SOCKET fd) {
+ TEvh& evh = Evh_.Get(fd);
+ if (!evh) {
+ return;
+ }
+
+ OnFdEvent(evh, ECANCELED, CONT_POLL_READ | CONT_POLL_WRITE);
+ }
+
+ private:
+        //helper for fixing handled events even in case of an exception
+ struct TExceptionProofFixerHandledEvents {
+ TExceptionProofFixerHandledEvents(TIOService::TImpl& srv, TEvh& iEvh)
+ : Srv_(srv)
+ , Evh_(iEvh)
+ {
+ }
+
+ ~TExceptionProofFixerHandledEvents() {
+ Srv_.FixHandledEvents(Evh_);
+ }
+
+ TIOService::TImpl& Srv_;
+ TEvh& Evh_;
+ };
+
+ inline void OnFdEvent(TEvh& evh, int status, ui16 filter) {
+ TExceptionProofFixerHandledEvents fixer(*this, evh);
+ Y_UNUSED(fixer);
+ evh->OnFdEvent(status, filter);
+ }
+
+ inline void AddEventHandler(TEvh& evh) {
+ if (evh->Fd() > MaxFd_) {
+ MaxFd_ = evh->Fd();
+ }
+ SetEventHandler(&evh, evh->Fd(), evh->HandledEvents());
+ ++FdEventHandlersCnt_;
+ }
+
+ inline void ModEventHandler(TEvh& evh) {
+ SetEventHandler(&evh, evh->Fd(), evh->HandledEvents());
+ }
+
+ inline void DelEventHandler(TEvh& evh) {
+ SetEventHandler(&evh, evh->Fd(), 0);
+ --FdEventHandlersCnt_;
+ }
+
+ inline void SetEventHandler(void* h, int fd, ui16 flags) {
+ DBGOUT("SetEventHandler(" << fd << ", " << flags << ")");
+ P_->Set(h, fd, flags);
+ }
+
+        //exception-safe call of DelEventHandler
+ struct TInterrupterKeeper {
+ TInterrupterKeeper(TImpl& srv, TEvh& iEvh)
+ : Srv_(srv)
+ , Evh_(iEvh)
+ {
+ Srv_.AddEventHandler(Evh_);
+ }
+
+ ~TInterrupterKeeper() {
+ Srv_.DelEventHandler(Evh_);
+ }
+
+ TImpl& Srv_;
+ TEvh& Evh_;
+ };
+
+ TAutoPtr<IPollerFace> P_;
+ TPollInterrupter I_;
+ TAtomic IsWaiting_ = 0;
+ TAtomic NeedCheckOpQueue_ = 0;
+ TAtomic OutstandingWork_ = 0;
+
+ NNeh::TAutoLockFreeQueue<TOperation> OpQueue_;
+
+ TEventHandlers Evh_; //i/o event handlers
+ TTimers Timers_; //timeout event handlers
+
+ size_t FdEventHandlersCnt_ = 0; //i/o event handlers counter
+ size_t TimersOpCnt_ = 0; //timers op counter
+ SOCKET MaxFd_ = 0; //max used descriptor num
+ TAtomic HasAbort_ = 0;
+ bool Aborted_ = false;
+
+ class TDeadlinesQueue {
+ public:
+ TDeadlinesQueue(TIOService::TImpl& srv)
+ : Srv_(srv)
+ {
+ }
+
+ inline void Register(TOperation* op) {
+ Deadlines_.Insert(op);
+ }
+
+ TInstant NextDeadline() {
+ TDeadlines::TIterator it = Deadlines_.Begin();
+
+ while (it != Deadlines_.End()) {
+ if (it->DeadLine() > TInstant::Now()) {
+ DBGOUT("TDeadlinesQueue::NewDeadline:" << (it->DeadLine().GetValue() - TInstant::Now().GetValue()));
+ return it->DeadLine();
+ }
+
+ TOperation* op = &*(it++);
+ Srv_.OnTimeoutOp(op);
+ }
+
+ return Deadlines_.Empty() ? TInstant::Max() : Deadlines_.Begin()->DeadLine();
+ }
+
+ private:
+ typedef TRbTree<TOperation, TOperationCompare> TDeadlines;
+ TDeadlines Deadlines_;
+ TIOService::TImpl& Srv_;
+ };
+
+ TDeadlinesQueue DeadlinesQueue_;
+ };
+}
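Re-arming sketch (illustrative only, not part of this commit): ContinueUseHandler() sets RequiredRepeatExecution_ on the current TOperation, so instead of being finalized it is kept and re-registered with a refreshed deadline. From the user side this looks like a handler re-arming itself; PollForever below is a hypothetical helper on top of the public asio.h API:

    #include <library/cpp/neh/asio/asio.h>

    using namespace NAsio;

    // Keep polling a socket for readability until an error (e.g. ECANCELED) arrives.
    void PollForever(TTcpSocket& sock) {
        sock.AsyncPollRead([](const TErrorCode& err, IHandlingContext& ctx) {
            if (!err) {
                // ... consume readable data via ReadSome() here ...
                ctx.ContinueUseHandler(TDuration::Seconds(5)); // keep the same operation, new 5s deadline
            }
            // without ContinueUseHandler() the operation is finalized after this call returns
        });
    }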
diff --git a/library/cpp/neh/asio/poll_interrupter.cpp b/library/cpp/neh/asio/poll_interrupter.cpp
new file mode 100644
index 0000000000..c96d40c4f3
--- /dev/null
+++ b/library/cpp/neh/asio/poll_interrupter.cpp
@@ -0,0 +1 @@
+#include "poll_interrupter.h"
diff --git a/library/cpp/neh/asio/poll_interrupter.h b/library/cpp/neh/asio/poll_interrupter.h
new file mode 100644
index 0000000000..faf815c512
--- /dev/null
+++ b/library/cpp/neh/asio/poll_interrupter.h
@@ -0,0 +1,107 @@
+#pragma once
+
+#include <util/system/defaults.h>
+#include <util/generic/yexception.h>
+#include <util/network/socket.h>
+#include <util/system/pipe.h>
+
+#ifdef _linux_
+#include <sys/eventfd.h>
+#endif
+
+#if defined(_bionic_) && !defined(EFD_SEMAPHORE)
+#define EFD_SEMAPHORE 1
+#endif
+
+namespace NAsio {
+#ifdef _linux_
+ class TEventFdPollInterrupter {
+ public:
+ inline TEventFdPollInterrupter() {
+ F_ = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);
+ if (F_ < 0) {
+                ythrow TFileError() << "failed to create an eventfd";
+ }
+ }
+
+ inline ~TEventFdPollInterrupter() {
+ close(F_);
+ }
+
+ inline void Interrupt() const noexcept {
+ const static eventfd_t ev(1);
+ ssize_t res = ::write(F_, &ev, sizeof ev);
+ Y_UNUSED(res);
+ }
+
+ inline bool Reset() const noexcept {
+ eventfd_t ev(0);
+
+ for (;;) {
+ ssize_t res = ::read(F_, &ev, sizeof ev);
+                if (res < 0 && LastSystemError() == EINTR) {
+ continue;
+ }
+
+ return res > 0;
+ }
+ }
+
+ int Fd() {
+ return F_;
+ }
+
+ private:
+ int F_;
+ };
+#endif
+
+ class TPipePollInterrupter {
+ public:
+ TPipePollInterrupter() {
+ TPipeHandle::Pipe(S_[0], S_[1]);
+
+ SetNonBlock(S_[0]);
+ SetNonBlock(S_[1]);
+ }
+
+ inline void Interrupt() const noexcept {
+ char byte = 0;
+ ssize_t res = S_[1].Write(&byte, 1);
+ Y_UNUSED(res);
+ }
+
+ inline bool Reset() const noexcept {
+ char buff[256];
+
+ for (;;) {
+ ssize_t r = S_[0].Read(buff, sizeof buff);
+
+                if (r < 0 && LastSystemError() == EINTR) {
+ continue;
+ }
+
+ bool wasInterrupted = r > 0;
+
+ while (r == sizeof buff) {
+ r = S_[0].Read(buff, sizeof buff);
+ }
+
+ return wasInterrupted;
+ }
+ }
+
+ PIPEHANDLE Fd() const noexcept {
+ return S_[0];
+ }
+
+ private:
+ TPipeHandle S_[2];
+ };
+
+#ifdef _linux_
+    typedef TEventFdPollInterrupter TPollInterrupter; //more efficient than a pipe, but Linux-only
+#else
+ typedef TPipePollInterrupter TPollInterrupter;
+#endif
+}
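Interrupter sketch (illustrative only, not part of this commit): Interrupt() makes Fd() readable so a blocked poller wakes up, and Reset() drains the descriptor, reporting whether an interrupt was actually pending:

    #include <library/cpp/neh/asio/poll_interrupter.h>

    #include <util/stream/output.h>

    void InterrupterDemo() {
        NAsio::TPollInterrupter pi;

        pi.Interrupt(); // one wakeup event is now pending on pi.Fd()
        Cout << (pi.Reset() ? "consumed" : "empty") << Endl; // consumed
        Cout << (pi.Reset() ? "consumed" : "empty") << Endl; // empty (descriptor is non-blocking)
    }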
diff --git a/library/cpp/neh/asio/tcp_acceptor_impl.cpp b/library/cpp/neh/asio/tcp_acceptor_impl.cpp
new file mode 100644
index 0000000000..7e1d75fcf5
--- /dev/null
+++ b/library/cpp/neh/asio/tcp_acceptor_impl.cpp
@@ -0,0 +1,25 @@
+#include "tcp_acceptor_impl.h"
+
+using namespace NAsio;
+
+bool TOperationAccept::Execute(int errorCode) {
+ if (errorCode) {
+ H_(errorCode, *this);
+
+ return true;
+ }
+
+ struct sockaddr_storage addr;
+ socklen_t sz = sizeof(addr);
+
+ SOCKET res = ::accept(Fd(), (sockaddr*)&addr, &sz);
+
+ if (res == INVALID_SOCKET) {
+ H_(LastSystemError(), *this);
+ } else {
+ NS_.Assign(res, TEndpoint(new NAddr::TOpaqueAddr((sockaddr*)&addr)));
+ H_(0, *this);
+ }
+
+ return true;
+}
diff --git a/library/cpp/neh/asio/tcp_acceptor_impl.h b/library/cpp/neh/asio/tcp_acceptor_impl.h
new file mode 100644
index 0000000000..c990236efc
--- /dev/null
+++ b/library/cpp/neh/asio/tcp_acceptor_impl.h
@@ -0,0 +1,76 @@
+#pragma once
+
+#include "asio.h"
+
+#include "tcp_socket_impl.h"
+
+namespace NAsio {
+ class TOperationAccept: public TFdOperation {
+ public:
+ TOperationAccept(SOCKET fd, TTcpSocket::TImpl& newSocket, TTcpAcceptor::TAcceptHandler h, TInstant deadline)
+ : TFdOperation(fd, PollRead, deadline)
+ , H_(h)
+ , NS_(newSocket)
+ {
+ }
+
+ bool Execute(int errorCode) override;
+
+ TTcpAcceptor::TAcceptHandler H_;
+ TTcpSocket::TImpl& NS_;
+ };
+
+ class TTcpAcceptor::TImpl: public TThrRefBase {
+ public:
+ TImpl(TIOService::TImpl& srv) noexcept
+ : Srv_(srv)
+ {
+ }
+
+ inline void Bind(TEndpoint& ep, TErrorCode& ec) noexcept {
+ TSocketHolder s(socket(ep.SockAddr()->sa_family, SOCK_STREAM, 0));
+
+            if (s == INVALID_SOCKET) {
+                ec.Assign(LastSystemError());
+                return;
+            }
+
+ FixIPv6ListenSocket(s);
+ CheckedSetSockOpt(s, SOL_SOCKET, SO_REUSEADDR, 1, "reuse addr");
+ SetNonBlock(s);
+
+ if (::bind(s, ep.SockAddr(), ep.SockAddrLen())) {
+ ec.Assign(LastSystemError());
+ return;
+ }
+
+ S_.Swap(s);
+ }
+
+ inline void Listen(int backlog, TErrorCode& ec) noexcept {
+ if (::listen(S_, backlog)) {
+ ec.Assign(LastSystemError());
+ return;
+ }
+ }
+
+ inline void AsyncAccept(TTcpSocket& s, TTcpAcceptor::TAcceptHandler h, TInstant deadline) {
+ Srv_.ScheduleOp(new TOperationAccept((SOCKET)S_, s.GetImpl(), h, deadline)); //set callback
+ }
+
+ inline void AsyncCancel() {
+ Srv_.ScheduleOp(new TOperationCancel<TTcpAcceptor::TImpl>(this));
+ }
+
+ inline TIOService::TImpl& GetIOServiceImpl() const noexcept {
+ return Srv_;
+ }
+
+ inline SOCKET Fd() const noexcept {
+ return S_;
+ }
+
+ private:
+ TIOService::TImpl& Srv_;
+ TSocketHolder S_;
+ };
+}
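Accept sketch (illustrative only, not part of this commit): a single accept through the public TTcpAcceptor/TTcpSocket API, assuming port 8080 is free and using the usual util helpers to build a listening TEndpoint:

    #include <library/cpp/neh/asio/asio.h>

    #include <util/network/address.h>

    using namespace NAsio;

    void AcceptOnce() {
        TIOService srv;
        TTcpAcceptor acceptor(srv);
        TTcpSocket client(srv);

        TNetworkAddress addr(8080);
        TEndpoint ep(new NAddr::TAddrInfo(&*addr.Begin()));

        acceptor.Bind(ep);   // throwing overload: TErrorCode::Check() on failure
        acceptor.Listen(10);

        acceptor.AsyncAccept(client, [](const TErrorCode& err, IHandlingContext&) {
            // on success the accepted connection has been assigned to 'client';
            // continue with client.AsyncRead()/AsyncWrite() from here
            Y_UNUSED(err);
        });

        srv.Run(); // exits after the single accept completes
    }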
diff --git a/library/cpp/neh/asio/tcp_socket_impl.cpp b/library/cpp/neh/asio/tcp_socket_impl.cpp
new file mode 100644
index 0000000000..98cef97561
--- /dev/null
+++ b/library/cpp/neh/asio/tcp_socket_impl.cpp
@@ -0,0 +1,117 @@
+#include "tcp_socket_impl.h"
+
+using namespace NAsio;
+
+TSocketOperation::TSocketOperation(TTcpSocket::TImpl& s, TPollType pt, TInstant deadline)
+ : TFdOperation(s.Fd(), pt, deadline)
+ , S_(s)
+{
+}
+
+bool TOperationWrite::Execute(int errorCode) {
+ if (errorCode) {
+ H_(errorCode, Written_, *this);
+
+ return true; //op. completed
+ }
+
+ TErrorCode ec;
+ TContIOVector& iov = *Buffs_->GetIOvec();
+
+ size_t n = S_.WriteSome(iov, ec);
+
+ if (ec && ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK) {
+ H_(ec, Written_ + n, *this);
+
+ return true;
+ }
+
+ if (n) {
+ Written_ += n;
+ iov.Proceed(n);
+ if (!iov.Bytes()) {
+ H_(ec, Written_, *this);
+
+ return true; //op. completed
+ }
+ }
+
+    return false; //operation not completed
+}
+
+bool TOperationWriteVector::Execute(int errorCode) {
+ if (errorCode) {
+ H_(errorCode, Written_, *this);
+
+ return true; //op. completed
+ }
+
+ TErrorCode ec;
+
+ size_t n = S_.WriteSome(V_, ec);
+
+ if (ec && ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK) {
+ H_(ec, Written_ + n, *this);
+
+ return true;
+ }
+
+ if (n) {
+ Written_ += n;
+ V_.Proceed(n);
+ if (!V_.Bytes()) {
+ H_(ec, Written_, *this);
+
+ return true; //op. completed
+ }
+ }
+
+    return false; //operation not completed
+}
+
+bool TOperationReadSome::Execute(int errorCode) {
+ if (errorCode) {
+ H_(errorCode, 0, *this);
+
+ return true; //op. completed
+ }
+
+ TErrorCode ec;
+
+ H_(ec, S_.ReadSome(Buff_, Size_, ec), *this);
+
+ return true;
+}
+
+bool TOperationRead::Execute(int errorCode) {
+ if (errorCode) {
+ H_(errorCode, Read_, *this);
+
+ return true; //op. completed
+ }
+
+ TErrorCode ec;
+ size_t n = S_.ReadSome(Buff_, Size_, ec);
+ Read_ += n;
+
+ if (ec && ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK) {
+ H_(ec, Read_, *this);
+
+ return true; //op. completed
+ }
+
+ if (n) {
+ Size_ -= n;
+ if (!Size_) {
+ H_(ec, Read_, *this);
+
+ return true;
+ }
+ Buff_ += n;
+    } else if (!ec) { // EOF before all requested data was read
+ H_(ec, Read_, *this);
+ return true;
+ }
+
+ return false;
+}
diff --git a/library/cpp/neh/asio/tcp_socket_impl.h b/library/cpp/neh/asio/tcp_socket_impl.h
new file mode 100644
index 0000000000..44f8f42d87
--- /dev/null
+++ b/library/cpp/neh/asio/tcp_socket_impl.h
@@ -0,0 +1,332 @@
+#pragma once
+
+#include "asio.h"
+#include "io_service_impl.h"
+
+#include <sys/uio.h>
+
+#if defined(_bionic_)
+# define IOV_MAX 1024
+#endif
+
+namespace NAsio {
+ // ownership/keep-alive references:
+ // Handlers <- TOperation...(TFdOperation) <- TPollFdEventHandler <- TIOService
+
+ class TSocketOperation: public TFdOperation {
+ public:
+ TSocketOperation(TTcpSocket::TImpl& s, TPollType pt, TInstant deadline);
+
+ protected:
+ TTcpSocket::TImpl& S_;
+ };
+
+ class TOperationConnect: public TSocketOperation {
+ public:
+ TOperationConnect(TTcpSocket::TImpl& s, TTcpSocket::TConnectHandler h, TInstant deadline)
+ : TSocketOperation(s, PollWrite, deadline)
+ , H_(h)
+ {
+ }
+
+ bool Execute(int errorCode) override {
+ H_(errorCode, *this);
+
+ return true;
+ }
+
+ TTcpSocket::TConnectHandler H_;
+ };
+
+ class TOperationConnectFailed: public TSocketOperation {
+ public:
+ TOperationConnectFailed(TTcpSocket::TImpl& s, TTcpSocket::TConnectHandler h, int errorCode, TInstant deadline)
+ : TSocketOperation(s, PollWrite, deadline)
+ , H_(h)
+ , ErrorCode_(errorCode)
+ {
+ Speculative_ = true;
+ }
+
+ bool Execute(int errorCode) override {
+ Y_UNUSED(errorCode);
+ H_(ErrorCode_, *this);
+
+ return true;
+ }
+
+ TTcpSocket::TConnectHandler H_;
+ int ErrorCode_;
+ };
+
+ class TOperationWrite: public TSocketOperation {
+ public:
+ TOperationWrite(TTcpSocket::TImpl& s, NAsio::TTcpSocket::TSendedData& buffs, TTcpSocket::TWriteHandler h, TInstant deadline)
+ : TSocketOperation(s, PollWrite, deadline)
+ , H_(h)
+ , Buffs_(buffs)
+ , Written_(0)
+ {
+ Speculative_ = true;
+ }
+
+        //return true if no more data needs to be written
+ bool Execute(int errorCode) override;
+
+ private:
+ TTcpSocket::TWriteHandler H_;
+ NAsio::TTcpSocket::TSendedData Buffs_;
+ size_t Written_;
+ };
+
+ class TOperationWriteVector: public TSocketOperation {
+ public:
+ TOperationWriteVector(TTcpSocket::TImpl& s, TContIOVector* v, TTcpSocket::TWriteHandler h, TInstant deadline)
+ : TSocketOperation(s, PollWrite, deadline)
+ , H_(h)
+ , V_(*v)
+ , Written_(0)
+ {
+ Speculative_ = true;
+ }
+
+        //return true if no more data needs to be written
+ bool Execute(int errorCode) override;
+
+ private:
+ TTcpSocket::TWriteHandler H_;
+ TContIOVector& V_;
+ size_t Written_;
+ };
+
+ class TOperationReadSome: public TSocketOperation {
+ public:
+ TOperationReadSome(TTcpSocket::TImpl& s, void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline)
+ : TSocketOperation(s, PollRead, deadline)
+ , H_(h)
+ , Buff_(static_cast<char*>(buff))
+ , Size_(size)
+ {
+ }
+
+        //return true if no more data needs to be read
+ bool Execute(int errorCode) override;
+
+ protected:
+ TTcpSocket::TReadHandler H_;
+ char* Buff_;
+ size_t Size_;
+ };
+
+ class TOperationRead: public TOperationReadSome {
+ public:
+ TOperationRead(TTcpSocket::TImpl& s, void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline)
+ : TOperationReadSome(s, buff, size, h, deadline)
+ , Read_(0)
+ {
+ }
+
+ bool Execute(int errorCode) override;
+
+ private:
+ size_t Read_;
+ };
+
+ class TOperationPoll: public TSocketOperation {
+ public:
+ TOperationPoll(TTcpSocket::TImpl& s, TPollType pt, TTcpSocket::TPollHandler h, TInstant deadline)
+ : TSocketOperation(s, pt, deadline)
+ , H_(h)
+ {
+ }
+
+ bool Execute(int errorCode) override {
+ H_(errorCode, *this);
+
+ return true;
+ }
+
+ private:
+ TTcpSocket::TPollHandler H_;
+ };
+
+ template <class T>
+ class TOperationCancel: public TNoneOperation {
+ public:
+ TOperationCancel(T* s)
+ : TNoneOperation()
+ , S_(s)
+ {
+ Speculative_ = true;
+ }
+
+ ~TOperationCancel() override {
+ }
+
+ private:
+ bool Execute(int errorCode) override {
+ Y_UNUSED(errorCode);
+ if (!errorCode && S_->Fd() != INVALID_SOCKET) {
+ S_->GetIOServiceImpl().CancelFdOp(S_->Fd());
+ }
+ return true;
+ }
+
+ TIntrusivePtr<T> S_;
+ };
+
+ class TTcpSocket::TImpl: public TNonCopyable, public TThrRefBase {
+ public:
+ typedef TTcpSocket::TSendedData TSendedData;
+
+ TImpl(TIOService::TImpl& srv) noexcept
+ : Srv_(srv)
+ {
+ }
+
+ ~TImpl() override {
+ DBGOUT("TSocket::~TImpl()");
+ }
+
+ void Assign(SOCKET fd, TEndpoint ep) {
+ TSocketHolder(fd).Swap(S_);
+ RemoteEndpoint_ = ep;
+ }
+
+ void AsyncConnect(const TEndpoint& ep, TTcpSocket::TConnectHandler h, TInstant deadline) {
+ TSocketHolder s(socket(ep.SockAddr()->sa_family, SOCK_STREAM, 0));
+
+ if (Y_UNLIKELY(s == INVALID_SOCKET || Srv_.HasAbort())) {
+ throw TSystemError() << TStringBuf("can't create socket");
+ }
+
+ SetNonBlock(s);
+
+ int err;
+ do {
+ err = connect(s, ep.SockAddr(), (int)ep.SockAddrLen());
+ if (Y_LIKELY(err)) {
+ err = LastSystemError();
+ }
+#if defined(_freebsd_)
+ if (Y_UNLIKELY(err == EINTR)) {
+ err = EINPROGRESS;
+ }
+ } while (0);
+#elif defined(_linux_)
+ } while (Y_UNLIKELY(err == EINTR));
+#else
+ } while (0);
+#endif
+
+ RemoteEndpoint_ = ep;
+ S_.Swap(s);
+
+ DBGOUT("AsyncConnect(): " << err);
+ if (Y_LIKELY(err == EINPROGRESS || err == EWOULDBLOCK || err == 0)) {
+ Srv_.ScheduleOp(new TOperationConnect(*this, h, deadline)); //set callback
+ } else {
+ Srv_.ScheduleOp(new TOperationConnectFailed(*this, h, err, deadline)); //set callback
+ }
+ }
+
+ inline void AsyncWrite(TSendedData& d, TTcpSocket::TWriteHandler h, TInstant deadline) {
+ Srv_.ScheduleOp(new TOperationWrite(*this, d, h, deadline));
+ }
+
+ inline void AsyncWrite(TContIOVector* v, TTcpSocket::TWriteHandler h, TInstant deadline) {
+ Srv_.ScheduleOp(new TOperationWriteVector(*this, v, h, deadline));
+ }
+
+ inline void AsyncRead(void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline) {
+ Srv_.ScheduleOp(new TOperationRead(*this, buff, size, h, deadline));
+ }
+
+ inline void AsyncReadSome(void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline) {
+ Srv_.ScheduleOp(new TOperationReadSome(*this, buff, size, h, deadline));
+ }
+
+ inline void AsyncPollWrite(TTcpSocket::TPollHandler h, TInstant deadline) {
+ Srv_.ScheduleOp(new TOperationPoll(*this, TOperationPoll::PollWrite, h, deadline));
+ }
+
+ inline void AsyncPollRead(TTcpSocket::TPollHandler h, TInstant deadline) {
+ Srv_.ScheduleOp(new TOperationPoll(*this, TOperationPoll::PollRead, h, deadline));
+ }
+
+ inline void AsyncCancel() {
+ if (Y_UNLIKELY(Srv_.HasAbort())) {
+ return;
+ }
+ Srv_.ScheduleOp(new TOperationCancel<TTcpSocket::TImpl>(this));
+ }
+
+ inline bool SysCallHasResult(ssize_t& n, TErrorCode& ec) noexcept {
+ if (n >= 0) {
+ return true;
+ }
+
+ int errn = LastSystemError();
+ if (errn == EINTR) {
+ return false;
+ }
+
+ ec.Assign(errn);
+ n = 0;
+ return true;
+ }
+
+ size_t WriteSome(TContIOVector& iov, TErrorCode& ec) noexcept {
+ for (;;) {
+ ssize_t n = writev(S_, (const iovec*)iov.Parts(), Min(IOV_MAX, (int)iov.Count()));
+ DBGOUT("WriteSome(): n=" << n);
+ if (SysCallHasResult(n, ec)) {
+ return n;
+ }
+ }
+ }
+
+ size_t WriteSome(const void* buff, size_t size, TErrorCode& ec) noexcept {
+ for (;;) {
+ ssize_t n = send(S_, (char*)buff, size, 0);
+ DBGOUT("WriteSome(): n=" << n);
+ if (SysCallHasResult(n, ec)) {
+ return n;
+ }
+ }
+ }
+
+ size_t ReadSome(void* buff, size_t size, TErrorCode& ec) noexcept {
+ for (;;) {
+ ssize_t n = recv(S_, (char*)buff, size, 0);
+ DBGOUT("ReadSome(): n=" << n);
+ if (SysCallHasResult(n, ec)) {
+ return n;
+ }
+ }
+ }
+
+ inline void Shutdown(TTcpSocket::TShutdownMode mode, TErrorCode& ec) {
+ if (shutdown(S_, mode)) {
+ ec.Assign(LastSystemError());
+ }
+ }
+
+ TIOService::TImpl& GetIOServiceImpl() const noexcept {
+ return Srv_;
+ }
+
+ inline SOCKET Fd() const noexcept {
+ return S_;
+ }
+
+ TEndpoint RemoteEndpoint() const {
+ return RemoteEndpoint_;
+ }
+
+ private:
+ TIOService::TImpl& Srv_;
+ TSocketHolder S_;
+ TEndpoint RemoteEndpoint_;
+ };
+}
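Client sketch (illustrative only, not part of this commit): combining AsyncConnect() and AsyncWrite() from the public asio.h API; the endpoint is assumed to be built elsewhere:

    #include <library/cpp/neh/asio/asio.h>

    #include <util/generic/strbuf.h>

    using namespace NAsio;

    void SendGreeting(const TEndpoint& ep) {
        TIOService srv;
        TTcpSocket sock(srv);

        static const TStringBuf msg = "hello\n";

        sock.AsyncConnect(ep, [&sock](const TErrorCode& err, IHandlingContext&) {
            if (!err) {
                // the (data, size) overload stores the pointer, not a copy,
                // so the buffer must stay alive until the write handler runs
                sock.AsyncWrite(msg.data(), msg.size(), [](const TErrorCode&, size_t /*written*/, IHandlingContext&) {
                });
            }
        }, TDuration::Seconds(5)); // connect deadline

        srv.Run(); // exits once the connect (and write, if scheduled) completes
    }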