aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/yt/threading/event_count-inl.h
blob: 18f7bfec52f27337f6b913a8f8c87c10703570f7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#ifndef EVENT_COUNT_INL_H_
#error "Direct inclusion of this file is not allowed, include event_count.h"
// For the sake of sane code completion.
#include "event_count.h"
#endif
#undef EVENT_COUNT_INL_H_

#include <library/cpp/yt/assert/assert.h>

#include "futex.h"

#include <errno.h>

namespace NYT::NThreading {

////////////////////////////////////////////////////////////////////////////////

// Convenience wrapper: forwards to NotifyMany() with a wake-up count of one.
inline void TEventCount::NotifyOne()
{
    constexpr int singleWaiter = 1;
    NotifyMany(singleWaiter);
}

// Convenience wrapper: forwards to NotifyMany() with the largest representable
// wake-up count, i.e. no practical limit on the number of waiters to wake.
inline void TEventCount::NotifyAll()
{
    constexpr int unlimited = std::numeric_limits<int>::max();
    NotifyMany(unlimited);
}

// Advances the epoch stored in Value_ and, if the waiter bits were nonzero at
// that moment, issues a wake-up.
//
// On Linux |count| is passed to FutexWake as is. Elsewhere the condition
// variable is signaled once when |count| == 1 and broadcast for any other value.
inline void TEventCount::NotifyMany(int count)
{
    // The order is important: Epoch is incremented before Waiters is checked.
    // Wait() increments Waiters before checking Epoch (PrepareWait() only reads
    // the epoch and does not register a waiter), so it is impossible to miss
    // a wakeup.
#ifndef _linux_
    // Without futexes, the epoch bump and the signal below are both done under Mutex_.
    TGuard<TMutex> guard(Mutex_);
#endif

    // |prev| is the value observed before the epoch was bumped.
    ui64 prev = Value_.fetch_add(AddEpoch, std::memory_order::acq_rel);
    if (Y_UNLIKELY((prev & WaiterMask) != 0)) {
#ifdef _linux_
        // Same address expression as the one Wait() sleeps on; presumably the
        // 32-bit half of Value_ that holds the epoch (TODO confirm EpochShift == 32).
        FutexWake(
            reinterpret_cast<int*>(&Value_) + 1, // assume little-endian architecture
            count);
#else
        if (count == 1) {
            ConditionVariable_.Signal();
        } else {
            ConditionVariable_.BroadCast();
        }
#endif
    }
}

// Takes a snapshot of the current epoch and wraps it into a cookie to be
// handed to Wait() later. Only reads Value_; nothing is modified here.
inline TEventCount::TCookie TEventCount::PrepareWait()
{
    const ui64 snapshot = Value_.load(std::memory_order::acquire);
    const auto epoch = static_cast<ui32>(snapshot >> EpochShift);
    return TCookie(epoch);
}

inline void TEventCount::CancelWait()
{
    // Intentionally empty: PrepareWait() only reads Value_, so there is
    // no state to roll back when a prepared wait is abandoned.
}

// Registers the caller as a waiter and blocks while the epoch still equals the
// one captured in |cookie|.
//
// On Linux the futex wait is retried in a loop until the epoch changes; the
// function returns false only when FutexWait fails with ETIMEDOUT.
// Elsewhere the condition variable is waited on at most once and the result
// of WaitD is returned (presumably false on timeout -- confirm against WaitD).
// Returns true without blocking if the epoch has already moved on.
inline bool TEventCount::Wait(TCookie cookie, TInstant deadline)
{
    // Register as a waiter before looking at the epoch; NotifyMany() bumps the
    // epoch first and inspects the waiter bits afterwards.
    Value_.fetch_add(AddWaiter, std::memory_order::acq_rel);

    bool result = true;
#ifdef _linux_
    while ((Value_.load(std::memory_order::acquire) >> EpochShift) == cookie.Epoch_) {
        // The remaining time is recomputed on every iteration, so retries
        // do not extend the overall deadline.
        auto timeout = deadline - TInstant::Now();

        auto futexResult = FutexWait(
            reinterpret_cast<int*>(&Value_) + 1, // assume little-endian architecture
            cookie.Epoch_,
            timeout);

        // Any other failure simply falls through to re-check the epoch.
        if (futexResult != 0 && errno == ETIMEDOUT) {
            result = false;
            break;
        }
    }
#else
    // The epoch is re-checked under Mutex_, the same mutex NotifyMany() holds
    // while bumping the epoch and signaling.
    TGuard<TMutex> guard(Mutex_);
    if ((Value_.load(std::memory_order::acquire) >> EpochShift) == cookie.Epoch_) {
        result = ConditionVariable_.WaitD(Mutex_, deadline);
    }
#endif
    // Deregister; |prev| must still have this waiter counted.
    ui64 prev = Value_.fetch_add(SubWaiter, std::memory_order::seq_cst);
    YT_ASSERT((prev & WaiterMask) != 0);
    return result;
}

// Relative-timeout flavor: converts |timeout| into a deadline and delegates
// to the deadline-based overload.
inline bool TEventCount::Wait(TCookie cookie, TDuration timeout)
{
    const auto deadline = timeout.ToDeadLine();
    return Wait(cookie, deadline);
}

// Repeatedly evaluates |condition| and sleeps in between until it holds.
// Returns true once |condition| is satisfied and false as soon as a Wait()
// call reports failure. Exceptions thrown by |condition| are propagated
// after CancelWait() has been invoked.
template <class TCondition>
bool TEventCount::Await(TCondition&& condition, TInstant deadline)
{
    // Fast path: nothing to wait for.
    if (condition()) {
        return true;
    }

    // Only condition() may throw, so a single try/catch around the whole
    // loop is sufficient.
    try {
        while (true) {
            const auto cookie = PrepareWait();
            if (condition()) {
                CancelWait();
                return true;
            }
            if (!Wait(cookie, deadline)) {
                return false;
            }
        }
    } catch (...) {
        CancelWait();
        throw;
    }
}

// Relative-timeout flavor: converts |timeout| into a deadline and delegates
// to the deadline-based overload, forwarding |condition| unchanged.
template <class TCondition>
bool TEventCount::Await(TCondition&& condition, TDuration timeout)
{
    const auto deadline = timeout.ToDeadLine();
    return Await(std::forward<TCondition>(condition), deadline);
}
////////////////////////////////////////////////////////////////////////////////

// Marks the event as set and then notifies one waiter of the underlying event count.
inline void TEvent::NotifyOne()
{
    // The flag is stored before the notification: Wait() re-evaluates Set_
    // as its wake-up condition.
    Set_.store(true, std::memory_order::release);
    EventCount_.NotifyOne();
}

// Marks the event as set and then notifies all waiters of the underlying event count.
inline void TEvent::NotifyAll()
{
    // The flag is stored before the notification: Wait() re-evaluates Set_
    // as its wake-up condition.
    Set_.store(true, std::memory_order::release);
    EventCount_.NotifyAll();
}

// Non-blocking probe: reports whether the event flag is currently set.
inline bool TEvent::Test() const
{
    const bool isSet = Set_.load(std::memory_order::acquire);
    return isSet;
}

// Blocks via the underlying event count until the flag is observed set.
// Returns whatever TEventCount::Await() returns for the given |deadline|.
inline bool TEvent::Wait(TInstant deadline)
{
    // Wake-up predicate: the event flag itself.
    auto isSet = [this] {
        return Set_.load(std::memory_order::acquire);
    };
    return EventCount_.Await(isSet, deadline);
}

// Relative-timeout flavor: converts |timeout| into a deadline and delegates
// to the deadline-based overload.
inline bool TEvent::Wait(TDuration timeout)
{
    const auto deadline = timeout.ToDeadLine();
    return Wait(deadline);
}

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT::NThreading