diff options
author | pavook <[email protected]> | 2025-04-10 15:42:18 +0300 |
---|---|---|
committer | pavook <[email protected]> | 2025-04-10 15:57:08 +0300 |
commit | e4ca0cae001c7a0cc845bca18daff015123a1294 (patch) | |
tree | 2c97822eea81dca4b8294a10c818101e65acc34f /library/cpp | |
parent | 649e2ccd071e7d216ad38e2dec8f3bee0918a392 (diff) |
YT-24537: Prioritize writers in TReaderWriterSpinLock
commit_hash:94fee5363799655628bd7e2c144a7869a9d89002
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/yt/threading/rw_spin_lock-inl.h | 35 | ||||
-rw-r--r-- | library/cpp/yt/threading/rw_spin_lock.h | 27 | ||||
-rw-r--r-- | library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp | 56 | ||||
-rw-r--r-- | library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp | 160 | ||||
-rw-r--r-- | library/cpp/yt/threading/unittests/ya.make | 5 |
5 files changed, 266 insertions, 17 deletions
diff --git a/library/cpp/yt/threading/rw_spin_lock-inl.h b/library/cpp/yt/threading/rw_spin_lock-inl.h index 779de1b64a8..0811bf548d6 100644 --- a/library/cpp/yt/threading/rw_spin_lock-inl.h +++ b/library/cpp/yt/threading/rw_spin_lock-inl.h @@ -31,7 +31,7 @@ inline void TReaderWriterSpinLock::AcquireReaderForkFriendly() noexcept inline void TReaderWriterSpinLock::ReleaseReader() noexcept { auto prevValue = Value_.fetch_sub(ReaderDelta, std::memory_order::release); - Y_ASSERT((prevValue & ~WriterMask) != 0); + Y_ASSERT((prevValue & ~(WriterMask | WriterReadyMask)) != 0); NDetail::RecordSpinLockReleased(); } @@ -45,7 +45,7 @@ inline void TReaderWriterSpinLock::AcquireWriter() noexcept inline void TReaderWriterSpinLock::ReleaseWriter() noexcept { - auto prevValue = Value_.fetch_and(~WriterMask, std::memory_order::release); + auto prevValue = Value_.fetch_and(~(WriterMask | WriterReadyMask), std::memory_order::release); Y_ASSERT(prevValue & WriterMask); NDetail::RecordSpinLockReleased(); } @@ -68,7 +68,7 @@ inline bool TReaderWriterSpinLock::IsLockedByWriter() const noexcept inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept { auto oldValue = Value_.fetch_add(ReaderDelta, std::memory_order::acquire); - if ((oldValue & WriterMask) != 0) { + if ((oldValue & (WriterMask | WriterReadyMask)) != 0) { Value_.fetch_sub(ReaderDelta, std::memory_order::relaxed); return false; } @@ -79,7 +79,7 @@ inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept { auto oldValue = Value_.load(std::memory_order::relaxed); - if ((oldValue & WriterMask) != 0) { + if ((oldValue & (WriterMask | WriterReadyMask)) != 0) { return false; } return TryAcquireReader(); @@ -88,7 +88,7 @@ inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept { auto oldValue = Value_.load(std::memory_order::relaxed); - if ((oldValue & WriterMask) != 0) { + if ((oldValue & (WriterMask | WriterReadyMask)) != 0) { return false; } auto newValue = oldValue + ReaderDelta; @@ -98,22 +98,35 @@ inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept return acquired; } -inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept +inline bool TReaderWriterSpinLock::TryAcquireWriterWithExpectedValue(TValue expected) noexcept { - auto expected = UnlockedValue; - - bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire); + bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire); NDetail::RecordSpinLockAcquired(acquired); return acquired; } +inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept +{ + // NB(pavook): we cannot expect writer ready to be set, as this method + // might be called without indicating writer readiness and we cannot + // indicate readiness on the hot path. This means that code calling + // TryAcquireWriter will spin against code calling AcquireWriter. + return TryAcquireWriterWithExpectedValue(UnlockedValue); +} + inline bool TReaderWriterSpinLock::TryAndTryAcquireWriter() noexcept { auto oldValue = Value_.load(std::memory_order::relaxed); - if (oldValue != UnlockedValue) { + + if ((oldValue & WriterReadyMask) == 0) { + oldValue = Value_.fetch_or(WriterReadyMask, std::memory_order::relaxed); + } + + if ((oldValue & (~WriterReadyMask)) != 0) { return false; } - return TryAcquireWriter(); + + return TryAcquireWriterWithExpectedValue(WriterReadyMask); } //////////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/yt/threading/rw_spin_lock.h b/library/cpp/yt/threading/rw_spin_lock.h index a915e677e82..367c3952786 100644 --- a/library/cpp/yt/threading/rw_spin_lock.h +++ b/library/cpp/yt/threading/rw_spin_lock.h @@ -16,8 +16,16 @@ namespace NYT::NThreading { //! Single-writer multiple-readers spin lock. /*! - * Reader-side calls are pretty cheap. - * The lock is unfair. + * Reader-side acquires are pretty cheap, and readers don't spin unless writers + * are present. + * + * The lock is unfair, but writers are prioritized over readers, that is, + * if AcquireWriter() is called at some time, then some writer + * (not necessarily the same one that called AcquireWriter) will succeed + * in the next time. + * + * WARNING: You should not use this lock if forks are possible: see + * fork_aware_rw_spin_lock.h for a proper fork-safe lock. */ class TReaderWriterSpinLock : public TSpinLockBase @@ -29,18 +37,23 @@ public: /*! * Optimized for the case of read-intensive workloads. * Cheap (just one atomic increment and no spinning if no writers are present). + * * Don't use this call if forks are possible: forking at some * intermediate point inside #AcquireReader may corrupt the lock state and - * leave lock forever stuck for the child process. + * leave the lock stuck forever for the child process. */ void AcquireReader() noexcept; //! Acquires the reader lock. /*! * A more expensive version of #AcquireReader (includes at least * one atomic load and CAS; also may spin even if just readers are present). + * * In contrast to #AcquireReader, this method can be used in the presence of forks. - * Note that fork-friendliness alone does not provide fork-safety: additional - * actions must be performed to release the lock after a fork. + * + * WARNING: fork-friendliness alone does not provide fork-safety: additional + * actions must be performed to release the lock after a fork. This means you + * probably should NOT use this lock in the presence of forks, consider + * fork_aware_rw_spin_lock.h instead as a proper fork-safe lock. */ void AcquireReaderForkFriendly() noexcept; //! Tries acquiring the reader lock; see #AcquireReader. @@ -94,10 +107,12 @@ private: using TValue = ui32; static constexpr TValue UnlockedValue = 0; static constexpr TValue WriterMask = 1; - static constexpr TValue ReaderDelta = 2; + static constexpr TValue WriterReadyMask = 2; + static constexpr TValue ReaderDelta = 4; std::atomic<TValue> Value_ = UnlockedValue; + bool TryAcquireWriterWithExpectedValue(TValue expected) noexcept; bool TryAndTryAcquireReader() noexcept; bool TryAndTryAcquireWriter() noexcept; diff --git a/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp b/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp new file mode 100644 index 00000000000..653772604ce --- /dev/null +++ b/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp @@ -0,0 +1,56 @@ +#include <library/cpp/testing/gtest/gtest.h> + +#include <library/cpp/yt/threading/rw_spin_lock.h> + +#include <util/thread/pool.h> + +#include <latch> +#include <thread> + +namespace NYT::NThreading { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TReaderWriterSpinLockTest, WriterPriority) +{ + int readerThreads = 10; + std::latch latch(readerThreads + 1); + std::atomic<size_t> finishedCount = {0}; + + TReaderWriterSpinLock lock; + + volatile std::atomic<ui32> x = {0}; + + auto readerTask = [&latch, &lock, &finishedCount, &x] () { + latch.arrive_and_wait(); + while (true) { + { + auto guard = ReaderGuard(lock); + // do some stuff + for (ui32 i = 0; i < 10'000u; ++i) { + x.fetch_add(i); + } + } + if (finishedCount.fetch_add(1) > 20'000) { + break; + } + } + }; + + auto readerPool = CreateThreadPool(readerThreads); + for (int i = 0; i < readerThreads; ++i) { + readerPool->SafeAddFunc(readerTask); + } + + latch.arrive_and_wait(); + while (finishedCount.load() == 0); + auto guard = WriterGuard(lock); + EXPECT_LE(finishedCount.load(), 1'000u); + DoNotOptimizeAway(x); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT::NConcurrency diff --git a/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp b/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp new file mode 100644 index 00000000000..26e58fff745 --- /dev/null +++ b/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp @@ -0,0 +1,160 @@ +#include <library/cpp/testing/gtest/gtest.h> + +#include <library/cpp/yt/threading/rw_spin_lock.h> +#include <library/cpp/yt/threading/fork_aware_spin_lock.h> + +#include <util/thread/pool.h> + +#include <sys/wait.h> + +namespace NYT::NThreading { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TReaderWriterSpinLockTest, ForkFriendlyness) +{ + std::atomic<bool> stopped = {false}; + YT_DECLARE_SPIN_LOCK(TReaderWriterSpinLock, lock); + + auto readerTask = [&lock, &stopped] () { + while (!stopped.load()) { + ForkFriendlyReaderGuard(lock); + } + }; + + auto tryReaderTask = [&lock, &stopped] () { + while (!stopped.load()) { + // NB(pavook): TryAcquire instead of Acquire to minimize checks. + bool acquired = lock.TryAcquireReaderForkFriendly(); + if (acquired) { + lock.ReleaseReader(); + } + } + }; + + auto tryWriterTask = [&lock, &stopped] () { + while (!stopped.load()) { + Sleep(TDuration::MicroSeconds(1)); + bool acquired = lock.TryAcquireWriter(); + if (acquired) { + lock.ReleaseWriter(); + } + } + }; + + auto writerTask = [&lock, &stopped] () { + while (!stopped.load()) { + Sleep(TDuration::MicroSeconds(1)); + WriterGuard(lock); + } + }; + + int readerCount = 20; + int writerCount = 10; + + auto reader = CreateThreadPool(readerCount); + auto writer = CreateThreadPool(writerCount); + + for (int i = 0; i < readerCount / 2; ++i) { + reader->SafeAddFunc(readerTask); + reader->SafeAddFunc(tryReaderTask); + } + for (int i = 0; i < writerCount / 2; ++i) { + writer->SafeAddFunc(writerTask); + writer->SafeAddFunc(tryWriterTask); + } + + // And let the chaos begin! + int forkCount = 2000; + for (int iter = 1; iter <= forkCount; ++iter) { + pid_t pid; + { + auto guard = WriterGuard(lock); + pid = fork(); + } + + YT_VERIFY(pid >= 0); + + // NB(pavook): check different orders to maximize chaos. + if (iter % 2 == 0) { + ReaderGuard(lock); + } + WriterGuard(lock); + ReaderGuard(lock); + if (pid == 0) { + // NB(pavook): thread pools are no longer with us. + _exit(0); + } + } + + for (int i = 1; i <= forkCount; ++i) { + int status; + YT_VERIFY(waitpid(0, &status, 0) > 0); + YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0); + } + + stopped.store(true); +} + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TForkAwareSpinLockTest, ForkSafety) +{ + std::atomic<bool> stopped = {false}; + YT_DECLARE_SPIN_LOCK(TForkAwareSpinLock, lock); + + auto acquireTask = [&lock, &stopped] () { + while (!stopped.load()) { + Guard(lock); + } + }; + + // NB(pavook): TryAcquire instead of Acquire to minimize checks. + auto tryAcquireTask = [&lock, &stopped] () { + while (!stopped.load()) { + bool acquired = lock.TryAcquire(); + if (acquired) { + lock.Release(); + } + } + }; + + int workerCount = 20; + + auto worker = CreateThreadPool(workerCount); + + for (int i = 0; i < workerCount / 2; ++i) { + worker->SafeAddFunc(acquireTask); + worker->SafeAddFunc(tryAcquireTask); + } + + // And let the chaos begin! + int forkCount = 2000; + for (int iter = 1; iter <= forkCount; ++iter) { + pid_t pid = fork(); + + YT_VERIFY(pid >= 0); + + Guard(lock); + Guard(lock); + + if (pid == 0) { + // NB(pavook): thread pools are no longer with us. + _exit(0); + } + } + + for (int i = 1; i <= forkCount; ++i) { + int status; + YT_VERIFY(waitpid(0, &status, 0) > 0); + YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0); + } + + stopped.store(true); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT::NConcurrency diff --git a/library/cpp/yt/threading/unittests/ya.make b/library/cpp/yt/threading/unittests/ya.make index ef9b5d29951..da006012c00 100644 --- a/library/cpp/yt/threading/unittests/ya.make +++ b/library/cpp/yt/threading/unittests/ya.make @@ -5,9 +5,14 @@ INCLUDE(${ARCADIA_ROOT}/library/cpp/yt/ya_cpp.make.inc) SRCS( count_down_latch_ut.cpp recursive_spin_lock_ut.cpp + rw_spin_lock_ut.cpp spin_wait_ut.cpp ) +IF (NOT OS_WINDOWS) + SRC(spin_lock_fork_ut.cpp) +ENDIF() + PEERDIR( library/cpp/yt/assert library/cpp/yt/threading |