diff options
author | pavook <pavook@yandex-team.com> | 2025-04-15 19:37:21 +0300 |
---|---|---|
committer | pavook <pavook@yandex-team.com> | 2025-04-15 19:53:08 +0300 |
commit | 3b1c16e57e7e2e4499c9d1b592f350f5a4a1a960 (patch) | |
tree | 1f9692892d7c27660bae2c8f47cfd8a7986bd639 | |
parent | 218ec6a31e4d1f39811692f17f54a77a5755ee4c (diff) | |
download | ydb-3b1c16e57e7e2e4499c9d1b592f350f5a4a1a960.tar.gz |
Revert commit rXXXXXX, YT-24537: Prioritize writers in TReaderWriterSpinLock
It turns out, RWSpinLock previously accidentally supported recursive AcquireReaders, and this property was broken with the change.
commit_hash:f996e7b52ef8b3d37118034530a094af0efbe435
-rw-r--r-- | library/cpp/yt/threading/rw_spin_lock-inl.h | 35 | ||||
-rw-r--r-- | library/cpp/yt/threading/rw_spin_lock.h | 27 | ||||
-rw-r--r-- | library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp | 56 | ||||
-rw-r--r-- | library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp | 160 | ||||
-rw-r--r-- | library/cpp/yt/threading/unittests/ya.make | 5 |
5 files changed, 17 insertions, 266 deletions
diff --git a/library/cpp/yt/threading/rw_spin_lock-inl.h b/library/cpp/yt/threading/rw_spin_lock-inl.h index 0811bf548d6..779de1b64a8 100644 --- a/library/cpp/yt/threading/rw_spin_lock-inl.h +++ b/library/cpp/yt/threading/rw_spin_lock-inl.h @@ -31,7 +31,7 @@ inline void TReaderWriterSpinLock::AcquireReaderForkFriendly() noexcept inline void TReaderWriterSpinLock::ReleaseReader() noexcept { auto prevValue = Value_.fetch_sub(ReaderDelta, std::memory_order::release); - Y_ASSERT((prevValue & ~(WriterMask | WriterReadyMask)) != 0); + Y_ASSERT((prevValue & ~WriterMask) != 0); NDetail::RecordSpinLockReleased(); } @@ -45,7 +45,7 @@ inline void TReaderWriterSpinLock::AcquireWriter() noexcept inline void TReaderWriterSpinLock::ReleaseWriter() noexcept { - auto prevValue = Value_.fetch_and(~(WriterMask | WriterReadyMask), std::memory_order::release); + auto prevValue = Value_.fetch_and(~WriterMask, std::memory_order::release); Y_ASSERT(prevValue & WriterMask); NDetail::RecordSpinLockReleased(); } @@ -68,7 +68,7 @@ inline bool TReaderWriterSpinLock::IsLockedByWriter() const noexcept inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept { auto oldValue = Value_.fetch_add(ReaderDelta, std::memory_order::acquire); - if ((oldValue & (WriterMask | WriterReadyMask)) != 0) { + if ((oldValue & WriterMask) != 0) { Value_.fetch_sub(ReaderDelta, std::memory_order::relaxed); return false; } @@ -79,7 +79,7 @@ inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept { auto oldValue = Value_.load(std::memory_order::relaxed); - if ((oldValue & (WriterMask | WriterReadyMask)) != 0) { + if ((oldValue & WriterMask) != 0) { return false; } return TryAcquireReader(); @@ -88,7 +88,7 @@ inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept { auto oldValue = Value_.load(std::memory_order::relaxed); - if ((oldValue & (WriterMask | WriterReadyMask)) != 0) { + if ((oldValue & WriterMask) != 0) { return false; } auto newValue = oldValue + ReaderDelta; @@ -98,35 +98,22 @@ inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept return acquired; } -inline bool TReaderWriterSpinLock::TryAcquireWriterWithExpectedValue(TValue expected) noexcept +inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept { - bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire); + auto expected = UnlockedValue; + + bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire); NDetail::RecordSpinLockAcquired(acquired); return acquired; } -inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept -{ - // NB(pavook): we cannot expect writer ready to be set, as this method - // might be called without indicating writer readiness and we cannot - // indicate readiness on the hot path. This means that code calling - // TryAcquireWriter will spin against code calling AcquireWriter. - return TryAcquireWriterWithExpectedValue(UnlockedValue); -} - inline bool TReaderWriterSpinLock::TryAndTryAcquireWriter() noexcept { auto oldValue = Value_.load(std::memory_order::relaxed); - - if ((oldValue & WriterReadyMask) == 0) { - oldValue = Value_.fetch_or(WriterReadyMask, std::memory_order::relaxed); - } - - if ((oldValue & (~WriterReadyMask)) != 0) { + if (oldValue != UnlockedValue) { return false; } - - return TryAcquireWriterWithExpectedValue(WriterReadyMask); + return TryAcquireWriter(); } //////////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/yt/threading/rw_spin_lock.h b/library/cpp/yt/threading/rw_spin_lock.h index 367c3952786..a915e677e82 100644 --- a/library/cpp/yt/threading/rw_spin_lock.h +++ b/library/cpp/yt/threading/rw_spin_lock.h @@ -16,16 +16,8 @@ namespace NYT::NThreading { //! Single-writer multiple-readers spin lock. /*! - * Reader-side acquires are pretty cheap, and readers don't spin unless writers - * are present. - * - * The lock is unfair, but writers are prioritized over readers, that is, - * if AcquireWriter() is called at some time, then some writer - * (not necessarily the same one that called AcquireWriter) will succeed - * in the next time. - * - * WARNING: You should not use this lock if forks are possible: see - * fork_aware_rw_spin_lock.h for a proper fork-safe lock. + * Reader-side calls are pretty cheap. + * The lock is unfair. */ class TReaderWriterSpinLock : public TSpinLockBase @@ -37,23 +29,18 @@ public: /*! * Optimized for the case of read-intensive workloads. * Cheap (just one atomic increment and no spinning if no writers are present). - * * Don't use this call if forks are possible: forking at some * intermediate point inside #AcquireReader may corrupt the lock state and - * leave the lock stuck forever for the child process. + * leave lock forever stuck for the child process. */ void AcquireReader() noexcept; //! Acquires the reader lock. /*! * A more expensive version of #AcquireReader (includes at least * one atomic load and CAS; also may spin even if just readers are present). - * * In contrast to #AcquireReader, this method can be used in the presence of forks. - * - * WARNING: fork-friendliness alone does not provide fork-safety: additional - * actions must be performed to release the lock after a fork. This means you - * probably should NOT use this lock in the presence of forks, consider - * fork_aware_rw_spin_lock.h instead as a proper fork-safe lock. + * Note that fork-friendliness alone does not provide fork-safety: additional + * actions must be performed to release the lock after a fork. */ void AcquireReaderForkFriendly() noexcept; //! Tries acquiring the reader lock; see #AcquireReader. @@ -107,12 +94,10 @@ private: using TValue = ui32; static constexpr TValue UnlockedValue = 0; static constexpr TValue WriterMask = 1; - static constexpr TValue WriterReadyMask = 2; - static constexpr TValue ReaderDelta = 4; + static constexpr TValue ReaderDelta = 2; std::atomic<TValue> Value_ = UnlockedValue; - bool TryAcquireWriterWithExpectedValue(TValue expected) noexcept; bool TryAndTryAcquireReader() noexcept; bool TryAndTryAcquireWriter() noexcept; diff --git a/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp b/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp deleted file mode 100644 index 653772604ce..00000000000 --- a/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp +++ /dev/null @@ -1,56 +0,0 @@ -#include <library/cpp/testing/gtest/gtest.h> - -#include <library/cpp/yt/threading/rw_spin_lock.h> - -#include <util/thread/pool.h> - -#include <latch> -#include <thread> - -namespace NYT::NThreading { -namespace { - -//////////////////////////////////////////////////////////////////////////////// - -TEST(TReaderWriterSpinLockTest, WriterPriority) -{ - int readerThreads = 10; - std::latch latch(readerThreads + 1); - std::atomic<size_t> finishedCount = {0}; - - TReaderWriterSpinLock lock; - - volatile std::atomic<ui32> x = {0}; - - auto readerTask = [&latch, &lock, &finishedCount, &x] () { - latch.arrive_and_wait(); - while (true) { - { - auto guard = ReaderGuard(lock); - // do some stuff - for (ui32 i = 0; i < 10'000u; ++i) { - x.fetch_add(i); - } - } - if (finishedCount.fetch_add(1) > 20'000) { - break; - } - } - }; - - auto readerPool = CreateThreadPool(readerThreads); - for (int i = 0; i < readerThreads; ++i) { - readerPool->SafeAddFunc(readerTask); - } - - latch.arrive_and_wait(); - while (finishedCount.load() == 0); - auto guard = WriterGuard(lock); - EXPECT_LE(finishedCount.load(), 1'000u); - DoNotOptimizeAway(x); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace -} // namespace NYT::NConcurrency diff --git a/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp b/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp deleted file mode 100644 index 26e58fff745..00000000000 --- a/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp +++ /dev/null @@ -1,160 +0,0 @@ -#include <library/cpp/testing/gtest/gtest.h> - -#include <library/cpp/yt/threading/rw_spin_lock.h> -#include <library/cpp/yt/threading/fork_aware_spin_lock.h> - -#include <util/thread/pool.h> - -#include <sys/wait.h> - -namespace NYT::NThreading { -namespace { - -//////////////////////////////////////////////////////////////////////////////// - -TEST(TReaderWriterSpinLockTest, ForkFriendlyness) -{ - std::atomic<bool> stopped = {false}; - YT_DECLARE_SPIN_LOCK(TReaderWriterSpinLock, lock); - - auto readerTask = [&lock, &stopped] () { - while (!stopped.load()) { - ForkFriendlyReaderGuard(lock); - } - }; - - auto tryReaderTask = [&lock, &stopped] () { - while (!stopped.load()) { - // NB(pavook): TryAcquire instead of Acquire to minimize checks. - bool acquired = lock.TryAcquireReaderForkFriendly(); - if (acquired) { - lock.ReleaseReader(); - } - } - }; - - auto tryWriterTask = [&lock, &stopped] () { - while (!stopped.load()) { - Sleep(TDuration::MicroSeconds(1)); - bool acquired = lock.TryAcquireWriter(); - if (acquired) { - lock.ReleaseWriter(); - } - } - }; - - auto writerTask = [&lock, &stopped] () { - while (!stopped.load()) { - Sleep(TDuration::MicroSeconds(1)); - WriterGuard(lock); - } - }; - - int readerCount = 20; - int writerCount = 10; - - auto reader = CreateThreadPool(readerCount); - auto writer = CreateThreadPool(writerCount); - - for (int i = 0; i < readerCount / 2; ++i) { - reader->SafeAddFunc(readerTask); - reader->SafeAddFunc(tryReaderTask); - } - for (int i = 0; i < writerCount / 2; ++i) { - writer->SafeAddFunc(writerTask); - writer->SafeAddFunc(tryWriterTask); - } - - // And let the chaos begin! - int forkCount = 2000; - for (int iter = 1; iter <= forkCount; ++iter) { - pid_t pid; - { - auto guard = WriterGuard(lock); - pid = fork(); - } - - YT_VERIFY(pid >= 0); - - // NB(pavook): check different orders to maximize chaos. - if (iter % 2 == 0) { - ReaderGuard(lock); - } - WriterGuard(lock); - ReaderGuard(lock); - if (pid == 0) { - // NB(pavook): thread pools are no longer with us. - _exit(0); - } - } - - for (int i = 1; i <= forkCount; ++i) { - int status; - YT_VERIFY(waitpid(0, &status, 0) > 0); - YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0); - } - - stopped.store(true); -} - -//////////////////////////////////////////////////////////////////////////////// - -TEST(TForkAwareSpinLockTest, ForkSafety) -{ - std::atomic<bool> stopped = {false}; - YT_DECLARE_SPIN_LOCK(TForkAwareSpinLock, lock); - - auto acquireTask = [&lock, &stopped] () { - while (!stopped.load()) { - Guard(lock); - } - }; - - // NB(pavook): TryAcquire instead of Acquire to minimize checks. - auto tryAcquireTask = [&lock, &stopped] () { - while (!stopped.load()) { - bool acquired = lock.TryAcquire(); - if (acquired) { - lock.Release(); - } - } - }; - - int workerCount = 20; - - auto worker = CreateThreadPool(workerCount); - - for (int i = 0; i < workerCount / 2; ++i) { - worker->SafeAddFunc(acquireTask); - worker->SafeAddFunc(tryAcquireTask); - } - - // And let the chaos begin! - int forkCount = 2000; - for (int iter = 1; iter <= forkCount; ++iter) { - pid_t pid = fork(); - - YT_VERIFY(pid >= 0); - - Guard(lock); - Guard(lock); - - if (pid == 0) { - // NB(pavook): thread pools are no longer with us. - _exit(0); - } - } - - for (int i = 1; i <= forkCount; ++i) { - int status; - YT_VERIFY(waitpid(0, &status, 0) > 0); - YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0); - } - - stopped.store(true); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace -} // namespace NYT::NConcurrency diff --git a/library/cpp/yt/threading/unittests/ya.make b/library/cpp/yt/threading/unittests/ya.make index da006012c00..ef9b5d29951 100644 --- a/library/cpp/yt/threading/unittests/ya.make +++ b/library/cpp/yt/threading/unittests/ya.make @@ -5,14 +5,9 @@ INCLUDE(${ARCADIA_ROOT}/library/cpp/yt/ya_cpp.make.inc) SRCS( count_down_latch_ut.cpp recursive_spin_lock_ut.cpp - rw_spin_lock_ut.cpp spin_wait_ut.cpp ) -IF (NOT OS_WINDOWS) - SRC(spin_lock_fork_ut.cpp) -ENDIF() - PEERDIR( library/cpp/yt/assert library/cpp/yt/threading |