aboutsummaryrefslogblamecommitdiffstats
path: root/util/network/pollerimpl.h
blob: e8c7e40fbaf1b2af8e1d25779c1c17eb186866a1 (plain) (tree)
1
2
3
4
5
6
7
8
9
            



                              

                                 
                                
                                    
                               



                                           
                                                                       


















                                                                
                                                                  

                                                                                                      
                                                 








                                                                      
                                                      















                                                                              
                                


                                                                         
                                                                                                          










                                                                            
                                   







                                                     
                             

                   
                                    



                                                       
                           
 










                                                                               
                
                                                                                                                 
 
                                                           


                                                         
                                         





                                                            
                                                                                                                                        
     
                                                                          



                                                
                                                                  
 
                                                                       


                           
                                                                    

                            
                                                                   


                                      
                             
     
                                                                       
                                           




                                            
 


                                                    

                 




                              
                                                                                                            
            
 
        
                                                                                     


                                          
 



                                        
                                                             




                                                           
                            

                   
                                    






                                                       


















                                                                               


                                     
                          
                                                                                       




                                                              
                                         





                                              
                                                                          
                                                                               
                                                                          


                           
                                                                    

                               
                                                                   





                                                    
                                                                       








                                       


                                         

                   









                                    
                                                                                                  













                                          
                            


                        
                                            

                         
                                                  


                        


                                           
                                            


                           
                                                  



                                                            
                                       

                      
                                                                             
                           

                                                  



















                                                        
                                            

                                     
                                                      

                         
                                                         





                                                     
                                                    

                          
                         




                                
                             


                                
                                                          
                                  
                                                          



                 
                                            
                                  
                                                 



                 
                                                                          
                                  
 



























                                                                         
                                                                              
                                                                                           









                                                                               




                                                    
                                                    
                                                                                                     

                                           
          
 










                                                                             
                                                                        




                                                  
                                                                                            

                                     
                                                                                 
                                        
                                         

                                                      




                                                           











                                                             



                                                               




                                                                               


                 



                                                      

                                    
                                                    

                                      
                                                                    

                             
                                                                   

                               
                                                                       
                               
     
        
                                   







                                                         
                                  

         
                                             

                          
                                            


                          




                                                     



                             
                                       


                             
      





                                      
                          
                                
 


                      
                                                                                          








                                                         
                       
                                          
 
                                                      





                                          
                                                                   
                                          
                                                                      



                                               
                                                                                                                 


























                                                                                       
#pragma once

#include "socket.h"

#include <util/system/error.h>
#include <util/system/mutex.h>
#include <util/system/defaults.h>
#include <util/generic/ylimits.h>
#include <util/generic/utility.h>
#include <util/generic/vector.h>
#include <util/generic/yexception.h>
#include <util/datetime/base.h>

#if defined(_freebsd_) || defined(_darwin_)
    #define HAVE_KQUEUE_POLLER
#endif

#if (defined(_linux_) && !defined(_bionic_)) || (__ANDROID_API__ >= 21)
    #define HAVE_EPOLL_POLLER
#endif

//now we always have it
#define HAVE_SELECT_POLLER

#if defined(HAVE_KQUEUE_POLLER)
    #include <sys/event.h>
#endif

#if defined(HAVE_EPOLL_POLLER)
    #include <sys/epoll.h>
#endif

enum EContPoll {
    CONT_POLL_READ = 1,
    CONT_POLL_WRITE = 2,
    CONT_POLL_RDHUP = 4,
    CONT_POLL_ONE_SHOT = 8,        // Disable after first event
    CONT_POLL_MODIFY = 16,         // Modify already added event
    CONT_POLL_EDGE_TRIGGERED = 32, // Notify only about new events
    CONT_POLL_BACKLOG_EMPTY = 64,  // Backlog is empty (seen end of request, EAGAIN or truncated read)
};

static inline bool IsSocket(SOCKET fd) noexcept {
    int val = 0;
    socklen_t len = sizeof(val);

    if (getsockopt(fd, SOL_SOCKET, SO_TYPE, (char*)&val, &len) == 0) {
        return true;
    }

    return LastSystemError() != ENOTSOCK;
}

static inline int MicroToMilli(int timeout) noexcept {
    if (timeout) {
        /*
         * 1. API of epoll syscall allows to specify timeout with millisecond
         * accuracy only
         * 2. It is quite complicated to guarantee time resolution of blocking
         * syscall less than kernel 1/HZ
         *
         * Without this rounding we just waste cpu time and do a lot of
         * fast epoll_wait(..., 0) syscalls.
         */
        return Max(timeout / 1000, 1);
    }

    return 0;
}

struct TWithoutLocking {
    using TMyMutex = TFakeMutex;
};

#if defined(HAVE_KQUEUE_POLLER)
static inline int Kevent(int kq, struct kevent* changelist, int nchanges,
                         struct kevent* eventlist, int nevents, const struct timespec* timeout) noexcept {
    int ret;

    do {
        ret = kevent(kq, changelist, nchanges, eventlist, nevents, timeout);
    } while (ret == -1 && errno == EINTR);

    return ret;
}

template <class TLockPolicy>
class TKqueuePoller {
public:
    typedef struct ::kevent TEvent;

    inline TKqueuePoller()
        : Fd_(kqueue())
    {
        if (Fd_ == -1) {
            ythrow TSystemError() << "kqueue failed";
        }
    }

    inline ~TKqueuePoller() {
        close(Fd_);
    }

    inline int Fd() const noexcept {
        return Fd_;
    }

    inline void SetImpl(void* data, int fd, int what) {
        TEvent e[2];
        int flags = EV_ADD;

        if (what & CONT_POLL_EDGE_TRIGGERED) {
            if (what & CONT_POLL_BACKLOG_EMPTY) {
                // When backlog is empty, edge-triggered does not need restart.
                return;
            }
            flags |= EV_CLEAR;
        }

        if (what & CONT_POLL_ONE_SHOT) {
            flags |= EV_ONESHOT;
        }

        Zero(e);

        EV_SET(e + 0, fd, EVFILT_READ, flags | ((what & CONT_POLL_READ) ? EV_ENABLE : EV_DISABLE), 0, 0, data);
        EV_SET(e + 1, fd, EVFILT_WRITE, flags | ((what & CONT_POLL_WRITE) ? EV_ENABLE : EV_DISABLE), 0, 0, data);

        if (Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1) {
            ythrow TSystemError() << "kevent add failed";
        }
    }

    inline void Remove(int fd) noexcept {
        TEvent e[2];

        Zero(e);

        EV_SET(e + 0, fd, EVFILT_READ, EV_DELETE, 0, 0, 0);
        EV_SET(e + 1, fd, EVFILT_WRITE, EV_DELETE, 0, 0, 0);

        Y_VERIFY(!(Kevent(Fd_, e, 2, nullptr, 0, nullptr) == -1 && errno != ENOENT), "kevent remove failed: %s", LastSystemErrorText());
    }

    inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
        struct timespec ts;

        ts.tv_sec = timeout / 1000000;
        ts.tv_nsec = (timeout % 1000000) * 1000;

        const int ret = Kevent(Fd_, nullptr, 0, events, len, &ts);

        Y_VERIFY(ret >= 0, "kevent failed: %s", LastSystemErrorText());

        return (size_t)ret;
    }

    static inline void* ExtractEvent(const TEvent* event) noexcept {
        return event->udata;
    }

    static inline int ExtractStatus(const TEvent* event) noexcept {
        if (event->flags & EV_ERROR) {
            return EIO;
        }

        return event->fflags;
    }

    static inline int ExtractFilterImpl(const TEvent* event) noexcept {
        if (event->filter == EVFILT_READ) {
            return CONT_POLL_READ;
        }

        if (event->filter == EVFILT_WRITE) {
            return CONT_POLL_WRITE;
        }

        if (event->flags & EV_EOF) {
            return CONT_POLL_READ | CONT_POLL_WRITE;
        }

        return 0;
    }

private:
    int Fd_;
};
#endif

#if defined(HAVE_EPOLL_POLLER)
static inline int ContEpollWait(int epfd, struct epoll_event* events, int maxevents, int timeout) noexcept {
    int ret;

    do {
        ret = epoll_wait(epfd, events, maxevents, Min<int>(timeout, 35 * 60 * 1000));
    } while (ret == -1 && errno == EINTR);

    return ret;
}

template <class TLockPolicy>
class TEpollPoller {
public:
    typedef struct ::epoll_event TEvent;

    inline TEpollPoller(bool closeOnExec = false)
        : Fd_(epoll_create1(closeOnExec ? EPOLL_CLOEXEC : 0))
    {
        if (Fd_ == -1) {
            ythrow TSystemError() << "epoll_create failed";
        }
    }

    inline ~TEpollPoller() {
        close(Fd_);
    }

    inline int Fd() const noexcept {
        return Fd_;
    }

    inline void SetImpl(void* data, int fd, int what) {
        TEvent e;

        Zero(e);

        if (what & CONT_POLL_EDGE_TRIGGERED) {
            if (what & CONT_POLL_BACKLOG_EMPTY) {
                // When backlog is empty, edge-triggered does not need restart.
                return;
            }
            e.events |= EPOLLET;
        }

        if (what & CONT_POLL_ONE_SHOT) {
            e.events |= EPOLLONESHOT;
        }

        if (what & CONT_POLL_READ) {
            e.events |= EPOLLIN;
        }

        if (what & CONT_POLL_WRITE) {
            e.events |= EPOLLOUT;
        }

        if (what & CONT_POLL_RDHUP) {
            e.events |= EPOLLRDHUP;
        }

        e.data.ptr = data;

        if ((what & CONT_POLL_MODIFY) || epoll_ctl(Fd_, EPOLL_CTL_ADD, fd, &e) == -1) {
            if (epoll_ctl(Fd_, EPOLL_CTL_MOD, fd, &e) == -1) {
                ythrow TSystemError() << "epoll add failed";
            }
        }
    }

    inline void Remove(int fd) noexcept {
        TEvent e;

        Zero(e);

        epoll_ctl(Fd_, EPOLL_CTL_DEL, fd, &e);
    }

    inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
        const int ret = ContEpollWait(Fd_, events, len, MicroToMilli(timeout));

        Y_VERIFY(ret >= 0, "epoll wait error: %s", LastSystemErrorText());

        return (size_t)ret;
    }

    static inline void* ExtractEvent(const TEvent* event) noexcept {
        return event->data.ptr;
    }

    static inline int ExtractStatus(const TEvent* event) noexcept {
        if (event->events & (EPOLLERR | EPOLLHUP)) {
            return EIO;
        }

        return 0;
    }

    static inline int ExtractFilterImpl(const TEvent* event) noexcept {
        int ret = 0;

        if (event->events & EPOLLIN) {
            ret |= CONT_POLL_READ;
        }

        if (event->events & EPOLLOUT) {
            ret |= CONT_POLL_WRITE;
        }

        if (event->events & EPOLLRDHUP) {
            ret |= CONT_POLL_RDHUP;
        }

        return ret;
    }

private:
    int Fd_;
};
#endif

#if defined(HAVE_SELECT_POLLER)
    #include <util/memory/tempbuf.h>
    #include <util/generic/hash.h>

    #include "pair.h"

static inline int ContSelect(int n, fd_set* r, fd_set* w, fd_set* e, struct timeval* t) noexcept {
    int ret;

    do {
        ret = select(n, r, w, e, t);
    } while (ret == -1 && errno == EINTR);

    return ret;
}

struct TSelectPollerNoTemplate {
    struct THandle {
        void* Data_;
        int Filter_;

        inline THandle()
            : Data_(nullptr)
            , Filter_(0)
        {
        }

        inline void* Data() const noexcept {
            return Data_;
        }

        inline void Set(void* d, int s) noexcept {
            Data_ = d;
            Filter_ = s;
        }

        inline void Clear(int c) noexcept {
            Filter_ &= ~c;
        }

        inline int Filter() const noexcept {
            return Filter_;
        }
    };

    class TFds: public THashMap<SOCKET, THandle> {
    public:
        inline void Set(SOCKET fd, void* data, int filter) {
            (*this)[fd].Set(data, filter);
        }

        inline void Remove(SOCKET fd) {
            erase(fd);
        }

        inline SOCKET Build(fd_set* r, fd_set* w, fd_set* e) const noexcept {
            SOCKET ret = 0;

            for (const auto& it : *this) {
                const SOCKET fd = it.first;
                const THandle& handle = it.second;

                FD_SET(fd, e);

                if (handle.Filter() & CONT_POLL_READ) {
                    FD_SET(fd, r);
                }

                if (handle.Filter() & CONT_POLL_WRITE) {
                    FD_SET(fd, w);
                }

                if (fd > ret) {
                    ret = fd;
                }
            }

            return ret;
        }
    };

    struct TEvent: public THandle {
        inline int Status() const noexcept {
            return -Min(Filter(), 0);
        }

        inline void Error(void* d, int err) noexcept {
            Set(d, -err);
        }

        inline void Success(void* d, int what) noexcept {
            Set(d, what);
        }
    };
};

template <class TLockPolicy>
class TSelectPoller: public TSelectPollerNoTemplate {
    using TMyMutex = typename TLockPolicy::TMyMutex;

public:
    inline TSelectPoller()
        : Begin_(nullptr)
        , End_(nullptr)
    {
        SocketPair(Signal_);
        SetNonBlock(WaitSock());
        SetNonBlock(SigSock());
    }

    inline ~TSelectPoller() {
        closesocket(Signal_[0]);
        closesocket(Signal_[1]);
    }

    inline void SetImpl(void* data, SOCKET fd, int what) {
        with_lock (CommandLock_) {
            Commands_.push_back(TCommand(fd, what, data));
        }

        Signal();
    }

    inline void Remove(SOCKET fd) noexcept {
        with_lock (CommandLock_) {
            Commands_.push_back(TCommand(fd, 0));
        }

        Signal();
    }

    inline size_t Wait(TEvent* events, size_t len, int timeout) noexcept {
        auto guard = Guard(Lock_);

        do {
            if (Begin_ != End_) {
                const size_t ret = Min<size_t>(End_ - Begin_, len);

                memcpy(events, Begin_, sizeof(*events) * ret);
                Begin_ += ret;

                return ret;
            }

            if (len >= EventNumberHint()) {
                return WaitBase(events, len, timeout);
            }

            Begin_ = SavedEvents();
            End_ = Begin_ + WaitBase(Begin_, EventNumberHint(), timeout);
        } while (Begin_ != End_);

        return 0;
    }

    inline TEvent* SavedEvents() {
        if (!SavedEvents_) {
            SavedEvents_.Reset(new TEvent[EventNumberHint()]);
        }

        return SavedEvents_.Get();
    }

    inline size_t WaitBase(TEvent* events, size_t len, int timeout) noexcept {
        with_lock (CommandLock_) {
            for (auto command = Commands_.begin(); command != Commands_.end(); ++command) {
                if (command->Filter_ != 0) {
                    Fds_.Set(command->Fd_, command->Cookie_, command->Filter_);
                } else {
                    Fds_.Remove(command->Fd_);
                }
            }

            Commands_.clear();
        }

        TTempBuf tmpBuf(3 * sizeof(fd_set) + Fds_.size() * sizeof(SOCKET));

        fd_set* in = (fd_set*)tmpBuf.Data();
        fd_set* out = &in[1];
        fd_set* errFds = &in[2];

        SOCKET* keysToDeleteBegin = (SOCKET*)&in[3];
        SOCKET* keysToDeleteEnd = keysToDeleteBegin;

    #if defined(_msan_enabled_) // msan doesn't handle FD_ZERO and cause false positive BALANCER-1347
        memset(in, 0, sizeof(*in));
        memset(out, 0, sizeof(*out));
        memset(errFds, 0, sizeof(*errFds));
    #endif

        FD_ZERO(in);
        FD_ZERO(out);
        FD_ZERO(errFds);

        FD_SET(WaitSock(), in);

        const SOCKET maxFdNum = Max(Fds_.Build(in, out, errFds), WaitSock());
        struct timeval tout;

        tout.tv_sec = timeout / 1000000;
        tout.tv_usec = timeout % 1000000;

        int ret = ContSelect(int(maxFdNum + 1), in, out, errFds, &tout);

        if (ret > 0 && FD_ISSET(WaitSock(), in)) {
            --ret;
            TryWait();
        }

        Y_VERIFY(ret >= 0 && (size_t)ret <= len, "select error: %s", LastSystemErrorText());

        TEvent* eventsStart = events;

        for (typename TFds::iterator it = Fds_.begin(); it != Fds_.end(); ++it) {
            const SOCKET fd = it->first;
            THandle& handle = it->second;

            if (FD_ISSET(fd, errFds)) {
                (events++)->Error(handle.Data(), EIO);

                if (handle.Filter() & CONT_POLL_ONE_SHOT) {
                    *keysToDeleteEnd = fd;
                    ++keysToDeleteEnd;
                }

            } else {
                int what = 0;

                if (FD_ISSET(fd, in)) {
                    what |= CONT_POLL_READ;
                }

                if (FD_ISSET(fd, out)) {
                    what |= CONT_POLL_WRITE;
                }

                if (what) {
                    (events++)->Success(handle.Data(), what);

                    if (handle.Filter() & CONT_POLL_ONE_SHOT) {
                        *keysToDeleteEnd = fd;
                        ++keysToDeleteEnd;
                    }

                    if (handle.Filter() & CONT_POLL_EDGE_TRIGGERED) {
                        // Emulate edge-triggered for level-triggered select().
                        // User must restart waiting this event when needed.
                        handle.Clear(what);
                    }
                }
            }
        }

        while (keysToDeleteBegin != keysToDeleteEnd) {
            Fds_.erase(*keysToDeleteBegin);
            ++keysToDeleteBegin;
        }

        return events - eventsStart;
    }

    inline size_t EventNumberHint() const noexcept {
        return sizeof(fd_set) * 8 * 2;
    }

    static inline void* ExtractEvent(const TEvent* event) noexcept {
        return event->Data();
    }

    static inline int ExtractStatus(const TEvent* event) noexcept {
        return event->Status();
    }

    static inline int ExtractFilterImpl(const TEvent* event) noexcept {
        return event->Filter();
    }

private:
    inline void Signal() noexcept {
        char ch = 13;

        send(SigSock(), &ch, 1, 0);
    }

    inline void TryWait() {
        char ch[32];

        while (recv(WaitSock(), ch, sizeof(ch), 0) > 0) {
            Y_ASSERT(ch[0] == 13);
        }
    }

    inline SOCKET WaitSock() const noexcept {
        return Signal_[1];
    }

    inline SOCKET SigSock() const noexcept {
        return Signal_[0];
    }

private:
    struct TCommand {
        SOCKET Fd_;
        int Filter_; // 0 to remove
        void* Cookie_;

        TCommand(SOCKET fd, int filter, void* cookie)
            : Fd_(fd)
            , Filter_(filter)
            , Cookie_(cookie)
        {
        }

        TCommand(SOCKET fd, int filter)
            : Fd_(fd)
            , Filter_(filter)
        {
        }
    };

    TFds Fds_;

    TMyMutex Lock_;
    TArrayHolder<TEvent> SavedEvents_;
    TEvent* Begin_;
    TEvent* End_;

    TMyMutex CommandLock_;
    TVector<TCommand> Commands_;

    SOCKET Signal_[2];
};
#endif

static inline TDuration PollStep(const TInstant& deadLine, const TInstant& now) noexcept {
    if (deadLine < now) {
        return TDuration::Zero();
    }

    return Min(deadLine - now, TDuration::Seconds(1000));
}

template <class TBase>
class TGenericPoller: public TBase {
public:
    using TBase::TBase;

    using TEvent = typename TBase::TEvent;

    inline void Set(void* data, SOCKET fd, int what) {
        if (what) {
            this->SetImpl(data, fd, what);
        } else {
            this->Remove(fd);
        }
    }

    static inline int ExtractFilter(const TEvent* event) noexcept {
        if (TBase::ExtractStatus(event)) {
            return CONT_POLL_READ | CONT_POLL_WRITE | CONT_POLL_RDHUP;
        }

        return TBase::ExtractFilterImpl(event);
    }

    inline size_t WaitD(TEvent* events, size_t len, TInstant deadLine, TInstant now = TInstant::Now()) noexcept {
        if (!len) {
            return 0;
        }

        size_t ret;

        do {
            ret = this->Wait(events, len, (int)PollStep(deadLine, now).MicroSeconds());
        } while (!ret && ((now = TInstant::Now()) < deadLine));

        return ret;
    }
};

#if defined(HAVE_KQUEUE_POLLER)
    #define TPollerImplBase TKqueuePoller
#elif defined(HAVE_EPOLL_POLLER)
    #define TPollerImplBase TEpollPoller
#elif defined(HAVE_SELECT_POLLER)
    #define TPollerImplBase TSelectPoller
#else
    #error "unsupported platform"
#endif

template <class TLockPolicy>
using TPollerImpl = TGenericPoller<TPollerImplBase<TLockPolicy>>;

#undef TPollerImplBase