#pragma once
#include "asio.h"
#include "poll_interrupter.h"
#include <library/cpp/neh/lfqueue.h>
#include <library/cpp/neh/pipequeue.h>
#include <library/cpp/dns/cache.h>
#include <util/generic/hash_set.h>
#include <util/network/iovec.h>
#include <util/network/pollerimpl.h>
#include <util/thread/lfqueue.h>
#include <util/thread/factory.h>
#ifdef DEBUG_ASIO
#define DBGOUT(args) Cout << args << Endl;
#else
#define DBGOUT(args)
#endif
namespace NAsio {
#if defined(_arm_)
template <typename T>
struct TLockFreeSequence {
Y_NO_INLINE T& Get(size_t n) {
with_lock (M) {
return H[n];
}
}
TMutex M;
THashMap<size_t, T> H;
};
#else
//TODO: copypaste from neh, - need fix
template <class T>
class TLockFreeSequence {
public:
inline TLockFreeSequence() {
memset((void*)T_, 0, sizeof(T_));
}
inline ~TLockFreeSequence() {
for (size_t i = 0; i < Y_ARRAY_SIZE(T_); ++i) {
delete[] T_[i];
}
}
inline T& Get(size_t n) {
const size_t i = GetValueBitCount(n + 1) - 1;
return GetList(i)[n + 1 - (((size_t)1) << i)];
}
private:
inline T* GetList(size_t n) {
T* volatile* t = T_ + n;
while (!*t) {
TArrayHolder<T> nt(new T[((size_t)1) << n]);
if (AtomicCas(t, nt.Get(), nullptr)) {
return nt.Release();
}
}
return *t;
}
private:
T* volatile T_[sizeof(size_t) * 8];
};
#endif
struct TOperationCompare {
template <class T>
static inline bool Compare(const T& l, const T& r) noexcept {
return l.DeadLine() < r.DeadLine() || (l.DeadLine() == r.DeadLine() && &l < &r);
}
};
//async operation, execute in contex TIOService()::Run() thread-executor
//usualy used for call functors/callbacks
class TOperation: public TRbTreeItem<TOperation, TOperationCompare>, public IHandlingContext {
public:
TOperation(TInstant deadline = TInstant::Max())
: D_(deadline)
, Speculative_(false)
, RequiredRepeatExecution_(false)
, ND_(deadline)
{
}
//register this operation in svc.impl.
virtual void AddOp(TIOService::TImpl&) = 0;
//return false, if operation not completed
virtual bool Execute(int errorCode = 0) = 0;
void ContinueUseHandler(TDeadline deadline) override {
RequiredRepeatExecution_ = true;
ND_ = deadline;
}
virtual void Finalize() = 0;
inline TInstant Deadline() const noexcept {
return D_;
}
inline TInstant DeadLine() const noexcept {
return D_;
}
inline bool Speculative() const noexcept {
return Speculative_;
}
inline bool IsRequiredRepeat() const noexcept {
return RequiredRepeatExecution_;
}
inline void PrepareReExecution() noexcept {
RequiredRepeatExecution_ = false;
D_ = ND_;
}
protected:
TInstant D_;
bool Speculative_; //if true, operation will be runned immediately after dequeue (even without wating any event)
//as sample used for optimisation writing, - obviously in buffers exist space for write
bool RequiredRepeatExecution_; //set to true, if required re-exec operation
TInstant ND_; //new deadline (for re-exec operation)
};
typedef TAutoPtr<TOperation> TOperationPtr;
class TNoneOperation: public TOperation {
public:
TNoneOperation(TInstant deadline = TInstant::Max())
: TOperation(deadline)
{
}
void AddOp(TIOService::TImpl&) override {
Y_ASSERT(0);
}
void Finalize() override {
}
};
class TPollFdEventHandler;
//descriptor use operation
class TFdOperation: public TOperation {
public:
enum TPollType {
PollRead,
PollWrite
};
TFdOperation(SOCKET fd, TPollType pt, TInstant deadline = TInstant::Max())
: TOperation(deadline)
, Fd_(fd)
, PT_(pt)
, PH_(nullptr)
{
Y_ASSERT(Fd() != INVALID_SOCKET);
}
inline SOCKET Fd() const noexcept {
return Fd_;
}
inline bool IsPollRead() const noexcept {
return PT_ == PollRead;
}
void AddOp(TIOService::TImpl& srv) override;
void Finalize() override;
protected:
SOCKET Fd_;
TPollType PT_;
public:
TAutoPtr<TPollFdEventHandler>* PH_;
};
typedef TAutoPtr<TFdOperation> TFdOperationPtr;
class TPollFdEventHandler {
public:
TPollFdEventHandler(SOCKET fd, TIOService::TImpl& srv)
: Fd_(fd)
, HandledEvents_(0)
, Srv_(srv)
{
}
virtual ~TPollFdEventHandler() {
Y_ASSERT(ReadOperations_.size() == 0);
Y_ASSERT(WriteOperations_.size() == 0);
}
inline void AddReadOp(TFdOperationPtr op) {
ReadOperations_.push_back(op);
}
inline void AddWriteOp(TFdOperationPtr op) {
WriteOperations_.push_back(op);
}
virtual void OnFdEvent(int status, ui16 filter) {
DBGOUT("PollEvent(fd=" << Fd_ << ", " << status << ", " << filter << ")");
if (status) {
ExecuteOperations(ReadOperations_, status);
ExecuteOperations(WriteOperations_, status);
} else {
if (filter & CONT_POLL_READ) {
ExecuteOperations(ReadOperations_, status);
}
if (filter & CONT_POLL_WRITE) {
ExecuteOperations(WriteOperations_, status);
}
}
}
typedef TVector<TFdOperationPtr> TFdOperations;
void ExecuteOperations(TFdOperations& oprs, int errorCode);
//return true if filter handled events changed and require re-configure events poller
virtual bool FixHandledEvents() noexcept {
DBGOUT("TPollFdEventHandler::FixHandledEvents()");
ui16 filter = 0;
if (WriteOperations_.size()) {
filter |= CONT_POLL_WRITE;
}
if (ReadOperations_.size()) {
filter |= CONT_POLL_READ;
}
if (Y_LIKELY(HandledEvents_ == filter)) {
return false;
}
HandledEvents_ = filter;
return true;
}
inline bool FinishOp(TFdOperations& oprs, TFdOperation* op) noexcept {
for (TFdOperations::iterator it = oprs.begin(); it != oprs.end(); ++it) {
if (it->Get() == op) {
FinishedOperations_.push_back(*it);
oprs.erase(it);
return true;
}
}
return false;
}
void DelOp(TFdOperation* op);
inline SOCKET Fd() const noexcept {
return Fd_;
}
inline ui16 HandledEvents() const noexcept {
return HandledEvents_;
}
inline void AddHandlingEvent(ui16 ev) noexcept {
HandledEvents_ |= ev;
}
inline void DestroyFinishedOperations() {
FinishedOperations_.clear();
}
TIOService::TImpl& GetServiceImpl() const noexcept {
return Srv_;
}
protected:
SOCKET Fd_;
ui16 HandledEvents_;
TIOService::TImpl& Srv_;
private:
TVector<TFdOperationPtr> ReadOperations_;
TVector<TFdOperationPtr> WriteOperations_;
// we can't immediatly destroy finished operations, this can cause closing used socket descriptor Fd_
// (on cascade deletion operation object-handler), but later we use Fd_ for modify handled events at poller,
// so we collect here finished operations and destroy it only after update poller, -
// call FixHandledEvents(TPollFdEventHandlerPtr&)
TVector<TFdOperationPtr> FinishedOperations_;
};
//additional descriptor for poller, used for interrupt current poll wait
class TInterrupterHandler: public TPollFdEventHandler {
public:
TInterrupterHandler(TIOService::TImpl& srv, TPollInterrupter& pi)
: TPollFdEventHandler(pi.Fd(), srv)
, PI_(pi)
{
HandledEvents_ = CONT_POLL_READ;
}
~TInterrupterHandler() override {
DBGOUT("~TInterrupterHandler");
}
void OnFdEvent(int status, ui16 filter) override;
bool FixHandledEvents() noexcept override {
DBGOUT("TInterrupterHandler::FixHandledEvents()");
return false;
}
private:
TPollInterrupter& PI_;
};
namespace {
inline TAutoPtr<IPollerFace> CreatePoller() {
try {
#if defined(_linux_)
return IPollerFace::Construct(TStringBuf("epoll"));
#endif
#if defined(_freebsd_) || defined(_darwin_)
return IPollerFace::Construct(TStringBuf("kqueue"));
#endif
} catch (...) {
Cdbg << CurrentExceptionMessage() << Endl;
}
return IPollerFace::Default();
}
}
//some equivalent TContExecutor
class TIOService::TImpl: public TNonCopyable {
public:
typedef TAutoPtr<TPollFdEventHandler> TEvh;
typedef TLockFreeSequence<TEvh> TEventHandlers;
class TTimer {
public:
typedef THashSet<TOperation*> TOperations;
TTimer(TIOService::TImpl& srv)
: Srv_(srv)
{
}
virtual ~TTimer() {
FailOperations(ECANCELED);
}
void AddOp(TOperation* op) {
THolder<TOperation> tmp(op);
Operations_.insert(op);
Y_UNUSED(tmp.Release());
Srv_.RegisterOpDeadline(op);
Srv_.IncTimersOp();
}
void DelOp(TOperation* op) {
TOperations::iterator it = Operations_.find(op);
if (it != Operations_.end()) {
Srv_.DecTimersOp();
delete op;
Operations_.erase(it);
}
}
inline void FailOperations(int ec) {
for (auto operation : Operations_) {
try {
operation->Execute(ec); //throw ?
} catch (...) {
}
Srv_.DecTimersOp();
delete operation;
}
Operations_.clear();
}
TIOService::TImpl& GetIOServiceImpl() const noexcept {
return Srv_;
}
protected:
TIOService::TImpl& Srv_;
THashSet<TOperation*> Operations_;
};
class TTimers: public THashSet<TTimer*> {
public:
~TTimers() {
for (auto it : *this) {
delete it;
}
}
};
TImpl()
: P_(CreatePoller())
, DeadlinesQueue_(*this)
{
}
~TImpl() {
TOperationPtr op;
while (OpQueue_.Dequeue(&op)) { //cancel all enqueued operations
try {
op->Execute(ECANCELED);
} catch (...) {
}
op.Destroy();
}
}
//similar TContExecutor::Execute() or io_service::run()
//process event loop (exit if none to do (no timers or event handlers))
void Run();
//enqueue functor fo call in Run() eventloop (thread safing)
inline void Post(TCompletionHandler h) {
class TFuncOperation: public TNoneOperation {
public:
TFuncOperation(TCompletionHandler completionHandler)
: TNoneOperation()
, H_(std::move(completionHandler))
{
Speculative_ = true;
}
private:
//return false, if operation not completed
bool Execute(int errorCode) override {
Y_UNUSED(errorCode);
H_();
return true;
}
TCompletionHandler H_;
};
ScheduleOp(new TFuncOperation(std::move(h)));
}
//cancel all current operations (handlers be called with errorCode == ECANCELED)
void Abort();
bool HasAbort() {
return AtomicGet(HasAbort_);
}
inline void ScheduleOp(TOperationPtr op) { //throw std::bad_alloc
Y_ASSERT(!Aborted_);
Y_ASSERT(!!op);
OpQueue_.Enqueue(op);
Interrupt();
}
inline void Interrupt() noexcept {
AtomicSet(NeedCheckOpQueue_, 1);
if (AtomicAdd(IsWaiting_, 0) == 1) {
I_.Interrupt();
}
}
inline void UpdateOpDeadline(TOperation* op) {
TInstant oldDeadline = op->Deadline();
op->PrepareReExecution();
if (oldDeadline == op->Deadline()) {
return;
}
if (oldDeadline != TInstant::Max()) {
op->UnLink();
}
if (op->Deadline() != TInstant::Max()) {
DeadlinesQueue_.Register(op);
}
}
void SyncRegisterTimer(TTimer* t) {
Timers_.insert(t);
}
inline void SyncUnregisterAndDestroyTimer(TTimer* t) {
Timers_.erase(t);
delete t;
}
inline void IncTimersOp() noexcept {
++TimersOpCnt_;
}
inline void DecTimersOp() noexcept {
--TimersOpCnt_;
}
inline void WorkStarted() {
AtomicIncrement(OutstandingWork_);
}
inline void WorkFinished() {
if (AtomicDecrement(OutstandingWork_) == 0) {
Interrupt();
}
}
private:
void ProcessAbort();
inline TEvh& EnsureGetEvh(SOCKET fd) {
TEvh& evh = Evh_.Get(fd);
if (!evh) {
evh.Reset(new TPollFdEventHandler(fd, *this));
}
return evh;
}
inline void OnTimeoutOp(TOperation* op) {
DBGOUT("OnTimeoutOp");
try {
op->Execute(ETIMEDOUT); //throw ?
} catch (...) {
op->Finalize();
throw;
}
if (op->IsRequiredRepeat()) {
//operation not completed
UpdateOpDeadline(op);
} else {
//destroy operation structure
op->Finalize();
}
}
public:
inline void FixHandledEvents(TEvh& evh) {
if (!!evh) {
if (evh->FixHandledEvents()) {
if (!evh->HandledEvents()) {
DelEventHandler(evh);
evh.Destroy();
} else {
ModEventHandler(evh);
evh->DestroyFinishedOperations();
}
} else {
evh->DestroyFinishedOperations();
}
}
}
private:
inline TEvh& GetHandlerForOp(TFdOperation* op) {
TEvh& evh = EnsureGetEvh(op->Fd());
op->PH_ = &evh;
return evh;
}
void ProcessOpQueue() {
if (!AtomicGet(NeedCheckOpQueue_)) {
return;
}
AtomicSet(NeedCheckOpQueue_, 0);
TOperationPtr op;
while (OpQueue_.Dequeue(&op)) {
if (op->Speculative()) {
if (op->Execute(Y_UNLIKELY(Aborted_) ? ECANCELED : 0)) {
op.Destroy();
continue; //operation completed
}
if (!op->IsRequiredRepeat()) {
op->PrepareReExecution();
}
}
RegisterOpDeadline(op.Get());
op.Get()->AddOp(*this); // ... -> AddOp()
Y_UNUSED(op.Release());
}
}
inline void RegisterOpDeadline(TOperation* op) {
if (op->DeadLine() != TInstant::Max()) {
DeadlinesQueue_.Register(op);
}
}
public:
inline void AddOp(TFdOperation* op) {
DBGOUT("AddOp<Fd>(" << op->Fd() << ")");
TEvh& evh = GetHandlerForOp(op);
if (op->IsPollRead()) {
evh->AddReadOp(op);
EnsureEventHandled(evh, CONT_POLL_READ);
} else {
evh->AddWriteOp(op);
EnsureEventHandled(evh, CONT_POLL_WRITE);
}
}
private:
inline void EnsureEventHandled(TEvh& evh, ui16 ev) {
if (!evh->HandledEvents()) {
evh->AddHandlingEvent(ev);
AddEventHandler(evh);
} else {
if ((evh->HandledEvents() & ev) == 0) {
evh->AddHandlingEvent(ev);
ModEventHandler(evh);
}
}
}
public:
//cancel all current operations for socket
//method MUST be called from Run() thread-executor
void CancelFdOp(SOCKET fd) {
TEvh& evh = Evh_.Get(fd);
if (!evh) {
return;
}
OnFdEvent(evh, ECANCELED, CONT_POLL_READ | CONT_POLL_WRITE);
}
private:
//helper for fixing handled events even in case exception
struct TExceptionProofFixerHandledEvents {
TExceptionProofFixerHandledEvents(TIOService::TImpl& srv, TEvh& iEvh)
: Srv_(srv)
, Evh_(iEvh)
{
}
~TExceptionProofFixerHandledEvents() {
Srv_.FixHandledEvents(Evh_);
}
TIOService::TImpl& Srv_;
TEvh& Evh_;
};
inline void OnFdEvent(TEvh& evh, int status, ui16 filter) {
TExceptionProofFixerHandledEvents fixer(*this, evh);
Y_UNUSED(fixer);
evh->OnFdEvent(status, filter);
}
inline void AddEventHandler(TEvh& evh) {
if (evh->Fd() > MaxFd_) {
MaxFd_ = evh->Fd();
}
SetEventHandler(&evh, evh->Fd(), evh->HandledEvents());
++FdEventHandlersCnt_;
}
inline void ModEventHandler(TEvh& evh) {
SetEventHandler(&evh, evh->Fd(), evh->HandledEvents());
}
inline void DelEventHandler(TEvh& evh) {
SetEventHandler(&evh, evh->Fd(), 0);
--FdEventHandlersCnt_;
}
inline void SetEventHandler(void* h, int fd, ui16 flags) {
DBGOUT("SetEventHandler(" << fd << ", " << flags << ")");
P_->Set(h, fd, flags);
}
//exception safe call DelEventHandler
struct TInterrupterKeeper {
TInterrupterKeeper(TImpl& srv, TEvh& iEvh)
: Srv_(srv)
, Evh_(iEvh)
{
Srv_.AddEventHandler(Evh_);
}
~TInterrupterKeeper() {
Srv_.DelEventHandler(Evh_);
}
TImpl& Srv_;
TEvh& Evh_;
};
TAutoPtr<IPollerFace> P_;
TPollInterrupter I_;
TAtomic IsWaiting_ = 0;
TAtomic NeedCheckOpQueue_ = 0;
TAtomic OutstandingWork_ = 0;
NNeh::TAutoLockFreeQueue<TOperation> OpQueue_;
TEventHandlers Evh_; //i/o event handlers
TTimers Timers_; //timeout event handlers
size_t FdEventHandlersCnt_ = 0; //i/o event handlers counter
size_t TimersOpCnt_ = 0; //timers op counter
SOCKET MaxFd_ = 0; //max used descriptor num
TAtomic HasAbort_ = 0;
bool Aborted_ = false;
class TDeadlinesQueue {
public:
TDeadlinesQueue(TIOService::TImpl& srv)
: Srv_(srv)
{
}
inline void Register(TOperation* op) {
Deadlines_.Insert(op);
}
TInstant NextDeadline() {
TDeadlines::TIterator it = Deadlines_.Begin();
while (it != Deadlines_.End()) {
if (it->DeadLine() > TInstant::Now()) {
DBGOUT("TDeadlinesQueue::NewDeadline:" << (it->DeadLine().GetValue() - TInstant::Now().GetValue()));
return it->DeadLine();
}
TOperation* op = &*(it++);
Srv_.OnTimeoutOp(op);
}
return Deadlines_.Empty() ? TInstant::Max() : Deadlines_.Begin()->DeadLine();
}
private:
typedef TRbTree<TOperation, TOperationCompare> TDeadlines;
TDeadlines Deadlines_;
TIOService::TImpl& Srv_;
};
TDeadlinesQueue DeadlinesQueue_;
};
}