blob: d49b3fb03ee902aa42d69cb27ed86def37703003 (
plain) (
tree)
|
|
#include "io_service_impl.h"
#include <library/cpp/coroutine/engine/poller.h>
using namespace NAsio;
void TFdOperation::AddOp(TIOService::TImpl& srv) {
srv.AddOp(this);
}
void TFdOperation::Finalize() {
(*PH_)->DelOp(this);
}
void TPollFdEventHandler::ExecuteOperations(TFdOperations& oprs, int errorCode) {
TFdOperations::iterator it = oprs.begin();
try {
while (it != oprs.end()) {
TFdOperation* op = it->Get();
if (op->Execute(errorCode)) { // throw ?
if (op->IsRequiredRepeat()) {
Srv_.UpdateOpDeadline(op);
++it; //operation completed, but want be repeated
} else {
FinishedOperations_.push_back(*it);
it = oprs.erase(it);
}
} else {
++it; //operation not completed
}
}
} catch (...) {
if (it != oprs.end()) {
FinishedOperations_.push_back(*it);
oprs.erase(it);
}
throw;
}
}
void TPollFdEventHandler::DelOp(TFdOperation* op) {
TAutoPtr<TPollFdEventHandler>& evh = *op->PH_;
if (op->IsPollRead()) {
Y_ASSERT(FinishOp(ReadOperations_, op));
} else {
Y_ASSERT(FinishOp(WriteOperations_, op));
}
Srv_.FixHandledEvents(evh); //alarm, - 'this' can be destroyed here!
}
void TInterrupterHandler::OnFdEvent(int status, ui16 filter) {
if (!status && (filter & CONT_POLL_READ)) {
PI_.Reset();
}
}
void TIOService::TImpl::Run() {
TEvh& iEvh = Evh_.Get(I_.Fd());
iEvh.Reset(new TInterrupterHandler(*this, I_));
TInterrupterKeeper ik(*this, iEvh);
Y_UNUSED(ik);
IPollerFace::TEvents evs;
AtomicSet(NeedCheckOpQueue_, 1);
TInstant deadline;
while (Y_LIKELY(!Aborted_ && (AtomicGet(OutstandingWork_) || FdEventHandlersCnt_ > 1 || TimersOpCnt_ || AtomicGet(NeedCheckOpQueue_)))) {
//while
// expected work (external flag)
// or have event handlers (exclude interrupter)
// or have not completed timer operation
// or have any operation in queues
AtomicIncrement(IsWaiting_);
if (!AtomicGet(NeedCheckOpQueue_)) {
P_->Wait(evs, deadline);
}
AtomicDecrement(IsWaiting_);
if (evs.size()) {
for (IPollerFace::TEvents::const_iterator iev = evs.begin(); iev != evs.end() && !Aborted_; ++iev) {
const IPollerFace::TEvent& ev = *iev;
TEvh& evh = *(TEvh*)ev.Data;
if (!evh) {
continue; //op. cancel (see ProcessOpQueue) can destroy evh
}
int status = ev.Status;
if (ev.Status == EIO) {
int error = status;
if (GetSockOpt(evh->Fd(), SOL_SOCKET, SO_ERROR, error) == 0) {
status = error;
}
}
OnFdEvent(evh, status, ev.Filter); //here handle fd events
//immediatly after handling events for one descriptor check op. queue
//often queue can contain another operation for this fd (next async read as sample)
//so we can optimize redundant epoll_ctl (or similar) calls
ProcessOpQueue();
}
evs.clear();
} else {
ProcessOpQueue();
}
deadline = DeadlinesQueue_.NextDeadline(); //here handle timeouts/process timers
}
}
void TIOService::TImpl::Abort() {
class TAbortOperation: public TNoneOperation {
public:
TAbortOperation(TIOService::TImpl& srv)
: TNoneOperation()
, Srv_(srv)
{
Speculative_ = true;
}
private:
bool Execute(int errorCode) override {
Y_UNUSED(errorCode);
Srv_.ProcessAbort();
return true;
}
TIOService::TImpl& Srv_;
};
AtomicSet(HasAbort_, 1);
ScheduleOp(new TAbortOperation(*this));
}
void TIOService::TImpl::ProcessAbort() {
Aborted_ = true;
for (int fd = 0; fd <= MaxFd_; ++fd) {
TEvh& evh = Evh_.Get(fd);
if (!!evh && evh->Fd() != I_.Fd()) {
OnFdEvent(evh, ECANCELED, CONT_POLL_READ | CONT_POLL_WRITE);
}
}
for (auto t : Timers_) {
t->FailOperations(ECANCELED);
}
TOperationPtr op;
while (OpQueue_.Dequeue(&op)) { //cancel all enqueued operations
try {
op->Execute(ECANCELED);
} catch (...) {
}
op.Destroy();
}
}
|