aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpavook <pavook@yandex-team.com>2025-04-15 19:37:21 +0300
committerpavook <pavook@yandex-team.com>2025-04-15 19:53:08 +0300
commit3b1c16e57e7e2e4499c9d1b592f350f5a4a1a960 (patch)
tree1f9692892d7c27660bae2c8f47cfd8a7986bd639
parent218ec6a31e4d1f39811692f17f54a77a5755ee4c (diff)
downloadydb-3b1c16e57e7e2e4499c9d1b592f350f5a4a1a960.tar.gz
Revert commit rXXXXXX, YT-24537: Prioritize writers in TReaderWriterSpinLock
It turns out, RWSpinLock previously accidentally supported recursive AcquireReaders, and this property was broken with the change. commit_hash:f996e7b52ef8b3d37118034530a094af0efbe435
-rw-r--r--library/cpp/yt/threading/rw_spin_lock-inl.h35
-rw-r--r--library/cpp/yt/threading/rw_spin_lock.h27
-rw-r--r--library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp56
-rw-r--r--library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp160
-rw-r--r--library/cpp/yt/threading/unittests/ya.make5
5 files changed, 17 insertions, 266 deletions
diff --git a/library/cpp/yt/threading/rw_spin_lock-inl.h b/library/cpp/yt/threading/rw_spin_lock-inl.h
index 0811bf548d6..779de1b64a8 100644
--- a/library/cpp/yt/threading/rw_spin_lock-inl.h
+++ b/library/cpp/yt/threading/rw_spin_lock-inl.h
@@ -31,7 +31,7 @@ inline void TReaderWriterSpinLock::AcquireReaderForkFriendly() noexcept
inline void TReaderWriterSpinLock::ReleaseReader() noexcept
{
auto prevValue = Value_.fetch_sub(ReaderDelta, std::memory_order::release);
- Y_ASSERT((prevValue & ~(WriterMask | WriterReadyMask)) != 0);
+ Y_ASSERT((prevValue & ~WriterMask) != 0);
NDetail::RecordSpinLockReleased();
}
@@ -45,7 +45,7 @@ inline void TReaderWriterSpinLock::AcquireWriter() noexcept
inline void TReaderWriterSpinLock::ReleaseWriter() noexcept
{
- auto prevValue = Value_.fetch_and(~(WriterMask | WriterReadyMask), std::memory_order::release);
+ auto prevValue = Value_.fetch_and(~WriterMask, std::memory_order::release);
Y_ASSERT(prevValue & WriterMask);
NDetail::RecordSpinLockReleased();
}
@@ -68,7 +68,7 @@ inline bool TReaderWriterSpinLock::IsLockedByWriter() const noexcept
inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept
{
auto oldValue = Value_.fetch_add(ReaderDelta, std::memory_order::acquire);
- if ((oldValue & (WriterMask | WriterReadyMask)) != 0) {
+ if ((oldValue & WriterMask) != 0) {
Value_.fetch_sub(ReaderDelta, std::memory_order::relaxed);
return false;
}
@@ -79,7 +79,7 @@ inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept
inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept
{
auto oldValue = Value_.load(std::memory_order::relaxed);
- if ((oldValue & (WriterMask | WriterReadyMask)) != 0) {
+ if ((oldValue & WriterMask) != 0) {
return false;
}
return TryAcquireReader();
@@ -88,7 +88,7 @@ inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept
inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept
{
auto oldValue = Value_.load(std::memory_order::relaxed);
- if ((oldValue & (WriterMask | WriterReadyMask)) != 0) {
+ if ((oldValue & WriterMask) != 0) {
return false;
}
auto newValue = oldValue + ReaderDelta;
@@ -98,35 +98,22 @@ inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept
return acquired;
}
-inline bool TReaderWriterSpinLock::TryAcquireWriterWithExpectedValue(TValue expected) noexcept
+inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept
{
- bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire);
+ auto expected = UnlockedValue;
+
+ bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire);
NDetail::RecordSpinLockAcquired(acquired);
return acquired;
}
-inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept
-{
- // NB(pavook): we cannot expect writer ready to be set, as this method
- // might be called without indicating writer readiness and we cannot
- // indicate readiness on the hot path. This means that code calling
- // TryAcquireWriter will spin against code calling AcquireWriter.
- return TryAcquireWriterWithExpectedValue(UnlockedValue);
-}
-
inline bool TReaderWriterSpinLock::TryAndTryAcquireWriter() noexcept
{
auto oldValue = Value_.load(std::memory_order::relaxed);
-
- if ((oldValue & WriterReadyMask) == 0) {
- oldValue = Value_.fetch_or(WriterReadyMask, std::memory_order::relaxed);
- }
-
- if ((oldValue & (~WriterReadyMask)) != 0) {
+ if (oldValue != UnlockedValue) {
return false;
}
-
- return TryAcquireWriterWithExpectedValue(WriterReadyMask);
+ return TryAcquireWriter();
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/yt/threading/rw_spin_lock.h b/library/cpp/yt/threading/rw_spin_lock.h
index 367c3952786..a915e677e82 100644
--- a/library/cpp/yt/threading/rw_spin_lock.h
+++ b/library/cpp/yt/threading/rw_spin_lock.h
@@ -16,16 +16,8 @@ namespace NYT::NThreading {
//! Single-writer multiple-readers spin lock.
/*!
- * Reader-side acquires are pretty cheap, and readers don't spin unless writers
- * are present.
- *
- * The lock is unfair, but writers are prioritized over readers, that is,
- * if AcquireWriter() is called at some time, then some writer
- * (not necessarily the same one that called AcquireWriter) will succeed
- * in the next time.
- *
- * WARNING: You should not use this lock if forks are possible: see
- * fork_aware_rw_spin_lock.h for a proper fork-safe lock.
+ * Reader-side calls are pretty cheap.
+ * The lock is unfair.
*/
class TReaderWriterSpinLock
: public TSpinLockBase
@@ -37,23 +29,18 @@ public:
/*!
* Optimized for the case of read-intensive workloads.
* Cheap (just one atomic increment and no spinning if no writers are present).
- *
* Don't use this call if forks are possible: forking at some
* intermediate point inside #AcquireReader may corrupt the lock state and
- * leave the lock stuck forever for the child process.
+ * leave lock forever stuck for the child process.
*/
void AcquireReader() noexcept;
//! Acquires the reader lock.
/*!
* A more expensive version of #AcquireReader (includes at least
* one atomic load and CAS; also may spin even if just readers are present).
- *
* In contrast to #AcquireReader, this method can be used in the presence of forks.
- *
- * WARNING: fork-friendliness alone does not provide fork-safety: additional
- * actions must be performed to release the lock after a fork. This means you
- * probably should NOT use this lock in the presence of forks, consider
- * fork_aware_rw_spin_lock.h instead as a proper fork-safe lock.
+ * Note that fork-friendliness alone does not provide fork-safety: additional
+ * actions must be performed to release the lock after a fork.
*/
void AcquireReaderForkFriendly() noexcept;
//! Tries acquiring the reader lock; see #AcquireReader.
@@ -107,12 +94,10 @@ private:
using TValue = ui32;
static constexpr TValue UnlockedValue = 0;
static constexpr TValue WriterMask = 1;
- static constexpr TValue WriterReadyMask = 2;
- static constexpr TValue ReaderDelta = 4;
+ static constexpr TValue ReaderDelta = 2;
std::atomic<TValue> Value_ = UnlockedValue;
- bool TryAcquireWriterWithExpectedValue(TValue expected) noexcept;
bool TryAndTryAcquireReader() noexcept;
bool TryAndTryAcquireWriter() noexcept;
diff --git a/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp b/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp
deleted file mode 100644
index 653772604ce..00000000000
--- a/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp
+++ /dev/null
@@ -1,56 +0,0 @@
-#include <library/cpp/testing/gtest/gtest.h>
-
-#include <library/cpp/yt/threading/rw_spin_lock.h>
-
-#include <util/thread/pool.h>
-
-#include <latch>
-#include <thread>
-
-namespace NYT::NThreading {
-namespace {
-
-////////////////////////////////////////////////////////////////////////////////
-
-TEST(TReaderWriterSpinLockTest, WriterPriority)
-{
- int readerThreads = 10;
- std::latch latch(readerThreads + 1);
- std::atomic<size_t> finishedCount = {0};
-
- TReaderWriterSpinLock lock;
-
- volatile std::atomic<ui32> x = {0};
-
- auto readerTask = [&latch, &lock, &finishedCount, &x] () {
- latch.arrive_and_wait();
- while (true) {
- {
- auto guard = ReaderGuard(lock);
- // do some stuff
- for (ui32 i = 0; i < 10'000u; ++i) {
- x.fetch_add(i);
- }
- }
- if (finishedCount.fetch_add(1) > 20'000) {
- break;
- }
- }
- };
-
- auto readerPool = CreateThreadPool(readerThreads);
- for (int i = 0; i < readerThreads; ++i) {
- readerPool->SafeAddFunc(readerTask);
- }
-
- latch.arrive_and_wait();
- while (finishedCount.load() == 0);
- auto guard = WriterGuard(lock);
- EXPECT_LE(finishedCount.load(), 1'000u);
- DoNotOptimizeAway(x);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace
-} // namespace NYT::NConcurrency
diff --git a/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp b/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp
deleted file mode 100644
index 26e58fff745..00000000000
--- a/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp
+++ /dev/null
@@ -1,160 +0,0 @@
-#include <library/cpp/testing/gtest/gtest.h>
-
-#include <library/cpp/yt/threading/rw_spin_lock.h>
-#include <library/cpp/yt/threading/fork_aware_spin_lock.h>
-
-#include <util/thread/pool.h>
-
-#include <sys/wait.h>
-
-namespace NYT::NThreading {
-namespace {
-
-////////////////////////////////////////////////////////////////////////////////
-
-TEST(TReaderWriterSpinLockTest, ForkFriendlyness)
-{
- std::atomic<bool> stopped = {false};
- YT_DECLARE_SPIN_LOCK(TReaderWriterSpinLock, lock);
-
- auto readerTask = [&lock, &stopped] () {
- while (!stopped.load()) {
- ForkFriendlyReaderGuard(lock);
- }
- };
-
- auto tryReaderTask = [&lock, &stopped] () {
- while (!stopped.load()) {
- // NB(pavook): TryAcquire instead of Acquire to minimize checks.
- bool acquired = lock.TryAcquireReaderForkFriendly();
- if (acquired) {
- lock.ReleaseReader();
- }
- }
- };
-
- auto tryWriterTask = [&lock, &stopped] () {
- while (!stopped.load()) {
- Sleep(TDuration::MicroSeconds(1));
- bool acquired = lock.TryAcquireWriter();
- if (acquired) {
- lock.ReleaseWriter();
- }
- }
- };
-
- auto writerTask = [&lock, &stopped] () {
- while (!stopped.load()) {
- Sleep(TDuration::MicroSeconds(1));
- WriterGuard(lock);
- }
- };
-
- int readerCount = 20;
- int writerCount = 10;
-
- auto reader = CreateThreadPool(readerCount);
- auto writer = CreateThreadPool(writerCount);
-
- for (int i = 0; i < readerCount / 2; ++i) {
- reader->SafeAddFunc(readerTask);
- reader->SafeAddFunc(tryReaderTask);
- }
- for (int i = 0; i < writerCount / 2; ++i) {
- writer->SafeAddFunc(writerTask);
- writer->SafeAddFunc(tryWriterTask);
- }
-
- // And let the chaos begin!
- int forkCount = 2000;
- for (int iter = 1; iter <= forkCount; ++iter) {
- pid_t pid;
- {
- auto guard = WriterGuard(lock);
- pid = fork();
- }
-
- YT_VERIFY(pid >= 0);
-
- // NB(pavook): check different orders to maximize chaos.
- if (iter % 2 == 0) {
- ReaderGuard(lock);
- }
- WriterGuard(lock);
- ReaderGuard(lock);
- if (pid == 0) {
- // NB(pavook): thread pools are no longer with us.
- _exit(0);
- }
- }
-
- for (int i = 1; i <= forkCount; ++i) {
- int status;
- YT_VERIFY(waitpid(0, &status, 0) > 0);
- YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0);
- }
-
- stopped.store(true);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-TEST(TForkAwareSpinLockTest, ForkSafety)
-{
- std::atomic<bool> stopped = {false};
- YT_DECLARE_SPIN_LOCK(TForkAwareSpinLock, lock);
-
- auto acquireTask = [&lock, &stopped] () {
- while (!stopped.load()) {
- Guard(lock);
- }
- };
-
- // NB(pavook): TryAcquire instead of Acquire to minimize checks.
- auto tryAcquireTask = [&lock, &stopped] () {
- while (!stopped.load()) {
- bool acquired = lock.TryAcquire();
- if (acquired) {
- lock.Release();
- }
- }
- };
-
- int workerCount = 20;
-
- auto worker = CreateThreadPool(workerCount);
-
- for (int i = 0; i < workerCount / 2; ++i) {
- worker->SafeAddFunc(acquireTask);
- worker->SafeAddFunc(tryAcquireTask);
- }
-
- // And let the chaos begin!
- int forkCount = 2000;
- for (int iter = 1; iter <= forkCount; ++iter) {
- pid_t pid = fork();
-
- YT_VERIFY(pid >= 0);
-
- Guard(lock);
- Guard(lock);
-
- if (pid == 0) {
- // NB(pavook): thread pools are no longer with us.
- _exit(0);
- }
- }
-
- for (int i = 1; i <= forkCount; ++i) {
- int status;
- YT_VERIFY(waitpid(0, &status, 0) > 0);
- YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0);
- }
-
- stopped.store(true);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace
-} // namespace NYT::NConcurrency
diff --git a/library/cpp/yt/threading/unittests/ya.make b/library/cpp/yt/threading/unittests/ya.make
index da006012c00..ef9b5d29951 100644
--- a/library/cpp/yt/threading/unittests/ya.make
+++ b/library/cpp/yt/threading/unittests/ya.make
@@ -5,14 +5,9 @@ INCLUDE(${ARCADIA_ROOT}/library/cpp/yt/ya_cpp.make.inc)
SRCS(
count_down_latch_ut.cpp
recursive_spin_lock_ut.cpp
- rw_spin_lock_ut.cpp
spin_wait_ut.cpp
)
-IF (NOT OS_WINDOWS)
- SRC(spin_lock_fork_ut.cpp)
-ENDIF()
-
PEERDIR(
library/cpp/yt/assert
library/cpp/yt/threading