diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/asio | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'library/cpp/neh/asio')
-rw-r--r-- | library/cpp/neh/asio/asio.cpp | 187 | ||||
-rw-r--r-- | library/cpp/neh/asio/asio.h | 280 | ||||
-rw-r--r-- | library/cpp/neh/asio/deadline_timer_impl.cpp | 1 | ||||
-rw-r--r-- | library/cpp/neh/asio/deadline_timer_impl.h | 110 | ||||
-rw-r--r-- | library/cpp/neh/asio/executor.cpp | 1 | ||||
-rw-r--r-- | library/cpp/neh/asio/executor.h | 76 | ||||
-rw-r--r-- | library/cpp/neh/asio/io_service_impl.cpp | 161 | ||||
-rw-r--r-- | library/cpp/neh/asio/io_service_impl.h | 744 | ||||
-rw-r--r-- | library/cpp/neh/asio/poll_interrupter.cpp | 1 | ||||
-rw-r--r-- | library/cpp/neh/asio/poll_interrupter.h | 107 | ||||
-rw-r--r-- | library/cpp/neh/asio/tcp_acceptor_impl.cpp | 25 | ||||
-rw-r--r-- | library/cpp/neh/asio/tcp_acceptor_impl.h | 76 | ||||
-rw-r--r-- | library/cpp/neh/asio/tcp_socket_impl.cpp | 117 | ||||
-rw-r--r-- | library/cpp/neh/asio/tcp_socket_impl.h | 332 |
14 files changed, 2218 insertions, 0 deletions
diff --git a/library/cpp/neh/asio/asio.cpp b/library/cpp/neh/asio/asio.cpp new file mode 100644 index 0000000000..8b6cf383ea --- /dev/null +++ b/library/cpp/neh/asio/asio.cpp @@ -0,0 +1,187 @@ +#include "io_service_impl.h" +#include "deadline_timer_impl.h" +#include "tcp_socket_impl.h" +#include "tcp_acceptor_impl.h" + +using namespace NDns; +using namespace NAsio; + +namespace NAsio { + TIOService::TWork::TWork(TWork& w) + : Srv_(w.Srv_) + { + Srv_.GetImpl().WorkStarted(); + } + + TIOService::TWork::TWork(TIOService& srv) + : Srv_(srv) + { + Srv_.GetImpl().WorkStarted(); + } + + TIOService::TWork::~TWork() { + Srv_.GetImpl().WorkFinished(); + } + + TIOService::TIOService() + : Impl_(new TImpl()) + { + } + + TIOService::~TIOService() { + } + + void TIOService::Run() { + Impl_->Run(); + } + + void TIOService::Post(TCompletionHandler h) { + Impl_->Post(std::move(h)); + } + + void TIOService::Abort() { + Impl_->Abort(); + } + + TDeadlineTimer::TDeadlineTimer(TIOService& srv) noexcept + : Srv_(srv) + , Impl_(nullptr) + { + } + + TDeadlineTimer::~TDeadlineTimer() { + if (Impl_) { + Srv_.GetImpl().ScheduleOp(new TUnregisterTimerOperation(Impl_)); + } + } + + void TDeadlineTimer::AsyncWaitExpireAt(TDeadline deadline, THandler h) { + if (!Impl_) { + Impl_ = new TDeadlineTimer::TImpl(Srv_.GetImpl()); + Srv_.GetImpl().ScheduleOp(new TRegisterTimerOperation(Impl_)); + } + Impl_->AsyncWaitExpireAt(deadline, h); + } + + void TDeadlineTimer::Cancel() { + Impl_->Cancel(); + } + + TTcpSocket::TTcpSocket(TIOService& srv) noexcept + : Srv_(srv) + , Impl_(new TImpl(srv.GetImpl())) + { + } + + TTcpSocket::~TTcpSocket() { + } + + void TTcpSocket::AsyncConnect(const TEndpoint& ep, TTcpSocket::TConnectHandler h, TDeadline deadline) { + Impl_->AsyncConnect(ep, h, deadline); + } + + void TTcpSocket::AsyncWrite(TSendedData& d, TTcpSocket::TWriteHandler h, TDeadline deadline) { + Impl_->AsyncWrite(d, h, deadline); + } + + void TTcpSocket::AsyncWrite(TContIOVector* vec, TWriteHandler h, TDeadline deadline) { + Impl_->AsyncWrite(vec, h, deadline); + } + + void TTcpSocket::AsyncWrite(const void* data, size_t size, TWriteHandler h, TDeadline deadline) { + class TBuffers: public IBuffers { + public: + TBuffers(const void* theData, size_t theSize) + : Part(theData, theSize) + , IOVec(&Part, 1) + { + } + + TContIOVector* GetIOvec() override { + return &IOVec; + } + + IOutputStream::TPart Part; + TContIOVector IOVec; + }; + + TSendedData d(new TBuffers(data, size)); + Impl_->AsyncWrite(d, h, deadline); + } + + void TTcpSocket::AsyncRead(void* buff, size_t size, TTcpSocket::TReadHandler h, TDeadline deadline) { + Impl_->AsyncRead(buff, size, h, deadline); + } + + void TTcpSocket::AsyncReadSome(void* buff, size_t size, TTcpSocket::TReadHandler h, TDeadline deadline) { + Impl_->AsyncReadSome(buff, size, h, deadline); + } + + void TTcpSocket::AsyncPollRead(TTcpSocket::TPollHandler h, TDeadline deadline) { + Impl_->AsyncPollRead(h, deadline); + } + + void TTcpSocket::AsyncPollWrite(TTcpSocket::TPollHandler h, TDeadline deadline) { + Impl_->AsyncPollWrite(h, deadline); + } + + void TTcpSocket::AsyncCancel() { + return Impl_->AsyncCancel(); + } + + size_t TTcpSocket::WriteSome(TContIOVector& d, TErrorCode& ec) noexcept { + return Impl_->WriteSome(d, ec); + } + + size_t TTcpSocket::WriteSome(const void* buff, size_t size, TErrorCode& ec) noexcept { + return Impl_->WriteSome(buff, size, ec); + } + + size_t TTcpSocket::ReadSome(void* buff, size_t size, TErrorCode& ec) noexcept { + return Impl_->ReadSome(buff, size, ec); + } + + bool TTcpSocket::IsOpen() const noexcept { + return Native() != INVALID_SOCKET; + } + + void TTcpSocket::Shutdown(TShutdownMode what, TErrorCode& ec) { + return Impl_->Shutdown(what, ec); + } + + SOCKET TTcpSocket::Native() const noexcept { + return Impl_->Fd(); + } + + TEndpoint TTcpSocket::RemoteEndpoint() const { + return Impl_->RemoteEndpoint(); + } + + ////////////////////////////////// + + TTcpAcceptor::TTcpAcceptor(TIOService& srv) noexcept + : Srv_(srv) + , Impl_(new TImpl(srv.GetImpl())) + { + } + + TTcpAcceptor::~TTcpAcceptor() { + } + + void TTcpAcceptor::Bind(TEndpoint& ep, TErrorCode& ec) noexcept { + return Impl_->Bind(ep, ec); + } + + void TTcpAcceptor::Listen(int backlog, TErrorCode& ec) noexcept { + return Impl_->Listen(backlog, ec); + } + + void TTcpAcceptor::AsyncAccept(TTcpSocket& s, TTcpAcceptor::TAcceptHandler h, TDeadline deadline) { + return Impl_->AsyncAccept(s, h, deadline); + } + + void TTcpAcceptor::AsyncCancel() { + Impl_->AsyncCancel(); + } + +} diff --git a/library/cpp/neh/asio/asio.h b/library/cpp/neh/asio/asio.h new file mode 100644 index 0000000000..a902d663cf --- /dev/null +++ b/library/cpp/neh/asio/asio.h @@ -0,0 +1,280 @@ +#pragma once + +// +//primary header for work with asio +// + +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/generic/vector.h> +#include <util/network/socket.h> +#include <util/network/endpoint.h> +#include <util/system/error.h> +#include <util/stream/output.h> +#include <functional> + +#include <library/cpp/dns/cache.h> + +//#define DEBUG_ASIO + +class TContIOVector; + +namespace NAsio { + class TErrorCode { + public: + inline TErrorCode(int val = 0) noexcept + : Val_(val) + { + } + + typedef void (*TUnspecifiedBoolType)(); + + static void UnspecifiedBoolTrue() { + } + + //safe cast to bool value + operator TUnspecifiedBoolType() const noexcept { // true if error + return Val_ == 0 ? nullptr : UnspecifiedBoolTrue; + } + + bool operator!() const noexcept { + return Val_ == 0; + } + + void Assign(int val) noexcept { + Val_ = val; + } + + int Value() const noexcept { + return Val_; + } + + TString Text() const { + if (!Val_) { + return TString(); + } + return LastSystemErrorText(Val_); + } + + void Check() { + if (Val_) { + throw TSystemError(Val_); + } + } + + private: + int Val_; + }; + + //wrapper for TInstant, for enabling use TDuration (+TInstant::Now()) as deadline + class TDeadline: public TInstant { + public: + TDeadline() + : TInstant(TInstant::Max()) + { + } + + TDeadline(const TInstant& t) + : TInstant(t) + { + } + + TDeadline(const TDuration& d) + : TInstant(TInstant::Now() + d) + { + } + }; + + class IHandlingContext { + public: + virtual ~IHandlingContext() { + } + + //if handler throw exception, call this function be ignored + virtual void ContinueUseHandler(TDeadline deadline = TDeadline()) = 0; + }; + + typedef std::function<void()> TCompletionHandler; + + class TIOService: public TNonCopyable { + public: + TIOService(); + ~TIOService(); + + void Run(); + void Post(TCompletionHandler); //call handler in Run() thread-executor + void Abort(); //in Run() all exist async i/o operations + timers receive error = ECANCELED, Run() exited + + //counterpart boost::asio::io_service::work + class TWork { + public: + TWork(TWork&); + TWork(TIOService&); + ~TWork(); + + private: + void operator=(const TWork&); //disable + + TIOService& Srv_; + }; + + class TImpl; + + TImpl& GetImpl() noexcept { + return *Impl_; + } + + private: + THolder<TImpl> Impl_; + }; + + class TDeadlineTimer: public TNonCopyable { + public: + typedef std::function<void(const TErrorCode& err, IHandlingContext&)> THandler; + + TDeadlineTimer(TIOService&) noexcept; + ~TDeadlineTimer(); + + void AsyncWaitExpireAt(TDeadline, THandler); + void Cancel(); + + TIOService& GetIOService() const noexcept { + return Srv_; + } + + class TImpl; + + private: + TIOService& Srv_; + TImpl* Impl_; + }; + + class TTcpSocket: public TNonCopyable { + public: + class IBuffers { + public: + virtual ~IBuffers() { + } + virtual TContIOVector* GetIOvec() = 0; + }; + typedef TAutoPtr<IBuffers> TSendedData; + + typedef std::function<void(const TErrorCode& err, IHandlingContext&)> THandler; + typedef THandler TConnectHandler; + typedef std::function<void(const TErrorCode& err, size_t amount, IHandlingContext&)> TWriteHandler; + typedef std::function<void(const TErrorCode& err, size_t amount, IHandlingContext&)> TReadHandler; + typedef THandler TPollHandler; + + enum TShutdownMode { + ShutdownReceive = SHUT_RD, + ShutdownSend = SHUT_WR, + ShutdownBoth = SHUT_RDWR + }; + + TTcpSocket(TIOService&) noexcept; + ~TTcpSocket(); + + void AsyncConnect(const TEndpoint& ep, TConnectHandler, TDeadline deadline = TDeadline()); + void AsyncWrite(TSendedData&, TWriteHandler, TDeadline deadline = TDeadline()); + void AsyncWrite(TContIOVector* buff, TWriteHandler, TDeadline deadline = TDeadline()); + void AsyncWrite(const void* buff, size_t size, TWriteHandler, TDeadline deadline = TDeadline()); + void AsyncRead(void* buff, size_t size, TReadHandler, TDeadline deadline = TDeadline()); + void AsyncReadSome(void* buff, size_t size, TReadHandler, TDeadline deadline = TDeadline()); + void AsyncPollWrite(TPollHandler, TDeadline deadline = TDeadline()); + void AsyncPollRead(TPollHandler, TDeadline deadline = TDeadline()); + void AsyncCancel(); + + //sync, but non blocked methods + size_t WriteSome(TContIOVector&, TErrorCode&) noexcept; + size_t WriteSome(const void* buff, size_t size, TErrorCode&) noexcept; + size_t ReadSome(void* buff, size_t size, TErrorCode&) noexcept; + + bool IsOpen() const noexcept; + void Shutdown(TShutdownMode mode, TErrorCode& ec); + + TIOService& GetIOService() const noexcept { + return Srv_; + } + + SOCKET Native() const noexcept; + + TEndpoint RemoteEndpoint() const; + + inline size_t WriteSome(TContIOVector& v) { + TErrorCode ec; + size_t n = WriteSome(v, ec); + ec.Check(); + return n; + } + + inline size_t WriteSome(const void* buff, size_t size) { + TErrorCode ec; + size_t n = WriteSome(buff, size, ec); + ec.Check(); + return n; + } + + inline size_t ReadSome(void* buff, size_t size) { + TErrorCode ec; + size_t n = ReadSome(buff, size, ec); + ec.Check(); + return n; + } + + void Shutdown(TShutdownMode mode) { + TErrorCode ec; + Shutdown(mode, ec); + ec.Check(); + } + + class TImpl; + + TImpl& GetImpl() const noexcept { + return *Impl_; + } + + private: + TIOService& Srv_; + TIntrusivePtr<TImpl> Impl_; + }; + + class TTcpAcceptor: public TNonCopyable { + public: + typedef std::function<void(const TErrorCode& err, IHandlingContext&)> TAcceptHandler; + + TTcpAcceptor(TIOService&) noexcept; + ~TTcpAcceptor(); + + void Bind(TEndpoint&, TErrorCode&) noexcept; + void Listen(int backlog, TErrorCode&) noexcept; + + void AsyncAccept(TTcpSocket&, TAcceptHandler, TDeadline deadline = TDeadline()); + + void AsyncCancel(); + + inline void Bind(TEndpoint& ep) { + TErrorCode ec; + Bind(ep, ec); + ec.Check(); + } + inline void Listen(int backlog) { + TErrorCode ec; + Listen(backlog, ec); + ec.Check(); + } + + TIOService& GetIOService() const noexcept { + return Srv_; + } + + class TImpl; + + TImpl& GetImpl() const noexcept { + return *Impl_; + } + + private: + TIOService& Srv_; + TIntrusivePtr<TImpl> Impl_; + }; +} diff --git a/library/cpp/neh/asio/deadline_timer_impl.cpp b/library/cpp/neh/asio/deadline_timer_impl.cpp new file mode 100644 index 0000000000..399a4338fb --- /dev/null +++ b/library/cpp/neh/asio/deadline_timer_impl.cpp @@ -0,0 +1 @@ +#include "deadline_timer_impl.h" diff --git a/library/cpp/neh/asio/deadline_timer_impl.h b/library/cpp/neh/asio/deadline_timer_impl.h new file mode 100644 index 0000000000..d9db625c94 --- /dev/null +++ b/library/cpp/neh/asio/deadline_timer_impl.h @@ -0,0 +1,110 @@ +#pragma once + +#include "io_service_impl.h" + +namespace NAsio { + class TTimerOperation: public TOperation { + public: + TTimerOperation(TIOService::TImpl::TTimer* t, TInstant deadline) + : TOperation(deadline) + , T_(t) + { + } + + void AddOp(TIOService::TImpl&) override { + Y_ASSERT(0); + } + + void Finalize() override { + DBGOUT("TTimerDeadlineOperation::Finalize()"); + T_->DelOp(this); + } + + protected: + TIOService::TImpl::TTimer* T_; + }; + + class TRegisterTimerOperation: public TTimerOperation { + public: + TRegisterTimerOperation(TIOService::TImpl::TTimer* t, TInstant deadline = TInstant::Max()) + : TTimerOperation(t, deadline) + { + Speculative_ = true; + } + + bool Execute(int errorCode) override { + Y_UNUSED(errorCode); + T_->GetIOServiceImpl().SyncRegisterTimer(T_); + return true; + } + }; + + class TTimerDeadlineOperation: public TTimerOperation { + public: + TTimerDeadlineOperation(TIOService::TImpl::TTimer* t, TDeadlineTimer::THandler h, TInstant deadline) + : TTimerOperation(t, deadline) + , H_(h) + { + } + + void AddOp(TIOService::TImpl&) override { + T_->AddOp(this); + } + + bool Execute(int errorCode) override { + DBGOUT("TTimerDeadlineOperation::Execute(" << errorCode << ")"); + H_(errorCode == ETIMEDOUT ? 0 : errorCode, *this); + return true; + } + + private: + TDeadlineTimer::THandler H_; + }; + + class TCancelTimerOperation: public TTimerOperation { + public: + TCancelTimerOperation(TIOService::TImpl::TTimer* t) + : TTimerOperation(t, TInstant::Max()) + { + Speculative_ = true; + } + + bool Execute(int errorCode) override { + Y_UNUSED(errorCode); + T_->FailOperations(ECANCELED); + return true; + } + }; + + class TUnregisterTimerOperation: public TTimerOperation { + public: + TUnregisterTimerOperation(TIOService::TImpl::TTimer* t, TInstant deadline = TInstant::Max()) + : TTimerOperation(t, deadline) + { + Speculative_ = true; + } + + bool Execute(int errorCode) override { + Y_UNUSED(errorCode); + DBGOUT("TUnregisterTimerOperation::Execute(" << errorCode << ")"); + T_->GetIOServiceImpl().SyncUnregisterAndDestroyTimer(T_); + return true; + } + }; + + class TDeadlineTimer::TImpl: public TIOService::TImpl::TTimer { + public: + TImpl(TIOService::TImpl& srv) + : TIOService::TImpl::TTimer(srv) + { + } + + void AsyncWaitExpireAt(TDeadline d, TDeadlineTimer::THandler h) { + Srv_.ScheduleOp(new TTimerDeadlineOperation(this, h, d)); + } + + void Cancel() { + Srv_.ScheduleOp(new TCancelTimerOperation(this)); + } + }; +} diff --git a/library/cpp/neh/asio/executor.cpp b/library/cpp/neh/asio/executor.cpp new file mode 100644 index 0000000000..03b26bf847 --- /dev/null +++ b/library/cpp/neh/asio/executor.cpp @@ -0,0 +1 @@ +#include "executor.h" diff --git a/library/cpp/neh/asio/executor.h b/library/cpp/neh/asio/executor.h new file mode 100644 index 0000000000..4f6549044d --- /dev/null +++ b/library/cpp/neh/asio/executor.h @@ -0,0 +1,76 @@ +#pragma once + +#include "asio.h" + +#include <library/cpp/deprecated/atomic/atomic.h> + +#include <util/thread/factory.h> +#include <util/system/thread.h> + +namespace NAsio { + class TIOServiceExecutor: public IThreadFactory::IThreadAble { + public: + TIOServiceExecutor() + : Work_(new TIOService::TWork(Srv_)) + { + T_ = SystemThreadFactory()->Run(this); + } + + ~TIOServiceExecutor() override { + SyncShutdown(); + } + + void DoExecute() override { + TThread::SetCurrentThreadName("NehAsioExecutor"); + Srv_.Run(); + } + + inline TIOService& GetIOService() noexcept { + return Srv_; + } + + void SyncShutdown() { + if (Work_) { + Work_.Destroy(); + Srv_.Abort(); //cancel all async operations, break Run() execution + T_->Join(); + } + } + + private: + TIOService Srv_; + TAutoPtr<TIOService::TWork> Work_; + typedef TAutoPtr<IThreadFactory::IThread> IThreadRef; + IThreadRef T_; + }; + + class TExecutorsPool { + public: + TExecutorsPool(size_t executors) + : C_(0) + { + for (size_t i = 0; i < executors; ++i) { + E_.push_back(new TIOServiceExecutor()); + } + } + + inline size_t Size() const noexcept { + return E_.size(); + } + + inline TIOServiceExecutor& GetExecutor() noexcept { + TAtomicBase next = AtomicIncrement(C_); + return *E_[next % E_.size()]; + } + + void SyncShutdown() { + for (size_t i = 0; i < E_.size(); ++i) { + E_[i]->SyncShutdown(); + } + } + + private: + TAtomic C_; + TVector<TAutoPtr<TIOServiceExecutor>> E_; + }; +} diff --git a/library/cpp/neh/asio/io_service_impl.cpp b/library/cpp/neh/asio/io_service_impl.cpp new file mode 100644 index 0000000000..d49b3fb03e --- /dev/null +++ b/library/cpp/neh/asio/io_service_impl.cpp @@ -0,0 +1,161 @@ +#include "io_service_impl.h" + +#include <library/cpp/coroutine/engine/poller.h> + +using namespace NAsio; + +void TFdOperation::AddOp(TIOService::TImpl& srv) { + srv.AddOp(this); +} + +void TFdOperation::Finalize() { + (*PH_)->DelOp(this); +} + +void TPollFdEventHandler::ExecuteOperations(TFdOperations& oprs, int errorCode) { + TFdOperations::iterator it = oprs.begin(); + + try { + while (it != oprs.end()) { + TFdOperation* op = it->Get(); + + if (op->Execute(errorCode)) { // throw ? + if (op->IsRequiredRepeat()) { + Srv_.UpdateOpDeadline(op); + ++it; //operation completed, but want be repeated + } else { + FinishedOperations_.push_back(*it); + it = oprs.erase(it); + } + } else { + ++it; //operation not completed + } + } + } catch (...) { + if (it != oprs.end()) { + FinishedOperations_.push_back(*it); + oprs.erase(it); + } + throw; + } +} + +void TPollFdEventHandler::DelOp(TFdOperation* op) { + TAutoPtr<TPollFdEventHandler>& evh = *op->PH_; + + if (op->IsPollRead()) { + Y_ASSERT(FinishOp(ReadOperations_, op)); + } else { + Y_ASSERT(FinishOp(WriteOperations_, op)); + } + Srv_.FixHandledEvents(evh); //alarm, - 'this' can be destroyed here! +} + +void TInterrupterHandler::OnFdEvent(int status, ui16 filter) { + if (!status && (filter & CONT_POLL_READ)) { + PI_.Reset(); + } +} + +void TIOService::TImpl::Run() { + TEvh& iEvh = Evh_.Get(I_.Fd()); + iEvh.Reset(new TInterrupterHandler(*this, I_)); + + TInterrupterKeeper ik(*this, iEvh); + Y_UNUSED(ik); + IPollerFace::TEvents evs; + AtomicSet(NeedCheckOpQueue_, 1); + TInstant deadline; + + while (Y_LIKELY(!Aborted_ && (AtomicGet(OutstandingWork_) || FdEventHandlersCnt_ > 1 || TimersOpCnt_ || AtomicGet(NeedCheckOpQueue_)))) { + //while + // expected work (external flag) + // or have event handlers (exclude interrupter) + // or have not completed timer operation + // or have any operation in queues + + AtomicIncrement(IsWaiting_); + if (!AtomicGet(NeedCheckOpQueue_)) { + P_->Wait(evs, deadline); + } + AtomicDecrement(IsWaiting_); + + if (evs.size()) { + for (IPollerFace::TEvents::const_iterator iev = evs.begin(); iev != evs.end() && !Aborted_; ++iev) { + const IPollerFace::TEvent& ev = *iev; + TEvh& evh = *(TEvh*)ev.Data; + + if (!evh) { + continue; //op. cancel (see ProcessOpQueue) can destroy evh + } + + int status = ev.Status; + if (ev.Status == EIO) { + int error = status; + if (GetSockOpt(evh->Fd(), SOL_SOCKET, SO_ERROR, error) == 0) { + status = error; + } + } + + OnFdEvent(evh, status, ev.Filter); //here handle fd events + //immediatly after handling events for one descriptor check op. queue + //often queue can contain another operation for this fd (next async read as sample) + //so we can optimize redundant epoll_ctl (or similar) calls + ProcessOpQueue(); + } + + evs.clear(); + } else { + ProcessOpQueue(); + } + + deadline = DeadlinesQueue_.NextDeadline(); //here handle timeouts/process timers + } +} + +void TIOService::TImpl::Abort() { + class TAbortOperation: public TNoneOperation { + public: + TAbortOperation(TIOService::TImpl& srv) + : TNoneOperation() + , Srv_(srv) + { + Speculative_ = true; + } + + private: + bool Execute(int errorCode) override { + Y_UNUSED(errorCode); + Srv_.ProcessAbort(); + return true; + } + + TIOService::TImpl& Srv_; + }; + AtomicSet(HasAbort_, 1); + ScheduleOp(new TAbortOperation(*this)); +} + +void TIOService::TImpl::ProcessAbort() { + Aborted_ = true; + + for (int fd = 0; fd <= MaxFd_; ++fd) { + TEvh& evh = Evh_.Get(fd); + if (!!evh && evh->Fd() != I_.Fd()) { + OnFdEvent(evh, ECANCELED, CONT_POLL_READ | CONT_POLL_WRITE); + } + } + + for (auto t : Timers_) { + t->FailOperations(ECANCELED); + } + + TOperationPtr op; + while (OpQueue_.Dequeue(&op)) { //cancel all enqueued operations + try { + op->Execute(ECANCELED); + } catch (...) { + } + op.Destroy(); + } +} diff --git a/library/cpp/neh/asio/io_service_impl.h b/library/cpp/neh/asio/io_service_impl.h new file mode 100644 index 0000000000..46fa9f9ee1 --- /dev/null +++ b/library/cpp/neh/asio/io_service_impl.h @@ -0,0 +1,744 @@ +#pragma once + +#include "asio.h" +#include "poll_interrupter.h" + +#include <library/cpp/neh/lfqueue.h> +#include <library/cpp/neh/pipequeue.h> + +#include <library/cpp/dns/cache.h> + +#include <util/generic/hash_set.h> +#include <util/network/iovec.h> +#include <util/network/pollerimpl.h> +#include <util/thread/lfqueue.h> +#include <util/thread/factory.h> + +#ifdef DEBUG_ASIO +#define DBGOUT(args) Cout << args << Endl; +#else +#define DBGOUT(args) +#endif + +namespace NAsio { + //TODO: copypaste from neh, - need fix + template <class T> + class TLockFreeSequence { + public: + inline TLockFreeSequence() { + memset((void*)T_, 0, sizeof(T_)); + } + + inline ~TLockFreeSequence() { + for (size_t i = 0; i < Y_ARRAY_SIZE(T_); ++i) { + delete[] T_[i]; + } + } + + inline T& Get(size_t n) { + const size_t i = GetValueBitCount(n + 1) - 1; + + return GetList(i)[n + 1 - (((size_t)1) << i)]; + } + + private: + inline T* GetList(size_t n) { + T* volatile* t = T_ + n; + + while (!*t) { + TArrayHolder<T> nt(new T[((size_t)1) << n]); + + if (AtomicCas(t, nt.Get(), nullptr)) { + return nt.Release(); + } + } + + return *t; + } + + private: + T* volatile T_[sizeof(size_t) * 8]; + }; + + struct TOperationCompare { + template <class T> + static inline bool Compare(const T& l, const T& r) noexcept { + return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r); + } + }; + + //async operation, execute in contex TIOService()::Run() thread-executor + //usualy used for call functors/callbacks + class TOperation: public TRbTreeItem<TOperation, TOperationCompare>, public IHandlingContext { + public: + TOperation(TInstant deadline = TInstant::Max()) + : D_(deadline) + , Speculative_(false) + , RequiredRepeatExecution_(false) + , ND_(deadline) + { + } + + //register this operation in svc.impl. + virtual void AddOp(TIOService::TImpl&) = 0; + + //return false, if operation not completed + virtual bool Execute(int errorCode = 0) = 0; + + void ContinueUseHandler(TDeadline deadline) override { + RequiredRepeatExecution_ = true; + ND_ = deadline; + } + + virtual void Finalize() = 0; + + inline TInstant Deadline() const noexcept { + return D_; + } + + inline TInstant DeadLine() const noexcept { + return D_; + } + + inline bool Speculative() const noexcept { + return Speculative_; + } + + inline bool IsRequiredRepeat() const noexcept { + return RequiredRepeatExecution_; + } + + inline void PrepareReExecution() noexcept { + RequiredRepeatExecution_ = false; + D_ = ND_; + } + + protected: + TInstant D_; + bool Speculative_; //if true, operation will be runned immediately after dequeue (even without wating any event) + //as sample used for optimisation writing, - obviously in buffers exist space for write + bool RequiredRepeatExecution_; //set to true, if required re-exec operation + TInstant ND_; //new deadline (for re-exec operation) + }; + + typedef TAutoPtr<TOperation> TOperationPtr; + + class TNoneOperation: public TOperation { + public: + TNoneOperation(TInstant deadline = TInstant::Max()) + : TOperation(deadline) + { + } + + void AddOp(TIOService::TImpl&) override { + Y_ASSERT(0); + } + + void Finalize() override { + } + }; + + class TPollFdEventHandler; + + //descriptor use operation + class TFdOperation: public TOperation { + public: + enum TPollType { + PollRead, + PollWrite + }; + + TFdOperation(SOCKET fd, TPollType pt, TInstant deadline = TInstant::Max()) + : TOperation(deadline) + , Fd_(fd) + , PT_(pt) + , PH_(nullptr) + { + Y_ASSERT(Fd() != INVALID_SOCKET); + } + + inline SOCKET Fd() const noexcept { + return Fd_; + } + + inline bool IsPollRead() const noexcept { + return PT_ == PollRead; + } + + void AddOp(TIOService::TImpl& srv) override; + + void Finalize() override; + + protected: + SOCKET Fd_; + TPollType PT_; + + public: + TAutoPtr<TPollFdEventHandler>* PH_; + }; + + typedef TAutoPtr<TFdOperation> TFdOperationPtr; + + class TPollFdEventHandler { + public: + TPollFdEventHandler(SOCKET fd, TIOService::TImpl& srv) + : Fd_(fd) + , HandledEvents_(0) + , Srv_(srv) + { + } + + virtual ~TPollFdEventHandler() { + Y_ASSERT(ReadOperations_.size() == 0); + Y_ASSERT(WriteOperations_.size() == 0); + } + + inline void AddReadOp(TFdOperationPtr op) { + ReadOperations_.push_back(op); + } + + inline void AddWriteOp(TFdOperationPtr op) { + WriteOperations_.push_back(op); + } + + virtual void OnFdEvent(int status, ui16 filter) { + DBGOUT("PollEvent(fd=" << Fd_ << ", " << status << ", " << filter << ")"); + if (status) { + ExecuteOperations(ReadOperations_, status); + ExecuteOperations(WriteOperations_, status); + } else { + if (filter & CONT_POLL_READ) { + ExecuteOperations(ReadOperations_, status); + } + if (filter & CONT_POLL_WRITE) { + ExecuteOperations(WriteOperations_, status); + } + } + } + + typedef TVector<TFdOperationPtr> TFdOperations; + + void ExecuteOperations(TFdOperations& oprs, int errorCode); + + //return true if filter handled events changed and require re-configure events poller + virtual bool FixHandledEvents() noexcept { + DBGOUT("TPollFdEventHandler::FixHandledEvents()"); + ui16 filter = 0; + + if (WriteOperations_.size()) { + filter |= CONT_POLL_WRITE; + } + if (ReadOperations_.size()) { + filter |= CONT_POLL_READ; + } + + if (Y_LIKELY(HandledEvents_ == filter)) { + return false; + } + + HandledEvents_ = filter; + return true; + } + + inline bool FinishOp(TFdOperations& oprs, TFdOperation* op) noexcept { + for (TFdOperations::iterator it = oprs.begin(); it != oprs.end(); ++it) { + if (it->Get() == op) { + FinishedOperations_.push_back(*it); + oprs.erase(it); + return true; + } + } + return false; + } + + void DelOp(TFdOperation* op); + + inline SOCKET Fd() const noexcept { + return Fd_; + } + + inline ui16 HandledEvents() const noexcept { + return HandledEvents_; + } + + inline void AddHandlingEvent(ui16 ev) noexcept { + HandledEvents_ |= ev; + } + + inline void DestroyFinishedOperations() { + FinishedOperations_.clear(); + } + + TIOService::TImpl& GetServiceImpl() const noexcept { + return Srv_; + } + + protected: + SOCKET Fd_; + ui16 HandledEvents_; + TIOService::TImpl& Srv_; + + private: + TVector<TFdOperationPtr> ReadOperations_; + TVector<TFdOperationPtr> WriteOperations_; + // we can't immediatly destroy finished operations, this can cause closing used socket descriptor Fd_ + // (on cascade deletion operation object-handler), but later we use Fd_ for modify handled events at poller, + // so we collect here finished operations and destroy it only after update poller, - + // call FixHandledEvents(TPollFdEventHandlerPtr&) + TVector<TFdOperationPtr> FinishedOperations_; + }; + + //additional descriptor for poller, used for interrupt current poll wait + class TInterrupterHandler: public TPollFdEventHandler { + public: + TInterrupterHandler(TIOService::TImpl& srv, TPollInterrupter& pi) + : TPollFdEventHandler(pi.Fd(), srv) + , PI_(pi) + { + HandledEvents_ = CONT_POLL_READ; + } + + ~TInterrupterHandler() override { + DBGOUT("~TInterrupterHandler"); + } + + void OnFdEvent(int status, ui16 filter) override; + + bool FixHandledEvents() noexcept override { + DBGOUT("TInterrupterHandler::FixHandledEvents()"); + return false; + } + + private: + TPollInterrupter& PI_; + }; + + namespace { + inline TAutoPtr<IPollerFace> CreatePoller() { + try { +#if defined(_linux_) + return IPollerFace::Construct(TStringBuf("epoll")); +#endif +#if defined(_freebsd_) || defined(_darwin_) + return IPollerFace::Construct(TStringBuf("kqueue")); +#endif + } catch (...) { + Cdbg << CurrentExceptionMessage() << Endl; + } + return IPollerFace::Default(); + } + } + + //some equivalent TContExecutor + class TIOService::TImpl: public TNonCopyable { + public: + typedef TAutoPtr<TPollFdEventHandler> TEvh; + typedef TLockFreeSequence<TEvh> TEventHandlers; + + class TTimer { + public: + typedef THashSet<TOperation*> TOperations; + + TTimer(TIOService::TImpl& srv) + : Srv_(srv) + { + } + + virtual ~TTimer() { + FailOperations(ECANCELED); + } + + void AddOp(TOperation* op) { + THolder<TOperation> tmp(op); + Operations_.insert(op); + Y_UNUSED(tmp.Release()); + Srv_.RegisterOpDeadline(op); + Srv_.IncTimersOp(); + } + + void DelOp(TOperation* op) { + TOperations::iterator it = Operations_.find(op); + if (it != Operations_.end()) { + Srv_.DecTimersOp(); + delete op; + Operations_.erase(it); + } + } + + inline void FailOperations(int ec) { + for (auto operation : Operations_) { + try { + operation->Execute(ec); //throw ? + } catch (...) { + } + Srv_.DecTimersOp(); + delete operation; + } + Operations_.clear(); + } + + TIOService::TImpl& GetIOServiceImpl() const noexcept { + return Srv_; + } + + protected: + TIOService::TImpl& Srv_; + THashSet<TOperation*> Operations_; + }; + + class TTimers: public THashSet<TTimer*> { + public: + ~TTimers() { + for (auto it : *this) { + delete it; + } + } + }; + + TImpl() + : P_(CreatePoller()) + , DeadlinesQueue_(*this) + { + } + + ~TImpl() { + TOperationPtr op; + + while (OpQueue_.Dequeue(&op)) { //cancel all enqueued operations + try { + op->Execute(ECANCELED); + } catch (...) { + } + op.Destroy(); + } + } + + //similar TContExecutor::Execute() or io_service::run() + //process event loop (exit if none to do (no timers or event handlers)) + void Run(); + + //enqueue functor fo call in Run() eventloop (thread safing) + inline void Post(TCompletionHandler h) { + class TFuncOperation: public TNoneOperation { + public: + TFuncOperation(TCompletionHandler completionHandler) + : TNoneOperation() + , H_(std::move(completionHandler)) + { + Speculative_ = true; + } + + private: + //return false, if operation not completed + bool Execute(int errorCode) override { + Y_UNUSED(errorCode); + H_(); + return true; + } + + TCompletionHandler H_; + }; + + ScheduleOp(new TFuncOperation(std::move(h))); + } + + //cancel all current operations (handlers be called with errorCode == ECANCELED) + void Abort(); + bool HasAbort() { + return AtomicGet(HasAbort_); + } + + inline void ScheduleOp(TOperationPtr op) { //throw std::bad_alloc + Y_ASSERT(!Aborted_); + Y_ASSERT(!!op); + OpQueue_.Enqueue(op); + Interrupt(); + } + + inline void Interrupt() noexcept { + AtomicSet(NeedCheckOpQueue_, 1); + if (AtomicAdd(IsWaiting_, 0) == 1) { + I_.Interrupt(); + } + } + + inline void UpdateOpDeadline(TOperation* op) { + TInstant oldDeadline = op->Deadline(); + op->PrepareReExecution(); + + if (oldDeadline == op->Deadline()) { + return; + } + + if (oldDeadline != TInstant::Max()) { + op->UnLink(); + } + if (op->Deadline() != TInstant::Max()) { + DeadlinesQueue_.Register(op); + } + } + + void SyncRegisterTimer(TTimer* t) { + Timers_.insert(t); + } + + inline void SyncUnregisterAndDestroyTimer(TTimer* t) { + Timers_.erase(t); + delete t; + } + + inline void IncTimersOp() noexcept { + ++TimersOpCnt_; + } + + inline void DecTimersOp() noexcept { + --TimersOpCnt_; + } + + inline void WorkStarted() { + AtomicIncrement(OutstandingWork_); + } + + inline void WorkFinished() { + if (AtomicDecrement(OutstandingWork_) == 0) { + Interrupt(); + } + } + + private: + void ProcessAbort(); + + inline TEvh& EnsureGetEvh(SOCKET fd) { + TEvh& evh = Evh_.Get(fd); + if (!evh) { + evh.Reset(new TPollFdEventHandler(fd, *this)); + } + return evh; + } + + inline void OnTimeoutOp(TOperation* op) { + DBGOUT("OnTimeoutOp"); + try { + op->Execute(ETIMEDOUT); //throw ? + } catch (...) { + op->Finalize(); + throw; + } + + if (op->IsRequiredRepeat()) { + //operation not completed + UpdateOpDeadline(op); + } else { + //destroy operation structure + op->Finalize(); + } + } + + public: + inline void FixHandledEvents(TEvh& evh) { + if (!!evh) { + if (evh->FixHandledEvents()) { + if (!evh->HandledEvents()) { + DelEventHandler(evh); + evh.Destroy(); + } else { + ModEventHandler(evh); + evh->DestroyFinishedOperations(); + } + } else { + evh->DestroyFinishedOperations(); + } + } + } + + private: + inline TEvh& GetHandlerForOp(TFdOperation* op) { + TEvh& evh = EnsureGetEvh(op->Fd()); + op->PH_ = &evh; + return evh; + } + + void ProcessOpQueue() { + if (!AtomicGet(NeedCheckOpQueue_)) { + return; + } + AtomicSet(NeedCheckOpQueue_, 0); + + TOperationPtr op; + + while (OpQueue_.Dequeue(&op)) { + if (op->Speculative()) { + if (op->Execute(Y_UNLIKELY(Aborted_) ? ECANCELED : 0)) { + op.Destroy(); + continue; //operation completed + } + + if (!op->IsRequiredRepeat()) { + op->PrepareReExecution(); + } + } + RegisterOpDeadline(op.Get()); + op.Get()->AddOp(*this); // ... -> AddOp() + Y_UNUSED(op.Release()); + } + } + + inline void RegisterOpDeadline(TOperation* op) { + if (op->DeadLine() != TInstant::Max()) { + DeadlinesQueue_.Register(op); + } + } + + public: + inline void AddOp(TFdOperation* op) { + DBGOUT("AddOp<Fd>(" << op->Fd() << ")"); + TEvh& evh = GetHandlerForOp(op); + if (op->IsPollRead()) { + evh->AddReadOp(op); + EnsureEventHandled(evh, CONT_POLL_READ); + } else { + evh->AddWriteOp(op); + EnsureEventHandled(evh, CONT_POLL_WRITE); + } + } + + private: + inline void EnsureEventHandled(TEvh& evh, ui16 ev) { + if (!evh->HandledEvents()) { + evh->AddHandlingEvent(ev); + AddEventHandler(evh); + } else { + if ((evh->HandledEvents() & ev) == 0) { + evh->AddHandlingEvent(ev); + ModEventHandler(evh); + } + } + } + + public: + //cancel all current operations for socket + //method MUST be called from Run() thread-executor + void CancelFdOp(SOCKET fd) { + TEvh& evh = Evh_.Get(fd); + if (!evh) { + return; + } + + OnFdEvent(evh, ECANCELED, CONT_POLL_READ | CONT_POLL_WRITE); + } + + private: + //helper for fixing handled events even in case exception + struct TExceptionProofFixerHandledEvents { + TExceptionProofFixerHandledEvents(TIOService::TImpl& srv, TEvh& iEvh) + : Srv_(srv) + , Evh_(iEvh) + { + } + + ~TExceptionProofFixerHandledEvents() { + Srv_.FixHandledEvents(Evh_); + } + + TIOService::TImpl& Srv_; + TEvh& Evh_; + }; + + inline void OnFdEvent(TEvh& evh, int status, ui16 filter) { + TExceptionProofFixerHandledEvents fixer(*this, evh); + Y_UNUSED(fixer); + evh->OnFdEvent(status, filter); + } + + inline void AddEventHandler(TEvh& evh) { + if (evh->Fd() > MaxFd_) { + MaxFd_ = evh->Fd(); + } + SetEventHandler(&evh, evh->Fd(), evh->HandledEvents()); + ++FdEventHandlersCnt_; + } + + inline void ModEventHandler(TEvh& evh) { + SetEventHandler(&evh, evh->Fd(), evh->HandledEvents()); + } + + inline void DelEventHandler(TEvh& evh) { + SetEventHandler(&evh, evh->Fd(), 0); + --FdEventHandlersCnt_; + } + + inline void SetEventHandler(void* h, int fd, ui16 flags) { + DBGOUT("SetEventHandler(" << fd << ", " << flags << ")"); + P_->Set(h, fd, flags); + } + + //exception safe call DelEventHandler + struct TInterrupterKeeper { + TInterrupterKeeper(TImpl& srv, TEvh& iEvh) + : Srv_(srv) + , Evh_(iEvh) + { + Srv_.AddEventHandler(Evh_); + } + + ~TInterrupterKeeper() { + Srv_.DelEventHandler(Evh_); + } + + TImpl& Srv_; + TEvh& Evh_; + }; + + TAutoPtr<IPollerFace> P_; + TPollInterrupter I_; + TAtomic IsWaiting_ = 0; + TAtomic NeedCheckOpQueue_ = 0; + TAtomic OutstandingWork_ = 0; + + NNeh::TAutoLockFreeQueue<TOperation> OpQueue_; + + TEventHandlers Evh_; //i/o event handlers + TTimers Timers_; //timeout event handlers + + size_t FdEventHandlersCnt_ = 0; //i/o event handlers counter + size_t TimersOpCnt_ = 0; //timers op counter + SOCKET MaxFd_ = 0; //max used descriptor num + TAtomic HasAbort_ = 0; + bool Aborted_ = false; + + class TDeadlinesQueue { + public: + TDeadlinesQueue(TIOService::TImpl& srv) + : Srv_(srv) + { + } + + inline void Register(TOperation* op) { + Deadlines_.Insert(op); + } + + TInstant NextDeadline() { + TDeadlines::TIterator it = Deadlines_.Begin(); + + while (it != Deadlines_.End()) { + if (it->DeadLine() > TInstant::Now()) { + DBGOUT("TDeadlinesQueue::NewDeadline:" << (it->DeadLine().GetValue() - TInstant::Now().GetValue())); + return it->DeadLine(); + } + + TOperation* op = &*(it++); + Srv_.OnTimeoutOp(op); + } + + return Deadlines_.Empty() ? TInstant::Max() : Deadlines_.Begin()->DeadLine(); + } + + private: + typedef TRbTree<TOperation, TOperationCompare> TDeadlines; + TDeadlines Deadlines_; + TIOService::TImpl& Srv_; + }; + + TDeadlinesQueue DeadlinesQueue_; + }; +} diff --git a/library/cpp/neh/asio/poll_interrupter.cpp b/library/cpp/neh/asio/poll_interrupter.cpp new file mode 100644 index 0000000000..c96d40c4f3 --- /dev/null +++ b/library/cpp/neh/asio/poll_interrupter.cpp @@ -0,0 +1 @@ +#include "poll_interrupter.h" diff --git a/library/cpp/neh/asio/poll_interrupter.h b/library/cpp/neh/asio/poll_interrupter.h new file mode 100644 index 0000000000..faf815c512 --- /dev/null +++ b/library/cpp/neh/asio/poll_interrupter.h @@ -0,0 +1,107 @@ +#pragma once + +#include <util/system/defaults.h> +#include <util/generic/yexception.h> +#include <util/network/socket.h> +#include <util/system/pipe.h> + +#ifdef _linux_ +#include <sys/eventfd.h> +#endif + +#if defined(_bionic_) && !defined(EFD_SEMAPHORE) +#define EFD_SEMAPHORE 1 +#endif + +namespace NAsio { +#ifdef _linux_ + class TEventFdPollInterrupter { + public: + inline TEventFdPollInterrupter() { + F_ = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE); + if (F_ < 0) { + ythrow TFileError() << "failed to create a eventfd"; + } + } + + inline ~TEventFdPollInterrupter() { + close(F_); + } + + inline void Interrupt() const noexcept { + const static eventfd_t ev(1); + ssize_t res = ::write(F_, &ev, sizeof ev); + Y_UNUSED(res); + } + + inline bool Reset() const noexcept { + eventfd_t ev(0); + + for (;;) { + ssize_t res = ::read(F_, &ev, sizeof ev); + if (res && res == EINTR) { + continue; + } + + return res > 0; + } + } + + int Fd() { + return F_; + } + + private: + int F_; + }; +#endif + + class TPipePollInterrupter { + public: + TPipePollInterrupter() { + TPipeHandle::Pipe(S_[0], S_[1]); + + SetNonBlock(S_[0]); + SetNonBlock(S_[1]); + } + + inline void Interrupt() const noexcept { + char byte = 0; + ssize_t res = S_[1].Write(&byte, 1); + Y_UNUSED(res); + } + + inline bool Reset() const noexcept { + char buff[256]; + + for (;;) { + ssize_t r = S_[0].Read(buff, sizeof buff); + + if (r < 0 && r == EINTR) { + continue; + } + + bool wasInterrupted = r > 0; + + while (r == sizeof buff) { + r = S_[0].Read(buff, sizeof buff); + } + + return wasInterrupted; + } + } + + PIPEHANDLE Fd() const noexcept { + return S_[0]; + } + + private: + TPipeHandle S_[2]; + }; + +#ifdef _linux_ + typedef TEventFdPollInterrupter TPollInterrupter; //more effective than pipe, but only linux impl. +#else + typedef TPipePollInterrupter TPollInterrupter; +#endif +} diff --git a/library/cpp/neh/asio/tcp_acceptor_impl.cpp b/library/cpp/neh/asio/tcp_acceptor_impl.cpp new file mode 100644 index 0000000000..7e1d75fcf5 --- /dev/null +++ b/library/cpp/neh/asio/tcp_acceptor_impl.cpp @@ -0,0 +1,25 @@ +#include "tcp_acceptor_impl.h" + +using namespace NAsio; + +bool TOperationAccept::Execute(int errorCode) { + if (errorCode) { + H_(errorCode, *this); + + return true; + } + + struct sockaddr_storage addr; + socklen_t sz = sizeof(addr); + + SOCKET res = ::accept(Fd(), (sockaddr*)&addr, &sz); + + if (res == INVALID_SOCKET) { + H_(LastSystemError(), *this); + } else { + NS_.Assign(res, TEndpoint(new NAddr::TOpaqueAddr((sockaddr*)&addr))); + H_(0, *this); + } + + return true; +} diff --git a/library/cpp/neh/asio/tcp_acceptor_impl.h b/library/cpp/neh/asio/tcp_acceptor_impl.h new file mode 100644 index 0000000000..c990236efc --- /dev/null +++ b/library/cpp/neh/asio/tcp_acceptor_impl.h @@ -0,0 +1,76 @@ +#pragma once + +#include "asio.h" + +#include "tcp_socket_impl.h" + +namespace NAsio { + class TOperationAccept: public TFdOperation { + public: + TOperationAccept(SOCKET fd, TTcpSocket::TImpl& newSocket, TTcpAcceptor::TAcceptHandler h, TInstant deadline) + : TFdOperation(fd, PollRead, deadline) + , H_(h) + , NS_(newSocket) + { + } + + bool Execute(int errorCode) override; + + TTcpAcceptor::TAcceptHandler H_; + TTcpSocket::TImpl& NS_; + }; + + class TTcpAcceptor::TImpl: public TThrRefBase { + public: + TImpl(TIOService::TImpl& srv) noexcept + : Srv_(srv) + { + } + + inline void Bind(TEndpoint& ep, TErrorCode& ec) noexcept { + TSocketHolder s(socket(ep.SockAddr()->sa_family, SOCK_STREAM, 0)); + + if (s == INVALID_SOCKET) { + ec.Assign(LastSystemError()); + } + + FixIPv6ListenSocket(s); + CheckedSetSockOpt(s, SOL_SOCKET, SO_REUSEADDR, 1, "reuse addr"); + SetNonBlock(s); + + if (::bind(s, ep.SockAddr(), ep.SockAddrLen())) { + ec.Assign(LastSystemError()); + return; + } + + S_.Swap(s); + } + + inline void Listen(int backlog, TErrorCode& ec) noexcept { + if (::listen(S_, backlog)) { + ec.Assign(LastSystemError()); + return; + } + } + + inline void AsyncAccept(TTcpSocket& s, TTcpAcceptor::TAcceptHandler h, TInstant deadline) { + Srv_.ScheduleOp(new TOperationAccept((SOCKET)S_, s.GetImpl(), h, deadline)); //set callback + } + + inline void AsyncCancel() { + Srv_.ScheduleOp(new TOperationCancel<TTcpAcceptor::TImpl>(this)); + } + + inline TIOService::TImpl& GetIOServiceImpl() const noexcept { + return Srv_; + } + + inline SOCKET Fd() const noexcept { + return S_; + } + + private: + TIOService::TImpl& Srv_; + TSocketHolder S_; + }; +} diff --git a/library/cpp/neh/asio/tcp_socket_impl.cpp b/library/cpp/neh/asio/tcp_socket_impl.cpp new file mode 100644 index 0000000000..98cef97561 --- /dev/null +++ b/library/cpp/neh/asio/tcp_socket_impl.cpp @@ -0,0 +1,117 @@ +#include "tcp_socket_impl.h" + +using namespace NAsio; + +TSocketOperation::TSocketOperation(TTcpSocket::TImpl& s, TPollType pt, TInstant deadline) + : TFdOperation(s.Fd(), pt, deadline) + , S_(s) +{ +} + +bool TOperationWrite::Execute(int errorCode) { + if (errorCode) { + H_(errorCode, Written_, *this); + + return true; //op. completed + } + + TErrorCode ec; + TContIOVector& iov = *Buffs_->GetIOvec(); + + size_t n = S_.WriteSome(iov, ec); + + if (ec && ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK) { + H_(ec, Written_ + n, *this); + + return true; + } + + if (n) { + Written_ += n; + iov.Proceed(n); + if (!iov.Bytes()) { + H_(ec, Written_, *this); + + return true; //op. completed + } + } + + return false; //operation not compleled +} + +bool TOperationWriteVector::Execute(int errorCode) { + if (errorCode) { + H_(errorCode, Written_, *this); + + return true; //op. completed + } + + TErrorCode ec; + + size_t n = S_.WriteSome(V_, ec); + + if (ec && ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK) { + H_(ec, Written_ + n, *this); + + return true; + } + + if (n) { + Written_ += n; + V_.Proceed(n); + if (!V_.Bytes()) { + H_(ec, Written_, *this); + + return true; //op. completed + } + } + + return false; //operation not compleled +} + +bool TOperationReadSome::Execute(int errorCode) { + if (errorCode) { + H_(errorCode, 0, *this); + + return true; //op. completed + } + + TErrorCode ec; + + H_(ec, S_.ReadSome(Buff_, Size_, ec), *this); + + return true; +} + +bool TOperationRead::Execute(int errorCode) { + if (errorCode) { + H_(errorCode, Read_, *this); + + return true; //op. completed + } + + TErrorCode ec; + size_t n = S_.ReadSome(Buff_, Size_, ec); + Read_ += n; + + if (ec && ec.Value() != EAGAIN && ec.Value() != EWOULDBLOCK) { + H_(ec, Read_, *this); + + return true; //op. completed + } + + if (n) { + Size_ -= n; + if (!Size_) { + H_(ec, Read_, *this); + + return true; + } + Buff_ += n; + } else if (!ec) { // EOF while read not all + H_(ec, Read_, *this); + return true; + } + + return false; +} diff --git a/library/cpp/neh/asio/tcp_socket_impl.h b/library/cpp/neh/asio/tcp_socket_impl.h new file mode 100644 index 0000000000..44f8f42d87 --- /dev/null +++ b/library/cpp/neh/asio/tcp_socket_impl.h @@ -0,0 +1,332 @@ +#pragma once + +#include "asio.h" +#include "io_service_impl.h" + +#include <sys/uio.h> + +#if defined(_bionic_) +# define IOV_MAX 1024 +#endif + +namespace NAsio { + // ownership/keep-alive references: + // Handlers <- TOperation...(TFdOperation) <- TPollFdEventHandler <- TIOService + + class TSocketOperation: public TFdOperation { + public: + TSocketOperation(TTcpSocket::TImpl& s, TPollType pt, TInstant deadline); + + protected: + TTcpSocket::TImpl& S_; + }; + + class TOperationConnect: public TSocketOperation { + public: + TOperationConnect(TTcpSocket::TImpl& s, TTcpSocket::TConnectHandler h, TInstant deadline) + : TSocketOperation(s, PollWrite, deadline) + , H_(h) + { + } + + bool Execute(int errorCode) override { + H_(errorCode, *this); + + return true; + } + + TTcpSocket::TConnectHandler H_; + }; + + class TOperationConnectFailed: public TSocketOperation { + public: + TOperationConnectFailed(TTcpSocket::TImpl& s, TTcpSocket::TConnectHandler h, int errorCode, TInstant deadline) + : TSocketOperation(s, PollWrite, deadline) + , H_(h) + , ErrorCode_(errorCode) + { + Speculative_ = true; + } + + bool Execute(int errorCode) override { + Y_UNUSED(errorCode); + H_(ErrorCode_, *this); + + return true; + } + + TTcpSocket::TConnectHandler H_; + int ErrorCode_; + }; + + class TOperationWrite: public TSocketOperation { + public: + TOperationWrite(TTcpSocket::TImpl& s, NAsio::TTcpSocket::TSendedData& buffs, TTcpSocket::TWriteHandler h, TInstant deadline) + : TSocketOperation(s, PollWrite, deadline) + , H_(h) + , Buffs_(buffs) + , Written_(0) + { + Speculative_ = true; + } + + //return true, if not need write more data + bool Execute(int errorCode) override; + + private: + TTcpSocket::TWriteHandler H_; + NAsio::TTcpSocket::TSendedData Buffs_; + size_t Written_; + }; + + class TOperationWriteVector: public TSocketOperation { + public: + TOperationWriteVector(TTcpSocket::TImpl& s, TContIOVector* v, TTcpSocket::TWriteHandler h, TInstant deadline) + : TSocketOperation(s, PollWrite, deadline) + , H_(h) + , V_(*v) + , Written_(0) + { + Speculative_ = true; + } + + //return true, if not need write more data + bool Execute(int errorCode) override; + + private: + TTcpSocket::TWriteHandler H_; + TContIOVector& V_; + size_t Written_; + }; + + class TOperationReadSome: public TSocketOperation { + public: + TOperationReadSome(TTcpSocket::TImpl& s, void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline) + : TSocketOperation(s, PollRead, deadline) + , H_(h) + , Buff_(static_cast<char*>(buff)) + , Size_(size) + { + } + + //return true, if not need read more data + bool Execute(int errorCode) override; + + protected: + TTcpSocket::TReadHandler H_; + char* Buff_; + size_t Size_; + }; + + class TOperationRead: public TOperationReadSome { + public: + TOperationRead(TTcpSocket::TImpl& s, void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline) + : TOperationReadSome(s, buff, size, h, deadline) + , Read_(0) + { + } + + bool Execute(int errorCode) override; + + private: + size_t Read_; + }; + + class TOperationPoll: public TSocketOperation { + public: + TOperationPoll(TTcpSocket::TImpl& s, TPollType pt, TTcpSocket::TPollHandler h, TInstant deadline) + : TSocketOperation(s, pt, deadline) + , H_(h) + { + } + + bool Execute(int errorCode) override { + H_(errorCode, *this); + + return true; + } + + private: + TTcpSocket::TPollHandler H_; + }; + + template <class T> + class TOperationCancel: public TNoneOperation { + public: + TOperationCancel(T* s) + : TNoneOperation() + , S_(s) + { + Speculative_ = true; + } + + ~TOperationCancel() override { + } + + private: + bool Execute(int errorCode) override { + Y_UNUSED(errorCode); + if (!errorCode && S_->Fd() != INVALID_SOCKET) { + S_->GetIOServiceImpl().CancelFdOp(S_->Fd()); + } + return true; + } + + TIntrusivePtr<T> S_; + }; + + class TTcpSocket::TImpl: public TNonCopyable, public TThrRefBase { + public: + typedef TTcpSocket::TSendedData TSendedData; + + TImpl(TIOService::TImpl& srv) noexcept + : Srv_(srv) + { + } + + ~TImpl() override { + DBGOUT("TSocket::~TImpl()"); + } + + void Assign(SOCKET fd, TEndpoint ep) { + TSocketHolder(fd).Swap(S_); + RemoteEndpoint_ = ep; + } + + void AsyncConnect(const TEndpoint& ep, TTcpSocket::TConnectHandler h, TInstant deadline) { + TSocketHolder s(socket(ep.SockAddr()->sa_family, SOCK_STREAM, 0)); + + if (Y_UNLIKELY(s == INVALID_SOCKET || Srv_.HasAbort())) { + throw TSystemError() << TStringBuf("can't create socket"); + } + + SetNonBlock(s); + + int err; + do { + err = connect(s, ep.SockAddr(), (int)ep.SockAddrLen()); + if (Y_LIKELY(err)) { + err = LastSystemError(); + } +#if defined(_freebsd_) + if (Y_UNLIKELY(err == EINTR)) { + err = EINPROGRESS; + } + } while (0); +#elif defined(_linux_) + } while (Y_UNLIKELY(err == EINTR)); +#else + } while (0); +#endif + + RemoteEndpoint_ = ep; + S_.Swap(s); + + DBGOUT("AsyncConnect(): " << err); + if (Y_LIKELY(err == EINPROGRESS || err == EWOULDBLOCK || err == 0)) { + Srv_.ScheduleOp(new TOperationConnect(*this, h, deadline)); //set callback + } else { + Srv_.ScheduleOp(new TOperationConnectFailed(*this, h, err, deadline)); //set callback + } + } + + inline void AsyncWrite(TSendedData& d, TTcpSocket::TWriteHandler h, TInstant deadline) { + Srv_.ScheduleOp(new TOperationWrite(*this, d, h, deadline)); + } + + inline void AsyncWrite(TContIOVector* v, TTcpSocket::TWriteHandler h, TInstant deadline) { + Srv_.ScheduleOp(new TOperationWriteVector(*this, v, h, deadline)); + } + + inline void AsyncRead(void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline) { + Srv_.ScheduleOp(new TOperationRead(*this, buff, size, h, deadline)); + } + + inline void AsyncReadSome(void* buff, size_t size, TTcpSocket::TReadHandler h, TInstant deadline) { + Srv_.ScheduleOp(new TOperationReadSome(*this, buff, size, h, deadline)); + } + + inline void AsyncPollWrite(TTcpSocket::TPollHandler h, TInstant deadline) { + Srv_.ScheduleOp(new TOperationPoll(*this, TOperationPoll::PollWrite, h, deadline)); + } + + inline void AsyncPollRead(TTcpSocket::TPollHandler h, TInstant deadline) { + Srv_.ScheduleOp(new TOperationPoll(*this, TOperationPoll::PollRead, h, deadline)); + } + + inline void AsyncCancel() { + if (Y_UNLIKELY(Srv_.HasAbort())) { + return; + } + Srv_.ScheduleOp(new TOperationCancel<TTcpSocket::TImpl>(this)); + } + + inline bool SysCallHasResult(ssize_t& n, TErrorCode& ec) noexcept { + if (n >= 0) { + return true; + } + + int errn = LastSystemError(); + if (errn == EINTR) { + return false; + } + + ec.Assign(errn); + n = 0; + return true; + } + + size_t WriteSome(TContIOVector& iov, TErrorCode& ec) noexcept { + for (;;) { + ssize_t n = writev(S_, (const iovec*)iov.Parts(), Min(IOV_MAX, (int)iov.Count())); + DBGOUT("WriteSome(): n=" << n); + if (SysCallHasResult(n, ec)) { + return n; + } + } + } + + size_t WriteSome(const void* buff, size_t size, TErrorCode& ec) noexcept { + for (;;) { + ssize_t n = send(S_, (char*)buff, size, 0); + DBGOUT("WriteSome(): n=" << n); + if (SysCallHasResult(n, ec)) { + return n; + } + } + } + + size_t ReadSome(void* buff, size_t size, TErrorCode& ec) noexcept { + for (;;) { + ssize_t n = recv(S_, (char*)buff, size, 0); + DBGOUT("ReadSome(): n=" << n); + if (SysCallHasResult(n, ec)) { + return n; + } + } + } + + inline void Shutdown(TTcpSocket::TShutdownMode mode, TErrorCode& ec) { + if (shutdown(S_, mode)) { + ec.Assign(LastSystemError()); + } + } + + TIOService::TImpl& GetIOServiceImpl() const noexcept { + return Srv_; + } + + inline SOCKET Fd() const noexcept { + return S_; + } + + TEndpoint RemoteEndpoint() const { + return RemoteEndpoint_; + } + + private: + TIOService::TImpl& Srv_; + TSocketHolder S_; + TEndpoint RemoteEndpoint_; + }; +} |