aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/yt/threading/notification_handle.cpp
blob: fea720670edf4c6ca87f188963e2a029264d296c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#include "notification_handle.h"

#include <library/cpp/yt/exception/exception.h>

#include <library/cpp/yt/system/handle_eintr.h>

#include <library/cpp/yt/assert/assert.h>

#ifdef _linux_
    #include <unistd.h>
    #include <sys/eventfd.h>
#endif

#ifdef _darwin_
    #include <fcntl.h>
    #include <unistd.h>
#endif

#ifdef _win_
    #include <util/network/socket.h>
#endif

namespace NYT::NThreading {

////////////////////////////////////////////////////////////////////////////////

TNotificationHandle::TNotificationHandle(bool blocking)
{
#ifdef _linux_
    EventFD_ = HandleEintr(
        eventfd,
        0,
        EFD_CLOEXEC | (blocking ? 0 : EFD_NONBLOCK));
    if (EventFD_ < 0) {
        throw TSimpleException("Error creating notification handle");
    }
#elif defined(_win_)
    TPipeHandle::Pipe(Reader_, Writer_, EOpenModeFlag::CloseOnExec);
    if (!blocking) {
        SetNonBlock(Reader_);
    }
#else
#ifdef _darwin_
    YT_VERIFY(HandleEintr(pipe, PipeFDs_) == 0);
#else
    YT_VERIFY(HandleEintr(pipe2, PipeFDs_, O_CLOEXEC) == 0);
#endif
    if (!blocking) {
        YT_VERIFY(fcntl(PipeFDs_[0], F_SETFL, O_NONBLOCK) == 0);
    }
#endif
}

TNotificationHandle::~TNotificationHandle()
{
#ifdef _linux_
    YT_VERIFY(HandleEintr(close, EventFD_) == 0);
#elif !defined(_win_)
    YT_VERIFY(HandleEintr(close, PipeFDs_[0]) == 0);
    YT_VERIFY(HandleEintr(close, PipeFDs_[1]) == 0);
#endif
}

void TNotificationHandle::Raise()
{
#ifdef _linux_
    uint64_t one = 1;
    YT_VERIFY(HandleEintr(write, EventFD_, &one, sizeof(one)) == sizeof(one));
#elif defined(_win_)
    char c = 'x';
    YT_VERIFY(Writer_.Write(&c, sizeof(char)) == sizeof(char));
#else
    char c = 'x';
    YT_VERIFY(HandleEintr(write, PipeFDs_[1], &c, sizeof(char)) == sizeof(char));
#endif
}

void TNotificationHandle::Clear()
{
#ifdef _linux_
    uint64_t count = 0;
    auto ret = HandleEintr(read, EventFD_, &count, sizeof(count));
    // For edge-triggered one could clear multiple events, others get nothing.
    YT_VERIFY(ret == sizeof(count) || (ret < 0 && errno == EAGAIN));
#elif defined(_win_)
    while (true) {
        char c;
        auto ret = Reader_.Read(&c, sizeof(c));
        YT_VERIFY(ret == sizeof(c) || (ret == SOCKET_ERROR && WSAGetLastError() == WSAEWOULDBLOCK));
        if (ret == SOCKET_ERROR) {
            break;
        }
    }
#else
    while (true) {
        char c;
        auto ret = HandleEintr(read, PipeFDs_[0], &c, sizeof(c));
        YT_VERIFY(ret == sizeof(c) || (ret < 0 && errno == EAGAIN));
        if (ret < 0) {
            break;
        }
    }
#endif
}

int TNotificationHandle::GetFD() const
{
#ifdef _linux_
    return EventFD_;
#elif defined(_win_)
    return Reader_;
#else
    return PipeFDs_[0];
#endif
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT::NThreading