aboutsummaryrefslogblamecommitdiffstats
path: root/library/cpp/yt/threading/event_count.h
blob: 422cb23d77cacd67e19216895fdd443e9b42290d (plain) (tree)


















































































































































                                                                                
#pragma once

#ifndef _linux_
    #include <util/system/mutex.h>
    #include <util/system/condvar.h>
#endif

#include <limits>
#include <atomic>

namespace NYT::NThreading {

////////////////////////////////////////////////////////////////////////////////

//! Event count: a condition variable for lock free algorithms.
/*!
 * This is an adapted version from Facebook's Folly. See
 * https://raw.github.com/facebook/folly/master/folly/experimental/EventCount.h
 * http://www.1024cores.net/home/lock-free-algorithms/eventcounts
 * for details.
 *
 * Event counts allow you to convert a non-blocking lock-free / wait-free
 * algorithm into a blocking one, by isolating the blocking logic. You call
 * PrepareWait() before checking your condition and then either CancelWait()
 * or Wait() depending on whether the condition was true. When another
 * thread makes the condition true, it must call NotifyOne() / NotifyAll() just
 * like a regular condition variable.
 *
 * Let "<" denote the happens-before relationship.
 * Consider 2 threads (T1 and T2) and 3 events:
 * - E1: T1 returns from PrepareWait
 * - E2: T1 calls Wait (obviously E1 < E2, intra-thread)
 * - E3: T2 calls NotifyAll
 *
 * If E1 < E3, then E2's Wait() will complete (and T1 will either wake up,
 * or not block at all)
 *
 * This means that you can use an EventCount in the following manner:
 *
 * Waiter:
 *   if (!condition()) { // Handle fast path first.
 *     for (;;) {
 *       auto cookie = ec.PrepareWait();
 *       if (condition()) {
 *         ec.CancelWait();
 *         break;
 *       } else {
 *         ec.Wait(cookie);
 *       }
 *     }
 *  }
 *
 *  (This pattern is encapsulated in Await())
 *
 * Poster:
 *   ... make condition true...
 *   ec.NotifyAll();
 *
 * Note that, just like with regular condition variables, the waiter needs to
 * be tolerant of spurious wakeups and needs to recheck the condition after
 * being woken up. Also, as there is no mutual exclusion implied, "checking"
 * the condition likely means attempting an operation on an underlying
 * data structure (push into a lock-free queue, etc) and returning true on
 * success and false on failure.
 */
class TEventCount final
{
public:
    TEventCount() = default;
    TEventCount(const TEventCount&) = delete;
    TEventCount(TEventCount&&) = delete;

    class TCookie
    {
    public:
        explicit TCookie(ui32 epoch)
            : Epoch_(epoch)
        { }

    private:
        friend class TEventCount;
        ui32 Epoch_;
    };

    void NotifyOne();
    void NotifyAll();
    void NotifyMany(int count);

    TCookie PrepareWait();
    void CancelWait();
    bool Wait(TCookie cookie, TInstant deadline = TInstant::Max());
    bool Wait(TCookie cookie, TDuration timeout);

    //! Wait for |condition()| to become |true|.
    //! Will clean up appropriately if |condition()| throws, and then rethrow.
    template <class TCondition>
    bool Await(TCondition&& condition, TInstant deadline = TInstant::Max());
    template <class TCondition>
    bool Await(TCondition&& condition, TDuration timeout);

private:
    //! Lower 32 bits: number of waiters.
    //! Upper 32 bits: epoch
    std::atomic<ui64> Value_ = 0;

    static constexpr ui64 AddWaiter  = static_cast<ui64>(1);
    static constexpr ui64 SubWaiter  = static_cast<ui64>(-1);

    static constexpr ui64 EpochShift = 32;
    static constexpr ui64 AddEpoch   = static_cast<ui64>(1) << EpochShift;

    static constexpr ui64 WaiterMask = AddEpoch - 1;

#ifndef _linux_
    TCondVar ConditionVariable_;
    TMutex Mutex_;
#endif
};

////////////////////////////////////////////////////////////////////////////////

//! A single-shot non-resettable event implemented on top of TEventCount.
class TEvent final
{
public:
    TEvent() = default;
    TEvent(const TEvent&) = delete;
    TEvent(TEvent&&) = delete;

    void NotifyOne();
    void NotifyAll();

    bool Test() const;
    bool Wait(TInstant deadline = TInstant::Max());
    bool Wait(TDuration timeout);

private:
    std::atomic<bool> Set_ = false;
    TEventCount EventCount_;
};

////////////////////////////////////////////////////////////////////////////////

} // namespace NYT::NThreading

#define EVENT_COUNT_INL_H_
#include "event_count-inl.h"
#undef EVENT_COUNT_INL_H_