summaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorpavook <[email protected]>2025-04-10 15:42:18 +0300
committerpavook <[email protected]>2025-04-10 15:57:08 +0300
commite4ca0cae001c7a0cc845bca18daff015123a1294 (patch)
tree2c97822eea81dca4b8294a10c818101e65acc34f /library/cpp
parent649e2ccd071e7d216ad38e2dec8f3bee0918a392 (diff)
YT-24537: Prioritize writers in TReaderWriterSpinLock
commit_hash:94fee5363799655628bd7e2c144a7869a9d89002
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/yt/threading/rw_spin_lock-inl.h35
-rw-r--r--library/cpp/yt/threading/rw_spin_lock.h27
-rw-r--r--library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp56
-rw-r--r--library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp160
-rw-r--r--library/cpp/yt/threading/unittests/ya.make5
5 files changed, 266 insertions, 17 deletions
diff --git a/library/cpp/yt/threading/rw_spin_lock-inl.h b/library/cpp/yt/threading/rw_spin_lock-inl.h
index 779de1b64a8..0811bf548d6 100644
--- a/library/cpp/yt/threading/rw_spin_lock-inl.h
+++ b/library/cpp/yt/threading/rw_spin_lock-inl.h
@@ -31,7 +31,7 @@ inline void TReaderWriterSpinLock::AcquireReaderForkFriendly() noexcept
inline void TReaderWriterSpinLock::ReleaseReader() noexcept
{
auto prevValue = Value_.fetch_sub(ReaderDelta, std::memory_order::release);
- Y_ASSERT((prevValue & ~WriterMask) != 0);
+ Y_ASSERT((prevValue & ~(WriterMask | WriterReadyMask)) != 0);
NDetail::RecordSpinLockReleased();
}
@@ -45,7 +45,7 @@ inline void TReaderWriterSpinLock::AcquireWriter() noexcept
inline void TReaderWriterSpinLock::ReleaseWriter() noexcept
{
- auto prevValue = Value_.fetch_and(~WriterMask, std::memory_order::release);
+ auto prevValue = Value_.fetch_and(~(WriterMask | WriterReadyMask), std::memory_order::release);
Y_ASSERT(prevValue & WriterMask);
NDetail::RecordSpinLockReleased();
}
@@ -68,7 +68,7 @@ inline bool TReaderWriterSpinLock::IsLockedByWriter() const noexcept
inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept
{
auto oldValue = Value_.fetch_add(ReaderDelta, std::memory_order::acquire);
- if ((oldValue & WriterMask) != 0) {
+ if ((oldValue & (WriterMask | WriterReadyMask)) != 0) {
Value_.fetch_sub(ReaderDelta, std::memory_order::relaxed);
return false;
}
@@ -79,7 +79,7 @@ inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept
inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept
{
auto oldValue = Value_.load(std::memory_order::relaxed);
- if ((oldValue & WriterMask) != 0) {
+ if ((oldValue & (WriterMask | WriterReadyMask)) != 0) {
return false;
}
return TryAcquireReader();
@@ -88,7 +88,7 @@ inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept
inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept
{
auto oldValue = Value_.load(std::memory_order::relaxed);
- if ((oldValue & WriterMask) != 0) {
+ if ((oldValue & (WriterMask | WriterReadyMask)) != 0) {
return false;
}
auto newValue = oldValue + ReaderDelta;
@@ -98,22 +98,35 @@ inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept
return acquired;
}
-inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept
+inline bool TReaderWriterSpinLock::TryAcquireWriterWithExpectedValue(TValue expected) noexcept
{
- auto expected = UnlockedValue;
-
- bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire);
+ bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire);
NDetail::RecordSpinLockAcquired(acquired);
return acquired;
}
+inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept
+{
+ // NB(pavook): we cannot expect writer ready to be set, as this method
+ // might be called without indicating writer readiness and we cannot
+ // indicate readiness on the hot path. This means that code calling
+ // TryAcquireWriter will spin against code calling AcquireWriter.
+ return TryAcquireWriterWithExpectedValue(UnlockedValue);
+}
+
inline bool TReaderWriterSpinLock::TryAndTryAcquireWriter() noexcept
{
auto oldValue = Value_.load(std::memory_order::relaxed);
- if (oldValue != UnlockedValue) {
+
+ if ((oldValue & WriterReadyMask) == 0) {
+ oldValue = Value_.fetch_or(WriterReadyMask, std::memory_order::relaxed);
+ }
+
+ if ((oldValue & (~WriterReadyMask)) != 0) {
return false;
}
- return TryAcquireWriter();
+
+ return TryAcquireWriterWithExpectedValue(WriterReadyMask);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/yt/threading/rw_spin_lock.h b/library/cpp/yt/threading/rw_spin_lock.h
index a915e677e82..367c3952786 100644
--- a/library/cpp/yt/threading/rw_spin_lock.h
+++ b/library/cpp/yt/threading/rw_spin_lock.h
@@ -16,8 +16,16 @@ namespace NYT::NThreading {
//! Single-writer multiple-readers spin lock.
/*!
- * Reader-side calls are pretty cheap.
- * The lock is unfair.
+ * Reader-side acquires are pretty cheap, and readers don't spin unless writers
+ * are present.
+ *
+ * The lock is unfair, but writers are prioritized over readers, that is,
+ * if AcquireWriter() is called at some time, then some writer
+ * (not necessarily the same one that called AcquireWriter) will succeed
+ * in the next time.
+ *
+ * WARNING: You should not use this lock if forks are possible: see
+ * fork_aware_rw_spin_lock.h for a proper fork-safe lock.
*/
class TReaderWriterSpinLock
: public TSpinLockBase
@@ -29,18 +37,23 @@ public:
/*!
* Optimized for the case of read-intensive workloads.
* Cheap (just one atomic increment and no spinning if no writers are present).
+ *
* Don't use this call if forks are possible: forking at some
* intermediate point inside #AcquireReader may corrupt the lock state and
- * leave lock forever stuck for the child process.
+ * leave the lock stuck forever for the child process.
*/
void AcquireReader() noexcept;
//! Acquires the reader lock.
/*!
* A more expensive version of #AcquireReader (includes at least
* one atomic load and CAS; also may spin even if just readers are present).
+ *
* In contrast to #AcquireReader, this method can be used in the presence of forks.
- * Note that fork-friendliness alone does not provide fork-safety: additional
- * actions must be performed to release the lock after a fork.
+ *
+ * WARNING: fork-friendliness alone does not provide fork-safety: additional
+ * actions must be performed to release the lock after a fork. This means you
+ * probably should NOT use this lock in the presence of forks, consider
+ * fork_aware_rw_spin_lock.h instead as a proper fork-safe lock.
*/
void AcquireReaderForkFriendly() noexcept;
//! Tries acquiring the reader lock; see #AcquireReader.
@@ -94,10 +107,12 @@ private:
using TValue = ui32;
static constexpr TValue UnlockedValue = 0;
static constexpr TValue WriterMask = 1;
- static constexpr TValue ReaderDelta = 2;
+ static constexpr TValue WriterReadyMask = 2;
+ static constexpr TValue ReaderDelta = 4;
std::atomic<TValue> Value_ = UnlockedValue;
+ bool TryAcquireWriterWithExpectedValue(TValue expected) noexcept;
bool TryAndTryAcquireReader() noexcept;
bool TryAndTryAcquireWriter() noexcept;
diff --git a/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp b/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp
new file mode 100644
index 00000000000..653772604ce
--- /dev/null
+++ b/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp
@@ -0,0 +1,56 @@
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <library/cpp/yt/threading/rw_spin_lock.h>
+
+#include <util/thread/pool.h>
+
+#include <latch>
+#include <thread>
+
+namespace NYT::NThreading {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TReaderWriterSpinLockTest, WriterPriority)
+{
+ int readerThreads = 10;
+ std::latch latch(readerThreads + 1);
+ std::atomic<size_t> finishedCount = {0};
+
+ TReaderWriterSpinLock lock;
+
+ volatile std::atomic<ui32> x = {0};
+
+ auto readerTask = [&latch, &lock, &finishedCount, &x] () {
+ latch.arrive_and_wait();
+ while (true) {
+ {
+ auto guard = ReaderGuard(lock);
+ // do some stuff
+ for (ui32 i = 0; i < 10'000u; ++i) {
+ x.fetch_add(i);
+ }
+ }
+ if (finishedCount.fetch_add(1) > 20'000) {
+ break;
+ }
+ }
+ };
+
+ auto readerPool = CreateThreadPool(readerThreads);
+ for (int i = 0; i < readerThreads; ++i) {
+ readerPool->SafeAddFunc(readerTask);
+ }
+
+ latch.arrive_and_wait();
+ while (finishedCount.load() == 0);
+ auto guard = WriterGuard(lock);
+ EXPECT_LE(finishedCount.load(), 1'000u);
+ DoNotOptimizeAway(x);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT::NConcurrency
diff --git a/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp b/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp
new file mode 100644
index 00000000000..26e58fff745
--- /dev/null
+++ b/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp
@@ -0,0 +1,160 @@
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <library/cpp/yt/threading/rw_spin_lock.h>
+#include <library/cpp/yt/threading/fork_aware_spin_lock.h>
+
+#include <util/thread/pool.h>
+
+#include <sys/wait.h>
+
+namespace NYT::NThreading {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TReaderWriterSpinLockTest, ForkFriendlyness)
+{
+ std::atomic<bool> stopped = {false};
+ YT_DECLARE_SPIN_LOCK(TReaderWriterSpinLock, lock);
+
+ auto readerTask = [&lock, &stopped] () {
+ while (!stopped.load()) {
+ ForkFriendlyReaderGuard(lock);
+ }
+ };
+
+ auto tryReaderTask = [&lock, &stopped] () {
+ while (!stopped.load()) {
+ // NB(pavook): TryAcquire instead of Acquire to minimize checks.
+ bool acquired = lock.TryAcquireReaderForkFriendly();
+ if (acquired) {
+ lock.ReleaseReader();
+ }
+ }
+ };
+
+ auto tryWriterTask = [&lock, &stopped] () {
+ while (!stopped.load()) {
+ Sleep(TDuration::MicroSeconds(1));
+ bool acquired = lock.TryAcquireWriter();
+ if (acquired) {
+ lock.ReleaseWriter();
+ }
+ }
+ };
+
+ auto writerTask = [&lock, &stopped] () {
+ while (!stopped.load()) {
+ Sleep(TDuration::MicroSeconds(1));
+ WriterGuard(lock);
+ }
+ };
+
+ int readerCount = 20;
+ int writerCount = 10;
+
+ auto reader = CreateThreadPool(readerCount);
+ auto writer = CreateThreadPool(writerCount);
+
+ for (int i = 0; i < readerCount / 2; ++i) {
+ reader->SafeAddFunc(readerTask);
+ reader->SafeAddFunc(tryReaderTask);
+ }
+ for (int i = 0; i < writerCount / 2; ++i) {
+ writer->SafeAddFunc(writerTask);
+ writer->SafeAddFunc(tryWriterTask);
+ }
+
+ // And let the chaos begin!
+ int forkCount = 2000;
+ for (int iter = 1; iter <= forkCount; ++iter) {
+ pid_t pid;
+ {
+ auto guard = WriterGuard(lock);
+ pid = fork();
+ }
+
+ YT_VERIFY(pid >= 0);
+
+ // NB(pavook): check different orders to maximize chaos.
+ if (iter % 2 == 0) {
+ ReaderGuard(lock);
+ }
+ WriterGuard(lock);
+ ReaderGuard(lock);
+ if (pid == 0) {
+ // NB(pavook): thread pools are no longer with us.
+ _exit(0);
+ }
+ }
+
+ for (int i = 1; i <= forkCount; ++i) {
+ int status;
+ YT_VERIFY(waitpid(0, &status, 0) > 0);
+ YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0);
+ }
+
+ stopped.store(true);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TForkAwareSpinLockTest, ForkSafety)
+{
+ std::atomic<bool> stopped = {false};
+ YT_DECLARE_SPIN_LOCK(TForkAwareSpinLock, lock);
+
+ auto acquireTask = [&lock, &stopped] () {
+ while (!stopped.load()) {
+ Guard(lock);
+ }
+ };
+
+ // NB(pavook): TryAcquire instead of Acquire to minimize checks.
+ auto tryAcquireTask = [&lock, &stopped] () {
+ while (!stopped.load()) {
+ bool acquired = lock.TryAcquire();
+ if (acquired) {
+ lock.Release();
+ }
+ }
+ };
+
+ int workerCount = 20;
+
+ auto worker = CreateThreadPool(workerCount);
+
+ for (int i = 0; i < workerCount / 2; ++i) {
+ worker->SafeAddFunc(acquireTask);
+ worker->SafeAddFunc(tryAcquireTask);
+ }
+
+ // And let the chaos begin!
+ int forkCount = 2000;
+ for (int iter = 1; iter <= forkCount; ++iter) {
+ pid_t pid = fork();
+
+ YT_VERIFY(pid >= 0);
+
+ Guard(lock);
+ Guard(lock);
+
+ if (pid == 0) {
+ // NB(pavook): thread pools are no longer with us.
+ _exit(0);
+ }
+ }
+
+ for (int i = 1; i <= forkCount; ++i) {
+ int status;
+ YT_VERIFY(waitpid(0, &status, 0) > 0);
+ YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0);
+ }
+
+ stopped.store(true);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT::NConcurrency
diff --git a/library/cpp/yt/threading/unittests/ya.make b/library/cpp/yt/threading/unittests/ya.make
index ef9b5d29951..da006012c00 100644
--- a/library/cpp/yt/threading/unittests/ya.make
+++ b/library/cpp/yt/threading/unittests/ya.make
@@ -5,9 +5,14 @@ INCLUDE(${ARCADIA_ROOT}/library/cpp/yt/ya_cpp.make.inc)
SRCS(
count_down_latch_ut.cpp
recursive_spin_lock_ut.cpp
+ rw_spin_lock_ut.cpp
spin_wait_ut.cpp
)
+IF (NOT OS_WINDOWS)
+ SRC(spin_lock_fork_ut.cpp)
+ENDIF()
+
PEERDIR(
library/cpp/yt/assert
library/cpp/yt/threading