author     pavook <pavook@yandex-team.com>  2025-04-17 14:21:12 +0300
committer  pavook <pavook@yandex-team.com>  2025-04-17 14:47:23 +0300
commit     6b567e38e0404cb1f94132fb48f6cb4b8ab1c800 (patch)
tree       3550eaa8af61186a1ddcc034da85d4e05bdd6330
parent     b2a08a179200e82675d3b171b2b76a920e758d74 (diff)
download   ydb-6b567e38e0404cb1f94132fb48f6cb4b8ab1c800.tar.gz
YT-24537: Prioritize writers in TReaderWriterSpinLock, rename old version to TWriterStarvingRWSpinLock
Previously, `TReaderWriterSpinLock` could fail to let a writer through if there was a steady flow of readers, starving the writer indefinitely.
This change addresses that by:
1. Prioritizing writers inside the spinlock by adding an additional `WriterReady` flag that writers set on arrival. While this flag is set, no new readers are let through (see the sketch after this list).
2. Adding tests that verify this behaviour, as well as the spinlock's behaviour under forks.
3. Clarifying the documentation of the spinlock's guarantees.
4. Adding a TLA+ model that formally specifies and verifies the guarantees of the new spinlock.
5. Renaming the old lock to `TWriterStarvingRWSpinLock` and replacing all usages inside YT with the new version (usages outside of YT are renamed to the WriterStarving version).
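To make the mechanism in item 1 concrete, here is a minimal standalone sketch of the writer-priority scheme. It condenses the logic from `rw_spin_lock-inl.h` in the diff below into one class, omits the spin-wait backoff and the fork-friendly paths, and uses a hypothetical name (`TWriterPriorityRWSpinLockSketch`); it is an illustration, not the actual implementation.

```cpp
#include <atomic>
#include <cstdint>

// Sketch only: mirrors the WriterMask / WriterReadyMask / ReaderDelta layout
// from the diff below, but is not the real TReaderWriterSpinLock.
class TWriterPriorityRWSpinLockSketch
{
public:
    void AcquireReader() noexcept
    {
        while (true) {
            // Optimistically register as a reader...
            auto oldValue = Value_.fetch_add(ReaderDelta, std::memory_order::acquire);
            if ((oldValue & (WriterMask | WriterReadyMask)) == 0) {
                return;
            }
            // ...and back off if a writer holds the lock or has announced itself.
            Value_.fetch_sub(ReaderDelta, std::memory_order::relaxed);
        }
    }

    void ReleaseReader() noexcept
    {
        Value_.fetch_sub(ReaderDelta, std::memory_order::release);
    }

    void AcquireWriter() noexcept
    {
        while (true) {
            auto oldValue = Value_.load(std::memory_order::relaxed);
            // Announce the writer first: from this point on, no new readers get in.
            if ((oldValue & WriterReadyMask) == 0) {
                oldValue = Value_.fetch_or(WriterReadyMask, std::memory_order::relaxed);
            }
            // Proceed once the existing readers have drained and no other writer holds the lock.
            if ((oldValue & ~WriterReadyMask) == 0) {
                auto expected = WriterReadyMask;
                if (Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire)) {
                    return;
                }
            }
        }
    }

    void ReleaseWriter() noexcept
    {
        // Clearing WriterReadyMask together with WriterMask lets readers in again.
        Value_.fetch_and(~(WriterMask | WriterReadyMask), std::memory_order::release);
    }

private:
    using TValue = uint32_t;
    static constexpr TValue WriterMask = 1;
    static constexpr TValue WriterReadyMask = 2;
    static constexpr TValue ReaderDelta = 4;

    std::atomic<TValue> Value_ = 0;
};
```

The key point is that `AcquireWriter()` publishes `WriterReadyMask` before it starts waiting, after which `AcquireReader()` refuses to admit new readers, so the writer only has to wait for the readers that are already inside.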
This is the second attempt at REVIEW: 8233768; the first one was rolled back because it led to deadlocks in user code that takes the reader lock reentrantly:
the sequence `AcquireReader(thread0) -> AcquireWriter(thread1) -> AcquireReader(thread0)` deadlocks, as `thread0` cannot acquire the reader lock for the second time until `thread1` releases the writer lock, and `thread1` cannot acquire the writer lock until `thread0` releases its reader lock, which won't happen until `thread0` acquires the lock for the second time. See the linked discussion for more context and a real example of such a situation; a minimal illustration follows below. An analogous problem can happen with fibers, which is why you shouldn't allow context switches under the lock.
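Here is a minimal, hypothetical program that spells out this deadlock with the new lock; the lock type and guard helpers are the real ones from the diff below, while the program structure and the sleep are illustrative only (the sleep merely makes the problematic interleaving likely). The program never terminates.

```cpp
#include <library/cpp/yt/threading/rw_spin_lock.h>

#include <chrono>
#include <thread>

using namespace NYT::NThreading;

TReaderWriterSpinLock Lock;

int main()
{
    // thread0: the outer reader lock.
    auto outerGuard = ReaderGuard(Lock);

    // thread1: announces a writer; it cannot proceed until all readers leave.
    std::thread writerThread([] {
        auto writerGuard = WriterGuard(Lock);
    });

    // Give thread1 a chance to set the WriterReady flag (illustrative only).
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    // thread0: the reentrant reader acquisition spins forever, because the
    // WriterReady flag blocks new readers, while thread1 in turn waits for the
    // outer reader lock to be released. Neither side can make progress.
    auto innerGuard = ReaderGuard(Lock);

    writerThread.join(); // never reached
}
```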
Wondering why this ugly name `TWriterStarvingRWSpinLock` appeared in your beautiful code? No worries: if you are **sure** that you don't use reentrant locks or fiber switches under the lock, you can freely replace your usage with the new `TReaderWriterSpinLock`. The replacement is drop-in; see the sketch below.
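A sketch of what such a replacement looks like in practice, with a hypothetical `TCounters` class; only the lock type changes, the guard helpers stay the same.

```cpp
#include <library/cpp/yt/threading/rw_spin_lock.h>

#include <array>
#include <cstddef>

using namespace NYT::NThreading;

class TCounters
{
public:
    void Increment(size_t index)
    {
        auto guard = WriterGuard(Lock_);
        Counters_[index] += 1;
    }

    ui64 Get(size_t index) const
    {
        auto guard = ReaderGuard(Lock_);
        return Counters_[index];
    }

private:
    // Before: TWriterStarvingRWSpinLock Lock_;
    // After: the writer-prioritizing lock; the guard API is unchanged.
    mutable TReaderWriterSpinLock Lock_;
    std::array<ui64, 16> Counters_ = {};
};
```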
[nodiff:caesar]
commit_hash:97683f854defca00cc283f5a2a10a1730b3c9174
-rw-r--r--  library/cpp/yt/threading/atomic_object.h                     |   2
-rw-r--r--  library/cpp/yt/threading/rw_spin_lock-inl.h                  |  37
-rw-r--r--  library/cpp/yt/threading/rw_spin_lock.h                      |  39
-rw-r--r--  library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp      |  56
-rw-r--r--  library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp    | 160
-rw-r--r--  library/cpp/yt/threading/unittests/ya.make                   |   5
-rw-r--r--  library/cpp/yt/threading/writer_starving_rw_spin_lock-inl.h | 101
-rw-r--r--  library/cpp/yt/threading/writer_starving_rw_spin_lock.cpp   |  25
-rw-r--r--  library/cpp/yt/threading/writer_starving_rw_spin_lock.h     | 115
-rw-r--r--  library/cpp/yt/threading/ya.make                             |   1
10 files changed, 521 insertions, 20 deletions
diff --git a/library/cpp/yt/threading/atomic_object.h b/library/cpp/yt/threading/atomic_object.h
index 8b642c0f4fb..a77ade0a00d 100644
--- a/library/cpp/yt/threading/atomic_object.h
+++ b/library/cpp/yt/threading/atomic_object.h
@@ -1,6 +1,6 @@
 #pragma once
-#include <library/cpp/yt/threading/rw_spin_lock.h>
+#include <library/cpp/yt/threading/writer_starving_rw_spin_lock.h>
 #include <concepts>
diff --git a/library/cpp/yt/threading/rw_spin_lock-inl.h b/library/cpp/yt/threading/rw_spin_lock-inl.h
index 779de1b64a8..0a31b1d9dec 100644
--- a/library/cpp/yt/threading/rw_spin_lock-inl.h
+++ b/library/cpp/yt/threading/rw_spin_lock-inl.h
@@ -31,7 +31,7 @@ inline void TReaderWriterSpinLock::AcquireReaderForkFriendly() noexcept
 inline void TReaderWriterSpinLock::ReleaseReader() noexcept
 {
     auto prevValue = Value_.fetch_sub(ReaderDelta, std::memory_order::release);
-    Y_ASSERT((prevValue & ~WriterMask) != 0);
+    Y_ASSERT((prevValue & ~(WriterMask | WriterReadyMask)) != 0);
     NDetail::RecordSpinLockReleased();
 }
@@ -45,14 +45,14 @@
 inline void TReaderWriterSpinLock::ReleaseWriter() noexcept
 {
-    auto prevValue = Value_.fetch_and(~WriterMask, std::memory_order::release);
+    auto prevValue = Value_.fetch_and(~(WriterMask | WriterReadyMask), std::memory_order::release);
     Y_ASSERT(prevValue & WriterMask);
     NDetail::RecordSpinLockReleased();
 }
 inline bool TReaderWriterSpinLock::IsLocked() const noexcept
 {
-    return Value_.load() != UnlockedValue;
+    return (Value_.load() & ~WriterReadyMask) != 0;
 }
 inline bool TReaderWriterSpinLock::IsLockedByReader() const noexcept
@@ -68,7 +68,7 @@ inline bool TReaderWriterSpinLock::IsLockedByWriter() const noexcept
 inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept
 {
     auto oldValue = Value_.fetch_add(ReaderDelta, std::memory_order::acquire);
-    if ((oldValue & WriterMask) != 0) {
+    if ((oldValue & (WriterMask | WriterReadyMask)) != 0) {
         Value_.fetch_sub(ReaderDelta, std::memory_order::relaxed);
         return false;
     }
@@ -79,7 +79,7 @@ inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept
 inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept
 {
     auto oldValue = Value_.load(std::memory_order::relaxed);
-    if ((oldValue & WriterMask) != 0) {
+    if ((oldValue & (WriterMask | WriterReadyMask)) != 0) {
         return false;
     }
     return TryAcquireReader();
@@ -88,7 +88,7 @@ inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept
 inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept
 {
     auto oldValue = Value_.load(std::memory_order::relaxed);
-    if ((oldValue & WriterMask) != 0) {
+    if ((oldValue & (WriterMask | WriterReadyMask)) != 0) {
         return false;
     }
     auto newValue = oldValue + ReaderDelta;
@@ -98,22 +98,35 @@ inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept
     return acquired;
 }
-inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept
+inline bool TReaderWriterSpinLock::TryAcquireWriterWithExpectedValue(TValue expected) noexcept
 {
-    auto expected = UnlockedValue;
-
-    bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire);
+    bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire);
     NDetail::RecordSpinLockAcquired(acquired);
     return acquired;
 }
+inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept
+{
+    // NB(pavook): we cannot expect writer ready to be set, as this method
+    // might be called without indicating writer readiness and we cannot
+    // indicate readiness on the hot path. This means that code calling
+    // TryAcquireWriter will spin against code calling AcquireWriter.
+    return TryAcquireWriterWithExpectedValue(UnlockedValue);
+}
+
 inline bool TReaderWriterSpinLock::TryAndTryAcquireWriter() noexcept
 {
     auto oldValue = Value_.load(std::memory_order::relaxed);
-    if (oldValue != UnlockedValue) {
+
+    if ((oldValue & WriterReadyMask) == 0) {
+        oldValue = Value_.fetch_or(WriterReadyMask, std::memory_order::relaxed);
+    }
+
+    if ((oldValue & (~WriterReadyMask)) != 0) {
         return false;
     }
-    return TryAcquireWriter();
+
+    return TryAcquireWriterWithExpectedValue(WriterReadyMask);
 }
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/yt/threading/rw_spin_lock.h b/library/cpp/yt/threading/rw_spin_lock.h
index a915e677e82..64a241bb6b4 100644
--- a/library/cpp/yt/threading/rw_spin_lock.h
+++ b/library/cpp/yt/threading/rw_spin_lock.h
@@ -16,8 +16,23 @@ namespace NYT::NThreading {
 //! Single-writer multiple-readers spin lock.
 /*!
- *  Reader-side calls are pretty cheap.
- *  The lock is unfair.
+ *  Reader-side acquires are pretty cheap, and readers don't spin unless writers
+ *  are present.
+ *
+ *  The lock is unfair, but writers are prioritized over readers, that is,
+ *  if AcquireWriter() is called at some time, then some writer
+ *  (not necessarily the same one that called AcquireWriter) will succeed
+ *  in the next time. This is implemented by an additional flag "WriterReady",
+ *  that writers set on arrival. No readers can proceed until this flag is reset.
+ *
+ *  WARNING: You probably should not use this lock if forks are possible: see
+ *  fork_aware_rw_spin_lock.h for a proper fork-safe lock which does the housekeeping for you.
+ *
+ *  WARNING: This lock is not recursive: you can't call AcquireReader() twice in the same
+ *  thread, as that may lead to a deadlock. For the same reason you shouldn't do WaitFor or any
+ *  other context switch under lock.
+ *
+ *  See tla+/spinlock.tla for the formally verified lock's properties.
 */
 class TReaderWriterSpinLock
     : public TSpinLockBase
@@ -29,18 +44,26 @@ public:
     //! Acquires the reader lock.
     /*!
      *  Optimized for the case of read-intensive workloads.
      *  Cheap (just one atomic increment and no spinning if no writers are present).
-     *  Don't use this call if forks are possible: forking at some
+     *
+     *  WARNING: Don't use this call if forks are possible: forking at some
      *  intermediate point inside #AcquireReader may corrupt the lock state and
-     *  leave lock forever stuck for the child process.
+     *  leave the lock stuck forever for the child process.
+     *
+     *  WARNING: The lock is not recursive/reentrant, i.e. it assumes that no thread calls
+     *  AcquireReader() if the reader is already acquired for it.
      */
     void AcquireReader() noexcept;
     //! Acquires the reader lock.
     /*!
      *  A more expensive version of #AcquireReader (includes at least
      *  one atomic load and CAS; also may spin even if just readers are present).
+     *
      *  In contrast to #AcquireReader, this method can be used in the presence of forks.
-     *  Note that fork-friendliness alone does not provide fork-safety: additional
-     *  actions must be performed to release the lock after a fork.
+     *
+     *  WARNING: fork-friendliness alone does not provide fork-safety: additional
+     *  actions must be performed to release the lock after a fork. This means you
+     *  probably should NOT use this lock in the presence of forks, consider
+     *  fork_aware_rw_spin_lock.h instead as a proper fork-safe lock.
      */
     void AcquireReaderForkFriendly() noexcept;
     //! Tries acquiring the reader lock; see #AcquireReader.
@@ -94,10 +117,12 @@ private:
     using TValue = ui32;
     static constexpr TValue UnlockedValue = 0;
     static constexpr TValue WriterMask = 1;
-    static constexpr TValue ReaderDelta = 2;
+    static constexpr TValue WriterReadyMask = 2;
+    static constexpr TValue ReaderDelta = 4;
     std::atomic<TValue> Value_ = UnlockedValue;
+    bool TryAcquireWriterWithExpectedValue(TValue expected) noexcept;
     bool TryAndTryAcquireReader() noexcept;
     bool TryAndTryAcquireWriter() noexcept;
diff --git a/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp b/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp
new file mode 100644
index 00000000000..653772604ce
--- /dev/null
+++ b/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp
@@ -0,0 +1,56 @@
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <library/cpp/yt/threading/rw_spin_lock.h>
+
+#include <util/thread/pool.h>
+
+#include <latch>
+#include <thread>
+
+namespace NYT::NThreading {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TReaderWriterSpinLockTest, WriterPriority)
+{
+    int readerThreads = 10;
+    std::latch latch(readerThreads + 1);
+    std::atomic<size_t> finishedCount = {0};
+
+    TReaderWriterSpinLock lock;
+
+    volatile std::atomic<ui32> x = {0};
+
+    auto readerTask = [&latch, &lock, &finishedCount, &x] () {
+        latch.arrive_and_wait();
+        while (true) {
+            {
+                auto guard = ReaderGuard(lock);
+                // do some stuff
+                for (ui32 i = 0; i < 10'000u; ++i) {
+                    x.fetch_add(i);
+                }
+            }
+            if (finishedCount.fetch_add(1) > 20'000) {
+                break;
+            }
+        }
+    };
+
+    auto readerPool = CreateThreadPool(readerThreads);
+    for (int i = 0; i < readerThreads; ++i) {
+        readerPool->SafeAddFunc(readerTask);
+    }
+
+    latch.arrive_and_wait();
+    while (finishedCount.load() == 0);
+    auto guard = WriterGuard(lock);
+    EXPECT_LE(finishedCount.load(), 1'000u);
+    DoNotOptimizeAway(x);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT::NConcurrency
diff --git a/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp b/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp
new file mode 100644
index 00000000000..26e58fff745
--- /dev/null
+++ b/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp
@@ -0,0 +1,160 @@
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <library/cpp/yt/threading/rw_spin_lock.h>
+#include <library/cpp/yt/threading/fork_aware_spin_lock.h>
+
+#include <util/thread/pool.h>
+
+#include <sys/wait.h>
+
+namespace NYT::NThreading {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TReaderWriterSpinLockTest, ForkFriendlyness)
+{
+    std::atomic<bool> stopped = {false};
+    YT_DECLARE_SPIN_LOCK(TReaderWriterSpinLock, lock);
+
+    auto readerTask = [&lock, &stopped] () {
+        while (!stopped.load()) {
+            ForkFriendlyReaderGuard(lock);
+        }
+    };
+
+    auto tryReaderTask = [&lock, &stopped] () {
+        while (!stopped.load()) {
+            // NB(pavook): TryAcquire instead of Acquire to minimize checks.
+            bool acquired = lock.TryAcquireReaderForkFriendly();
+            if (acquired) {
+                lock.ReleaseReader();
+            }
+        }
+    };
+
+    auto tryWriterTask = [&lock, &stopped] () {
+        while (!stopped.load()) {
+            Sleep(TDuration::MicroSeconds(1));
+            bool acquired = lock.TryAcquireWriter();
+            if (acquired) {
+                lock.ReleaseWriter();
+            }
+        }
+    };
+
+    auto writerTask = [&lock, &stopped] () {
+        while (!stopped.load()) {
+            Sleep(TDuration::MicroSeconds(1));
+            WriterGuard(lock);
+        }
+    };
+
+    int readerCount = 20;
+    int writerCount = 10;
+
+    auto reader = CreateThreadPool(readerCount);
+    auto writer = CreateThreadPool(writerCount);
+
+    for (int i = 0; i < readerCount / 2; ++i) {
+        reader->SafeAddFunc(readerTask);
+        reader->SafeAddFunc(tryReaderTask);
+    }
+    for (int i = 0; i < writerCount / 2; ++i) {
+        writer->SafeAddFunc(writerTask);
+        writer->SafeAddFunc(tryWriterTask);
+    }
+
+    // And let the chaos begin!
+    int forkCount = 2000;
+    for (int iter = 1; iter <= forkCount; ++iter) {
+        pid_t pid;
+        {
+            auto guard = WriterGuard(lock);
+            pid = fork();
+        }
+
+        YT_VERIFY(pid >= 0);
+
+        // NB(pavook): check different orders to maximize chaos.
+        if (iter % 2 == 0) {
+            ReaderGuard(lock);
+        }
+        WriterGuard(lock);
+        ReaderGuard(lock);
+        if (pid == 0) {
+            // NB(pavook): thread pools are no longer with us.
+            _exit(0);
+        }
+    }
+
+    for (int i = 1; i <= forkCount; ++i) {
+        int status;
+        YT_VERIFY(waitpid(0, &status, 0) > 0);
+        YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0);
+    }
+
+    stopped.store(true);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TForkAwareSpinLockTest, ForkSafety)
+{
+    std::atomic<bool> stopped = {false};
+    YT_DECLARE_SPIN_LOCK(TForkAwareSpinLock, lock);
+
+    auto acquireTask = [&lock, &stopped] () {
+        while (!stopped.load()) {
+            Guard(lock);
+        }
+    };
+
+    // NB(pavook): TryAcquire instead of Acquire to minimize checks.
+    auto tryAcquireTask = [&lock, &stopped] () {
+        while (!stopped.load()) {
+            bool acquired = lock.TryAcquire();
+            if (acquired) {
+                lock.Release();
+            }
+        }
+    };
+
+    int workerCount = 20;
+
+    auto worker = CreateThreadPool(workerCount);
+
+    for (int i = 0; i < workerCount / 2; ++i) {
+        worker->SafeAddFunc(acquireTask);
+        worker->SafeAddFunc(tryAcquireTask);
+    }
+
+    // And let the chaos begin!
+    int forkCount = 2000;
+    for (int iter = 1; iter <= forkCount; ++iter) {
+        pid_t pid = fork();
+
+        YT_VERIFY(pid >= 0);
+
+        Guard(lock);
+        Guard(lock);
+
+        if (pid == 0) {
+            // NB(pavook): thread pools are no longer with us.
+            _exit(0);
+        }
+    }
+
+    for (int i = 1; i <= forkCount; ++i) {
+        int status;
+        YT_VERIFY(waitpid(0, &status, 0) > 0);
+        YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0);
+    }
+
+    stopped.store(true);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT::NConcurrency
diff --git a/library/cpp/yt/threading/unittests/ya.make b/library/cpp/yt/threading/unittests/ya.make
index ef9b5d29951..da006012c00 100644
--- a/library/cpp/yt/threading/unittests/ya.make
+++ b/library/cpp/yt/threading/unittests/ya.make
@@ -5,9 +5,14 @@ INCLUDE(${ARCADIA_ROOT}/library/cpp/yt/ya_cpp.make.inc)
 SRCS(
     count_down_latch_ut.cpp
     recursive_spin_lock_ut.cpp
+    rw_spin_lock_ut.cpp
     spin_wait_ut.cpp
 )
+IF (NOT OS_WINDOWS)
+    SRC(spin_lock_fork_ut.cpp)
+ENDIF()
+
 PEERDIR(
     library/cpp/yt/assert
     library/cpp/yt/threading
diff --git a/library/cpp/yt/threading/writer_starving_rw_spin_lock-inl.h b/library/cpp/yt/threading/writer_starving_rw_spin_lock-inl.h
new file mode 100644
index 00000000000..cf8bde715cc
--- /dev/null
+++ b/library/cpp/yt/threading/writer_starving_rw_spin_lock-inl.h
@@ -0,0 +1,101 @@
+#pragma once
+#ifndef WRITER_STARVING_RW_SPIN_LOCK_INL_H_
+#error "Direct inclusion of this file is not allowed, include rw_spin_lock.h"
+// For the sake of sane code completion.
+#include "writer_starving_rw_spin_lock.h"
+#endif
+#undef WRITER_STARVING_RW_SPIN_LOCK_INL_H_
+
+#include "spin_wait.h"
+
+namespace NYT::NThreading {
+
+////////////////////////////////////////////////////////////////////////////////
+
+inline void TWriterStarvingRWSpinLock::AcquireReader() noexcept
+{
+    if (TryAcquireReader()) {
+        return;
+    }
+    AcquireReaderSlow();
+}
+
+inline void TWriterStarvingRWSpinLock::ReleaseReader() noexcept
+{
+    auto prevValue = Value_.fetch_sub(ReaderDelta, std::memory_order::release);
+    Y_ASSERT((prevValue & ~WriterMask) != 0);
+    NDetail::RecordSpinLockReleased();
+}
+
+inline void TWriterStarvingRWSpinLock::AcquireWriter() noexcept
+{
+    if (TryAcquireWriter()) {
+        return;
+    }
+    AcquireWriterSlow();
+}
+
+inline void TWriterStarvingRWSpinLock::ReleaseWriter() noexcept
+{
+    auto prevValue = Value_.fetch_and(~WriterMask, std::memory_order::release);
+    Y_ASSERT(prevValue & WriterMask);
+    NDetail::RecordSpinLockReleased();
+}
+
+inline bool TWriterStarvingRWSpinLock::IsLocked() const noexcept
+{
+    return Value_.load() != UnlockedValue;
+}
+
+inline bool TWriterStarvingRWSpinLock::IsLockedByReader() const noexcept
+{
+    return Value_.load() >= ReaderDelta;
+}
+
+inline bool TWriterStarvingRWSpinLock::IsLockedByWriter() const noexcept
+{
+    return (Value_.load() & WriterMask) != 0;
+}
+
+inline bool TWriterStarvingRWSpinLock::TryAcquireReader() noexcept
+{
+    auto oldValue = Value_.fetch_add(ReaderDelta, std::memory_order::acquire);
+    if ((oldValue & WriterMask) != 0) {
+        Value_.fetch_sub(ReaderDelta, std::memory_order::relaxed);
+        return false;
+    }
+    NDetail::RecordSpinLockAcquired();
+    return true;
+}
+
+inline bool TWriterStarvingRWSpinLock::TryAndTryAcquireReader() noexcept
+{
+    auto oldValue = Value_.load(std::memory_order::relaxed);
+    if ((oldValue & WriterMask) != 0) {
+        return false;
+    }
+    return TryAcquireReader();
+}
+
+inline bool TWriterStarvingRWSpinLock::TryAcquireWriter() noexcept
+{
+    auto expected = UnlockedValue;
+
+    bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire);
+    NDetail::RecordSpinLockAcquired(acquired);
+    return acquired;
+}
+
+inline bool TWriterStarvingRWSpinLock::TryAndTryAcquireWriter() noexcept
+{
+    auto oldValue = Value_.load(std::memory_order::relaxed);
+    if (oldValue != UnlockedValue) {
+        return false;
+    }
+    return TryAcquireWriter();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NThreading
+
diff --git a/library/cpp/yt/threading/writer_starving_rw_spin_lock.cpp b/library/cpp/yt/threading/writer_starving_rw_spin_lock.cpp
new file mode 100644
index 00000000000..74c9f59db13
--- /dev/null
+++ b/library/cpp/yt/threading/writer_starving_rw_spin_lock.cpp
@@ -0,0 +1,25 @@
+#include "writer_starving_rw_spin_lock.h"
+
+namespace NYT::NThreading {
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TWriterStarvingRWSpinLock::AcquireReaderSlow() noexcept
+{
+    TSpinWait spinWait(Location_, ESpinLockActivityKind::Read);
+    while (!TryAndTryAcquireReader()) {
+        spinWait.Wait();
+    }
+}
+
+void TWriterStarvingRWSpinLock::AcquireWriterSlow() noexcept
+{
+    TSpinWait spinWait(Location_, ESpinLockActivityKind::Write);
+    while (!TryAndTryAcquireWriter()) {
+        spinWait.Wait();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NThreading
diff --git a/library/cpp/yt/threading/writer_starving_rw_spin_lock.h b/library/cpp/yt/threading/writer_starving_rw_spin_lock.h
new file mode 100644
index 00000000000..8a456afe21b
--- /dev/null
+++ b/library/cpp/yt/threading/writer_starving_rw_spin_lock.h
@@ -0,0 +1,115 @@
+#pragma once
+
+#include "public.h"
+#include "rw_spin_lock.h"
+#include "spin_lock_base.h"
+#include "spin_lock_count.h"
+
+#include <library/cpp/yt/memory/public.h>
+
+#include <util/system/rwlock.h>
+
+#include <atomic>
+
+namespace NYT::NThreading {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// TODO(pavook): deprecate it.
+
+//! Single-writer multiple-readers spin lock.
+/*!
+ *  Reader-side calls are pretty cheap.
+ *  WARNING: The lock is unfair, and readers can starve writers. See rw_spin_lock.h for a writer-prioritized lock.
+ *  WARNING: Never use the bare lock if forks are possible: see fork_aware_rw_spin_lock.h for a fork-safe lock.
+ *  Unlike rw_spin_lock.h, reader-side is reentrant here: it is possible to acquire the **reader** lock multiple times
+ *  even in the single thread.
+ *  This doesn't mean you should do it: in fact, you shouldn't: use separate locks for separate entities.
+ *  If you see this class in your code, try migrating to the proper rw_spin_lock.h after ensuring you don't rely on
+ *  reentrant locking.
+ */
+class TWriterStarvingRWSpinLock
+    : public TSpinLockBase
+{
+public:
+    using TSpinLockBase::TSpinLockBase;
+
+    //! Acquires the reader lock.
+    /*!
+     *  Optimized for the case of read-intensive workloads.
+     *  Cheap (just one atomic increment and no spinning if no writers are present).
+     *  Don't use this call if forks are possible: forking at some
+     *  intermediate point inside #AcquireReader may corrupt the lock state and
+     *  leave lock forever stuck for the child process.
+     */
+    void AcquireReader() noexcept;
+    //! Tries acquiring the reader lock; see #AcquireReader.
+    //! Returns |true| on success.
+    bool TryAcquireReader() noexcept;
+    //! Releases the reader lock.
+    /*!
+     *  Cheap (just one atomic decrement).
+     */
+    void ReleaseReader() noexcept;
+
+    //! Acquires the writer lock.
+    /*!
+     *  Rather cheap (just one CAS).
+     */
+    void AcquireWriter() noexcept;
+    //! Tries acquiring the writer lock; see #AcquireWriter.
+    //! Returns |true| on success.
+    bool TryAcquireWriter() noexcept;
+    //! Releases the writer lock.
+    /*!
+     *  Cheap (just one atomic store).
+     */
+    void ReleaseWriter() noexcept;
+
+    //! Returns true if the lock is taken (either by a reader or writer).
+    /*!
+     *  This is inherently racy.
+     *  Only use for debugging and diagnostic purposes.
+     */
+    bool IsLocked() const noexcept;
+
+    //! Returns true if the lock is taken by reader.
+    /*!
+     *  This is inherently racy.
+     *  Only use for debugging and diagnostic purposes.
+     */
+    bool IsLockedByReader() const noexcept;
+
+    //! Returns true if the lock is taken by writer.
+    /*!
+     *  This is inherently racy.
+     *  Only use for debugging and diagnostic purposes.
+     */
+    bool IsLockedByWriter() const noexcept;
+
+private:
+    using TValue = ui32;
+    static constexpr TValue UnlockedValue = 0;
+    static constexpr TValue WriterMask = 1;
+    static constexpr TValue ReaderDelta = 2;
+
+    std::atomic<TValue> Value_ = UnlockedValue;
+
+
+    bool TryAndTryAcquireReader() noexcept;
+    bool TryAndTryAcquireWriter() noexcept;
+
+    void AcquireReaderSlow() noexcept;
+    void AcquireWriterSlow() noexcept;
+};
+
+REGISTER_TRACKED_SPIN_LOCK_CLASS(TWriterStarvingRWSpinLock)
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NThreading
+
+#define WRITER_STARVING_RW_SPIN_LOCK_INL_H_
+#include "writer_starving_rw_spin_lock-inl.h"
+#undef WRITER_STARVING_RW_SPIN_LOCK_INL_H_
+
diff --git a/library/cpp/yt/threading/ya.make b/library/cpp/yt/threading/ya.make
index cc11e7974ef..d25f0a70681 100644
--- a/library/cpp/yt/threading/ya.make
+++ b/library/cpp/yt/threading/ya.make
@@ -18,6 +18,7 @@ SRCS(
     spin_lock.cpp
     spin_wait.cpp
     spin_wait_hook.cpp
+    writer_starving_rw_spin_lock.cpp
 )
 PEERDIR(