aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/asio/io_service_impl.cpp
blob: d49b3fb03ee902aa42d69cb27ed86def37703003 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#include "io_service_impl.h"

#include <library/cpp/coroutine/engine/poller.h>

using namespace NAsio;

void TFdOperation::AddOp(TIOService::TImpl& srv) {
    srv.AddOp(this);
}

void TFdOperation::Finalize() {
    (*PH_)->DelOp(this);
}

void TPollFdEventHandler::ExecuteOperations(TFdOperations& oprs, int errorCode) {
    TFdOperations::iterator it = oprs.begin();

    try {
        while (it != oprs.end()) {
            TFdOperation* op = it->Get();

            if (op->Execute(errorCode)) { // throw ?
                if (op->IsRequiredRepeat()) {
                    Srv_.UpdateOpDeadline(op);
                    ++it; //operation completed, but want be repeated
                } else {
                    FinishedOperations_.push_back(*it);
                    it = oprs.erase(it);
                }
            } else {
                ++it; //operation not completed
            }
        }
    } catch (...) {
        if (it != oprs.end()) {
            FinishedOperations_.push_back(*it);
            oprs.erase(it);
        }
        throw;
    }
}

void TPollFdEventHandler::DelOp(TFdOperation* op) {
    TAutoPtr<TPollFdEventHandler>& evh = *op->PH_;

    if (op->IsPollRead()) {
        Y_ASSERT(FinishOp(ReadOperations_, op));
    } else {
        Y_ASSERT(FinishOp(WriteOperations_, op));
    }
    Srv_.FixHandledEvents(evh); //alarm, - 'this' can be destroyed here!
}

void TInterrupterHandler::OnFdEvent(int status, ui16 filter) {
    if (!status && (filter & CONT_POLL_READ)) {
        PI_.Reset();
    }
}

void TIOService::TImpl::Run() {
    TEvh& iEvh = Evh_.Get(I_.Fd());
    iEvh.Reset(new TInterrupterHandler(*this, I_));

    TInterrupterKeeper ik(*this, iEvh);
    Y_UNUSED(ik);
    IPollerFace::TEvents evs;
    AtomicSet(NeedCheckOpQueue_, 1);
    TInstant deadline;

    while (Y_LIKELY(!Aborted_ && (AtomicGet(OutstandingWork_) || FdEventHandlersCnt_ > 1 || TimersOpCnt_ || AtomicGet(NeedCheckOpQueue_)))) {
        //while
        //  expected work (external flag)
        //  or have event handlers (exclude interrupter)
        //  or have not completed timer operation
        //  or have any operation in queues

        AtomicIncrement(IsWaiting_);
        if (!AtomicGet(NeedCheckOpQueue_)) {
            P_->Wait(evs, deadline);
        }
        AtomicDecrement(IsWaiting_);

        if (evs.size()) {
            for (IPollerFace::TEvents::const_iterator iev = evs.begin(); iev != evs.end() && !Aborted_; ++iev) {
                const IPollerFace::TEvent& ev = *iev;
                TEvh& evh = *(TEvh*)ev.Data;

                if (!evh) {
                    continue; //op. cancel (see ProcessOpQueue) can destroy evh
                }

                int status = ev.Status;
                if (ev.Status == EIO) {
                    int error = status;
                    if (GetSockOpt(evh->Fd(), SOL_SOCKET, SO_ERROR, error) == 0) {
                        status = error;
                    }
                }

                OnFdEvent(evh, status, ev.Filter); //here handle fd events
                //immediatly after handling events for one descriptor check op. queue
                //often queue can contain another operation for this fd (next async read as sample)
                //so we can optimize redundant epoll_ctl (or similar) calls
                ProcessOpQueue();
            }

            evs.clear();
        } else {
            ProcessOpQueue();
        }

        deadline = DeadlinesQueue_.NextDeadline(); //here handle timeouts/process timers
    }
}

void TIOService::TImpl::Abort() {
    class TAbortOperation: public TNoneOperation {
    public:
        TAbortOperation(TIOService::TImpl& srv)
            : TNoneOperation()
            , Srv_(srv)
        {
            Speculative_ = true;
        }

    private:
        bool Execute(int errorCode) override {
            Y_UNUSED(errorCode);
            Srv_.ProcessAbort();
            return true;
        }

        TIOService::TImpl& Srv_;
    };
    AtomicSet(HasAbort_, 1);
    ScheduleOp(new TAbortOperation(*this));
}

void TIOService::TImpl::ProcessAbort() {
    Aborted_ = true;

    for (int fd = 0; fd <= MaxFd_; ++fd) {
        TEvh& evh = Evh_.Get(fd);
        if (!!evh && evh->Fd() != I_.Fd()) {
            OnFdEvent(evh, ECANCELED, CONT_POLL_READ | CONT_POLL_WRITE);
        }
    }

    for (auto t : Timers_) {
        t->FailOperations(ECANCELED);
    }

    TOperationPtr op;
    while (OpQueue_.Dequeue(&op)) { //cancel all enqueued operations
        try {
            op->Execute(ECANCELED);
        } catch (...) {
        }
        op.Destroy();
    }
}