aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/yt/threading/event_count.h
blob: 422cb23d77cacd67e19216895fdd443e9b42290d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#pragma once

#ifndef _linux_
    #include <util/system/mutex.h>
    #include <util/system/condvar.h>
#endif

#include <limits>
#include <atomic>

namespace NYT::NThreading {

////////////////////////////////////////////////////////////////////////////////

//! Event count: a condition variable for lock free algorithms.
/*!
 * This is an adapted version from Facebook's Folly. See
 * https://raw.github.com/facebook/folly/master/folly/experimental/EventCount.h
 * http://www.1024cores.net/home/lock-free-algorithms/eventcounts
 * for details.
 *
 * Event counts allow you to convert a non-blocking lock-free / wait-free
 * algorithm into a blocking one, by isolating the blocking logic. You call
 * PrepareWait() before checking your condition and then either CancelWait()
 * or Wait() depending on whether the condition was true. When another
 * thread makes the condition true, it must call NotifyOne() / NotifyAll() just
 * like a regular condition variable.
 *
 * Let "<" denote the happens-before relationship.
 * Consider 2 threads (T1 and T2) and 3 events:
 * - E1: T1 returns from PrepareWait
 * - E2: T1 calls Wait (obviously E1 < E2, intra-thread)
 * - E3: T2 calls NotifyAll
 *
 * If E1 < E3, then E2's Wait() will complete (and T1 will either wake up,
 * or not block at all)
 *
 * This means that you can use an EventCount in the following manner:
 *
 * Waiter:
 *   if (!condition()) { // Handle fast path first.
 *     for (;;) {
 *       auto cookie = ec.PrepareWait();
 *       if (condition()) {
 *         ec.CancelWait();
 *         break;
 *       } else {
 *         ec.Wait(cookie);
 *       }
 *     }
 *  }
 *
 *  (This pattern is encapsulated in Await())
 *
 * Poster:
 *   ... make condition true...
 *   ec.NotifyAll();
 *
 * Note that, just like with regular condition variables, the waiter needs to
 * be tolerant of spurious wakeups and needs to recheck the condition after
 * being woken up. Also, as there is no mutual exclusion implied, "checking"
 * the condition likely means attempting an operation on an underlying
 * data structure (push into a lock-free queue, etc) and returning true on
 * success and false on failure.
 */
class TEventCount final
{
public:
    TEventCount() = default;
    TEventCount(const TEventCount&) = delete;
    TEventCount(TEventCount&&) = delete;

    class TCookie
    {
    public:
        explicit TCookie(ui32 epoch)
            : Epoch_(epoch)
        { }

    private:
        friend class TEventCount;
        ui32 Epoch_;
    };

    void NotifyOne();
    void NotifyAll();
    void NotifyMany(int count);

    TCookie PrepareWait();
    void CancelWait();
    bool Wait(TCookie cookie, TInstant deadline = TInstant::Max());
    bool Wait(TCookie cookie, TDuration timeout);

    //! Wait for |condition()| to become |true|.
    //! Will clean up appropriately if |condition()| throws, and then rethrow.
    template <class TCondition>
    bool Await(TCondition&& condition, TInstant deadline = TInstant::Max());
    template <class TCondition>
    bool Await(TCondition&& condition, TDuration timeout);

private:
    //! Lower 32 bits: number of waiters.
    //! Upper 32 bits: epoch
    std::atomic<ui64> Value_ = 0;

    static constexpr ui64 AddWaiter  = static_cast<ui64>(1);
    static constexpr ui64 SubWaiter  = static_cast<ui64>(-1);

    static constexpr ui64 EpochShift = 32;
    static constexpr ui64 AddEpoch   = static_cast<ui64>(1) << EpochShift;

    static constexpr ui64 WaiterMask = AddEpoch - 1;

#ifndef _linux_
    TCondVar ConditionVariable_;
    TMutex Mutex_;
#endif
};

////////////////////////////////////////////////////////////////////////////////

//! A single-shot non-resettable event implemented on top of TEventCount.
class TEvent final
{
public:
    TEvent() = default;
    TEvent(const TEvent&) = delete;
    TEvent(TEvent&&) = delete;

    void NotifyOne();
    void NotifyAll();

    bool Test() const;
    bool Wait(TInstant deadline = TInstant::Max());
    bool Wait(TDuration timeout);

private:
    std::atomic<bool> Set_ = false;
    TEventCount EventCount_;
};

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT::NThreading

#define EVENT_COUNT_INL_H_
#include "event_count-inl.h"
#undef EVENT_COUNT_INL_H_