#pragma once

#include <util/generic/cast.h>
#include <util/generic/ptr.h>
#include <util/generic/utility.h>
#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/system/guard.h>
#include <util/system/spinlock.h>
#include <util/system/yassert.h>

namespace NHotSwapPrivate {
    // Special guard object for THotSwap
    class TWriterLock {
    public:
        // Wait-free interface for readers; many readers may hold the lock simultaneously
        void Acquire() noexcept;
        void Release() noexcept;

        void WaitAllReaders() const noexcept;

    private:
        TAtomic ReadersCount = 0;
    };

}
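
// The TWriterLock methods are defined out of line (in hot_swap.cpp). A minimal
// sketch of what they could look like, assuming a simple spin-wait on the
// reader counter (shown only to illustrate the contract, not the actual code):
//
//   void NHotSwapPrivate::TWriterLock::Acquire() noexcept {
//       AtomicIncrement(ReadersCount); // register this reader; never blocks
//   }
//
//   void NHotSwapPrivate::TWriterLock::Release() noexcept {
//       AtomicDecrement(ReadersCount); // unregister this reader
//   }
//
//   void NHotSwapPrivate::TWriterLock::WaitAllReaders() const noexcept {
//       // The writer spins until every reader registered on this lock has
//       // called Release().
//       while (AtomicGet(ReadersCount) != 0) {
//           SpinLockPause();
//       }
//   }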

/// Object container whose stored object can be swapped for another one concurrently with readers.
/// T must support atomic reference counting.
///
/// Typical usage is for data that changes rarely but is read frequently.
/// With plain reference counting, an intrusive pointer cannot be read and
/// replaced concurrently without extra synchronization.
/// This class provides that synchronization with minimal overhead on the read path.
///
/// Usage sample:
///
/// THotSwap<T> Obj;
///
/// thread 1:
/// ...
/// TIntrusivePtr<T> obj = Obj.AtomicLoad(); // get current object
/// ... use of obj
///
/// thread 2:
/// ...
/// Obj.AtomicStore(new T()); // set new object
///
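/// A minimal end-to-end sketch; TConfig and its fields are hypothetical, and
/// atomic reference counting is assumed to come from TAtomicRefCount
/// (util/generic/refcount.h):
///
/// struct TConfig: public TAtomicRefCount<TConfig> {
///     TString Endpoint;
///     ui32 TimeoutMs = 0;
/// };
///
/// THotSwap<TConfig> CurrentConfig;
/// CurrentConfig.AtomicStore(MakeIntrusive<TConfig>()); // publish initial value
///
/// // in any reader thread (wait-free):
/// TIntrusivePtr<TConfig> cfg = CurrentConfig.AtomicLoad();
/// // cfg keeps the object alive even after a writer swaps in a new one
///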
template <class T, class Ops = TDefaultIntrusivePtrOps<T>>
class THotSwap {
public:
    using TPtr = TIntrusivePtr<T, Ops>;

public:
    THotSwap() noexcept {
    }

    explicit THotSwap(T* p) noexcept {
        AtomicStore(p);
    }

    explicit THotSwap(const TPtr& p) noexcept
        : THotSwap(p.Get())
    {
    }

    THotSwap(const THotSwap& p) noexcept
        : THotSwap(p.AtomicLoad())
    {
    }

    THotSwap(THotSwap&& other) noexcept {
        DoSwap(RawPtr, other.RawPtr); // no synchronization needed: *this is still being constructed and the caller guarantees exclusive access to other
    }

    ~THotSwap() noexcept {
        AtomicStore(nullptr);
    }

    THotSwap& operator=(const THotSwap& p) noexcept {
        AtomicStore(p.AtomicLoad());
        return *this;
    }

    /// Wait-free read of the stored pointer
    ///
    /// @returns          The currently stored object
    TPtr AtomicLoad() const noexcept {
        const TAtomicBase lockIndex = GetLockIndex();
        auto guard = Guard(WriterLocks[lockIndex]); // non-blocking (for other AtomicLoad()'s) guard
        return GetRawPtr();
    }

    /// Update to new object
    ///
    /// @param[in] p      New value to store
    void AtomicStore(T* p) noexcept;

    /// Update to new object
    ///
    /// @param[in] p      New value to store
    void AtomicStore(const TPtr& p) noexcept {
        AtomicStore(p.Get());
    }

private:
    T* GetRawPtr() const noexcept {
        return reinterpret_cast<T*>(AtomicGet(RawPtr));
    }

    TAtomicBase GetLockIndex() const noexcept {
        return AtomicGet(LockIndex);
    }

    TAtomicBase SwitchLockIndex() noexcept; // returns previous index value
    void SwitchRawPtr(T* from, T* to) noexcept;
    void WaitReaders() noexcept;

private:
    TAtomic RawPtr = 0; // T* // Pointer to current value
    static_assert(sizeof(TAtomic) == sizeof(T*), "TAtomic can't represent a pointer value");

    TAdaptiveLock UpdateMutex;                           // Guarantees that AtomicStore() calls run one at a time
    mutable NHotSwapPrivate::TWriterLock WriterLocks[2]; // Guarantees that AtomicStore() waits for all concurrent AtomicLoad() calls to complete
    TAtomic LockIndex = 0;
};

// Atomic operations of AtomicLoad:
// r:1 index = LockIndex
// r:2 WriterLocks[index].ReadersCount++
// r:3 p = RawPtr
// r:4 p->RefCount++
// r:5 WriterLocks[index].ReadersCount--

// Important atomic operations of AtomicStore(newRawPtr):
// w:1 RawPtr = newRawPtr
// w:2 LockIndex = 1
// w:3 WriterLocks[0].Wait()
// w:4 LockIndex = 0
// w:5 WriterLocks[1].Wait()

// w:3 (first wait) is needed for sequences:
// r:1-3, w:1-2, r:4-5, w:3-5 // the most frequent case
// w1:1, r:1, w1:2-5, r:2-3, w2:1-2, r:4-5, w2:3-5

// w:5 (second wait) is needed for sequences:
// w1:1-2, r:1, w1:3-5, r:2-3, w2:1-4, r:4-5, w2:5
// With only a single wait, the writer in this sequence
// would not wait for the corresponding reader

// w1, w2 - two different writers
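
// A worked reading of the last sequence above (why the single wait w:3 is not
// enough): after w1 finishes, the reader is registered on WriterLocks[1] and
// holds a raw pointer to w1's value, but has not incremented its ref count yet
// (r:4 is still pending). w2 then replaces RawPtr and waits only on
// WriterLocks[0] (w:3), which the reader never touched, so w2 is not blocked.
// Without the second wait (w:5) on WriterLocks[1], w2 could drop its reference
// to w1's value and destroy it before the reader executes r:4, leaving the
// reader with a dangling pointer. The wait on WriterLocks[1] is what forces w2
// to stay inside AtomicStore() until that reader releases the lock at r:5.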

template <class T, class Ops>
void THotSwap<T, Ops>::AtomicStore(T* p) noexcept {
    TPtr oldPtr;
    with_lock (UpdateMutex) {
        oldPtr = GetRawPtr();

        SwitchRawPtr(oldPtr.Get(), p);
        Y_ASSERT(!oldPtr || oldPtr.RefCount() > 0);

        // Wait for all concurrent AtomicLoad() calls that may still be taking a reference to the old pointer value
        WaitReaders();

        // Release the lock, then let oldPtr go out of scope and destroy the old object if this was the last reference
    }
}
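
// Note (an observation about the code above, not a documented guarantee):
// AtomicStore() serializes writers via UpdateMutex, but it simply overwrites
// the stored value. A read-modify-write built from AtomicLoad() followed by
// AtomicStore() can therefore lose concurrent updates unless writers
// coordinate externally, e.g. (MyUpdateLock and Mutate() are hypothetical):
//
//   with_lock (MyUpdateLock) {
//       TIntrusivePtr<T> cur = Obj.AtomicLoad();
//       auto next = MakeIntrusive<T>(*cur); // copy the current value
//       next->Mutate();                     // modify the copy
//       Obj.AtomicStore(next);              // publish it
//   }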

template <class T, class Ops>
TAtomicBase THotSwap<T, Ops>::SwitchLockIndex() noexcept {
    const TAtomicBase prevIndex = AtomicGet(LockIndex);
    Y_ASSERT(prevIndex == 0 || prevIndex == 1);
    AtomicSet(LockIndex, prevIndex ^ 1);
    return prevIndex;
}

template <class T, class Ops>
void THotSwap<T, Ops>::WaitReaders() noexcept {
    WriterLocks[SwitchLockIndex()].WaitAllReaders();
    WriterLocks[SwitchLockIndex()].WaitAllReaders();
}

template <class T, class Ops>
void THotSwap<T, Ops>::SwitchRawPtr(T* from, T* to) noexcept {
    if (to)
        Ops::Ref(to); // Ref() for new value

    AtomicSet(RawPtr, reinterpret_cast<TAtomicBase>(to));

    if (from)
        Ops::UnRef(from); // Unref() for old value
}