aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/asio/poll_interrupter.h
blob: faf815c5129333667e3a6ff9c778f91c7c93b853 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#pragma once

#include <util/system/defaults.h>
#include <util/generic/yexception.h>
#include <util/network/socket.h>
#include <util/system/pipe.h>

#ifdef _linux_
#include <sys/eventfd.h>
#endif

#if defined(_bionic_) && !defined(EFD_SEMAPHORE)
#define EFD_SEMAPHORE 1
#endif

namespace NAsio {
#ifdef _linux_
    class TEventFdPollInterrupter {
    public:
        inline TEventFdPollInterrupter() {
            F_ = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);
            if (F_ < 0) {
                ythrow TFileError() << "failed to create a eventfd";
            }
        }

        inline ~TEventFdPollInterrupter() {
            close(F_);
        }

        inline void Interrupt() const noexcept {
            const static eventfd_t ev(1);
            ssize_t res = ::write(F_, &ev, sizeof ev);
            Y_UNUSED(res);
        }

        inline bool Reset() const noexcept {
            eventfd_t ev(0);

            for (;;) {
                ssize_t res = ::read(F_, &ev, sizeof ev);
                if (res && res == EINTR) {
                    continue;
                }

                return res > 0;
            }
        }

        int Fd() {
            return F_;
        }

    private:
        int F_;
    };
#endif

    class TPipePollInterrupter {
    public:
        TPipePollInterrupter() {
            TPipeHandle::Pipe(S_[0], S_[1]);

            SetNonBlock(S_[0]);
            SetNonBlock(S_[1]);
        }

        inline void Interrupt() const noexcept {
            char byte = 0;
            ssize_t res = S_[1].Write(&byte, 1);
            Y_UNUSED(res);
        }

        inline bool Reset() const noexcept {
            char buff[256];

            for (;;) {
                ssize_t r = S_[0].Read(buff, sizeof buff);

                if (r < 0 && r == EINTR) {
                    continue;
                }

                bool wasInterrupted = r > 0;

                while (r == sizeof buff) {
                    r = S_[0].Read(buff, sizeof buff);
                }

                return wasInterrupted;
            }
        }

        PIPEHANDLE Fd() const noexcept {
            return S_[0];
        }

    private:
        TPipeHandle S_[2];
    };

#ifdef _linux_
    typedef TEventFdPollInterrupter TPollInterrupter; //more effective than pipe, but only linux impl.
#else
    typedef TPipePollInterrupter TPollInterrupter;
#endif
}