aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-04-18 00:51:50 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-04-18 00:51:50 +0000
commitfcf98cbcba210753db1754ca6e28c295c535ffbb (patch)
tree1b529bd303f9c788e4f398933d7deb47cfd8c3b2 /library/cpp
parent1f62eb9b72a10d093a2acd13c434a8a94fa695bc (diff)
parent8e5325590b3037c576e7f9981903f5112e181ffe (diff)
downloadydb-fcf98cbcba210753db1754ca6e28c295c535ffbb.tar.gz
Merge branch 'rightlib' into merge-libs-250418-0050
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/containers/dense_hash/dense_hash.h10
-rw-r--r--library/cpp/monlib/dynamic_counters/page.cpp91
-rw-r--r--library/cpp/tld/tlds-alpha-by-domain.txt2
-rw-r--r--library/cpp/yt/threading/atomic_object.h2
-rw-r--r--library/cpp/yt/threading/rw_spin_lock-inl.h37
-rw-r--r--library/cpp/yt/threading/rw_spin_lock.h39
-rw-r--r--library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp56
-rw-r--r--library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp160
-rw-r--r--library/cpp/yt/threading/unittests/ya.make5
-rw-r--r--library/cpp/yt/threading/writer_starving_rw_spin_lock-inl.h101
-rw-r--r--library/cpp/yt/threading/writer_starving_rw_spin_lock.cpp25
-rw-r--r--library/cpp/yt/threading/writer_starving_rw_spin_lock.h115
-rw-r--r--library/cpp/yt/threading/ya.make1
13 files changed, 593 insertions, 51 deletions
diff --git a/library/cpp/containers/dense_hash/dense_hash.h b/library/cpp/containers/dense_hash/dense_hash.h
index 739479c25a..b5feb16eef 100644
--- a/library/cpp/containers/dense_hash/dense_hash.h
+++ b/library/cpp/containers/dense_hash/dense_hash.h
@@ -168,14 +168,10 @@ public:
} else {
initSize = FastClp2(initSize);
}
- BucketMask = initSize - 1;
+ Buckets.clear();
+ BucketMask = 0;
NumFilled = 0;
- TVector<value_type> tmp;
- for (size_type i = 0; i < initSize; ++i) {
- tmp.emplace_back(EmptyMarker, mapped_type{});
- }
- tmp.swap(Buckets);
- GrowThreshold = Max<size_type>(1, initSize * MaxLoadFactor / 100) - 1;
+ Grow(initSize);
}
template <class K>
diff --git a/library/cpp/monlib/dynamic_counters/page.cpp b/library/cpp/monlib/dynamic_counters/page.cpp
index 5cd750026f..73b1309d81 100644
--- a/library/cpp/monlib/dynamic_counters/page.cpp
+++ b/library/cpp/monlib/dynamic_counters/page.cpp
@@ -4,6 +4,7 @@
#include <library/cpp/monlib/service/pages/templates.h>
#include <library/cpp/string_utils/quote/quote.h>
+#include <util/string/builder.h>
#include <util/string/split.h>
#include <util/system/tls.h>
@@ -26,6 +27,19 @@ TMaybe<EFormat> ParseFormat(TStringBuf str) {
}
}
+namespace {
+
+TStringBuf GetParams(NMonitoring::IMonHttpRequest& request) {
+ TStringBuf uri = request.GetUri();
+ TStringBuf params = uri.After('?');
+ if (params.Size() == uri.Size()) {
+ params.Clear();
+ }
+ return params;
+}
+
+}
+
void TDynamicCountersPage::Output(NMonitoring::IMonHttpRequest& request) {
if (OutputCallback) {
OutputCallback();
@@ -37,28 +51,51 @@ void TDynamicCountersPage::Output(NMonitoring::IMonHttpRequest& request) {
};
TVector<TStringBuf> parts;
- StringSplitter(request.GetPathInfo())
- .Split('/')
- .SkipEmpty()
- .Collect(&parts);
-
- TMaybe<EFormat> format = !parts.empty() ? ParseFormat(parts.back()) : Nothing();
- if (format) {
- parts.pop_back();
- }
+ TMaybe<EFormat> format;
+ TStringBuf params = GetParams(request);
+
+ if (request.GetPathInfo().empty() && !params.empty()) {
+ StringSplitter(params).Split('&').SkipEmpty().Consume([&](TStringBuf part) {
+ TStringBuf name;
+ TStringBuf value;
+ part.Split('=', name, value);
+ if (name.StartsWith("@")) {
+ if (name == "@format") {
+ format = ParseFormat(value);
+ } else if (name == "@name_label") {
+ nameLabel = value;
+ } else if (name == "@private") {
+ visibility = TCountableBase::EVisibility::Private;
+ }
+ } else {
+ parts.push_back(part);
+ }
+ return true;
+ });
+ } else {
+ StringSplitter(request.GetPathInfo())
+ .Split('/')
+ .SkipEmpty()
+ .Collect(&parts);
+
+ format = !parts.empty() ? ParseFormat(parts.back()) : Nothing();
+ if (format) {
+ parts.pop_back();
+ }
- if (!parts.empty() && parts.back().StartsWith(TStringBuf("name_label="))) {
- TVector<TString> labels;
- StringSplitter(parts.back()).Split('=').SkipEmpty().Collect(&labels);
- if (labels.size() == 2U) {
- nameLabel = labels.back();
+ if (!parts.empty() && parts.back().StartsWith(TStringBuf("name_label="))) {
+ TVector<TString> labels;
+ StringSplitter(parts.back()).Split('=').SkipEmpty().Collect(&labels);
+ if (labels.size() == 2U) {
+ nameLabel = labels.back();
+ }
+ parts.pop_back();
}
- parts.pop_back();
- }
- if (!parts.empty() && parts.back() == TStringBuf("private")) {
- visibility = TCountableBase::EVisibility::Private;
- parts.pop_back();
+ if (!parts.empty() && parts.back() == TStringBuf("private")) {
+ visibility = TCountableBase::EVisibility::Private;
+ parts.pop_back();
+ }
}
auto counters = Counters;
@@ -121,9 +158,15 @@ void TDynamicCountersPage::HandleAbsentSubgroup(IMonHttpRequest& request) {
void TDynamicCountersPage::BeforePre(IMonHttpRequest& request) {
IOutputStream& out = request.Output();
+ TStringBuf params = GetParams(request);
+ TStringBuilder base;
+ base << Path << '?';
+ if (!params.empty()) {
+ base << params << '&';
+ }
HTML(out) {
DIV() {
- out << "<a href='" << request.GetPath() << "/json'>Counters as JSON</a>";
+ out << "<a href='" << base << "@format=json'>Counters as JSON</a>";
out << " for Solomon";
}
@@ -133,9 +176,11 @@ void TDynamicCountersPage::BeforePre(IMonHttpRequest& request) {
UL() {
currentCounters->EnumerateSubgroups([&](const TString& name, const TString& value) {
LI() {
- TString pathPart = name + "=" + value;
- Quote(pathPart, "");
- out << "\n<a href='" << request.GetPath() << "/" << pathPart << "'>" << name << " " << value << "</a>";
+ auto escName = name;
+ auto escValue = value;
+ Quote(escName);
+ Quote(escValue);
+ out << "\n<a href='" << base << escName << '=' << escValue << "'>" << name << " " << value << "</a>";
}
});
}
diff --git a/library/cpp/tld/tlds-alpha-by-domain.txt b/library/cpp/tld/tlds-alpha-by-domain.txt
index 3353c0add6..88d9cd720d 100644
--- a/library/cpp/tld/tlds-alpha-by-domain.txt
+++ b/library/cpp/tld/tlds-alpha-by-domain.txt
@@ -1,4 +1,4 @@
-# Version 2025041300, Last Updated Sun Apr 13 07:07:01 2025 UTC
+# Version 2025041502, Last Updated Wed Apr 16 07:07:01 2025 UTC
AAA
AARP
ABB
diff --git a/library/cpp/yt/threading/atomic_object.h b/library/cpp/yt/threading/atomic_object.h
index 8b642c0f4f..a77ade0a00 100644
--- a/library/cpp/yt/threading/atomic_object.h
+++ b/library/cpp/yt/threading/atomic_object.h
@@ -1,6 +1,6 @@
#pragma once
-#include <library/cpp/yt/threading/rw_spin_lock.h>
+#include <library/cpp/yt/threading/writer_starving_rw_spin_lock.h>
#include <concepts>
diff --git a/library/cpp/yt/threading/rw_spin_lock-inl.h b/library/cpp/yt/threading/rw_spin_lock-inl.h
index 779de1b64a..0a31b1d9de 100644
--- a/library/cpp/yt/threading/rw_spin_lock-inl.h
+++ b/library/cpp/yt/threading/rw_spin_lock-inl.h
@@ -31,7 +31,7 @@ inline void TReaderWriterSpinLock::AcquireReaderForkFriendly() noexcept
inline void TReaderWriterSpinLock::ReleaseReader() noexcept
{
auto prevValue = Value_.fetch_sub(ReaderDelta, std::memory_order::release);
- Y_ASSERT((prevValue & ~WriterMask) != 0);
+ Y_ASSERT((prevValue & ~(WriterMask | WriterReadyMask)) != 0);
NDetail::RecordSpinLockReleased();
}
@@ -45,14 +45,14 @@ inline void TReaderWriterSpinLock::AcquireWriter() noexcept
inline void TReaderWriterSpinLock::ReleaseWriter() noexcept
{
- auto prevValue = Value_.fetch_and(~WriterMask, std::memory_order::release);
+ auto prevValue = Value_.fetch_and(~(WriterMask | WriterReadyMask), std::memory_order::release);
Y_ASSERT(prevValue & WriterMask);
NDetail::RecordSpinLockReleased();
}
inline bool TReaderWriterSpinLock::IsLocked() const noexcept
{
- return Value_.load() != UnlockedValue;
+ return (Value_.load() & ~WriterReadyMask) != 0;
}
inline bool TReaderWriterSpinLock::IsLockedByReader() const noexcept
@@ -68,7 +68,7 @@ inline bool TReaderWriterSpinLock::IsLockedByWriter() const noexcept
inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept
{
auto oldValue = Value_.fetch_add(ReaderDelta, std::memory_order::acquire);
- if ((oldValue & WriterMask) != 0) {
+ if ((oldValue & (WriterMask | WriterReadyMask)) != 0) {
Value_.fetch_sub(ReaderDelta, std::memory_order::relaxed);
return false;
}
@@ -79,7 +79,7 @@ inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept
inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept
{
auto oldValue = Value_.load(std::memory_order::relaxed);
- if ((oldValue & WriterMask) != 0) {
+ if ((oldValue & (WriterMask | WriterReadyMask)) != 0) {
return false;
}
return TryAcquireReader();
@@ -88,7 +88,7 @@ inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept
inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept
{
auto oldValue = Value_.load(std::memory_order::relaxed);
- if ((oldValue & WriterMask) != 0) {
+ if ((oldValue & (WriterMask | WriterReadyMask)) != 0) {
return false;
}
auto newValue = oldValue + ReaderDelta;
@@ -98,22 +98,35 @@ inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept
return acquired;
}
-inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept
+inline bool TReaderWriterSpinLock::TryAcquireWriterWithExpectedValue(TValue expected) noexcept
{
- auto expected = UnlockedValue;
-
- bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire);
+ bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire);
NDetail::RecordSpinLockAcquired(acquired);
return acquired;
}
+inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept
+{
+ // NB(pavook): we cannot expect writer ready to be set, as this method
+ // might be called without indicating writer readiness and we cannot
+ // indicate readiness on the hot path. This means that code calling
+ // TryAcquireWriter will spin against code calling AcquireWriter.
+ return TryAcquireWriterWithExpectedValue(UnlockedValue);
+}
+
inline bool TReaderWriterSpinLock::TryAndTryAcquireWriter() noexcept
{
auto oldValue = Value_.load(std::memory_order::relaxed);
- if (oldValue != UnlockedValue) {
+
+ if ((oldValue & WriterReadyMask) == 0) {
+ oldValue = Value_.fetch_or(WriterReadyMask, std::memory_order::relaxed);
+ }
+
+ if ((oldValue & (~WriterReadyMask)) != 0) {
return false;
}
- return TryAcquireWriter();
+
+ return TryAcquireWriterWithExpectedValue(WriterReadyMask);
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/library/cpp/yt/threading/rw_spin_lock.h b/library/cpp/yt/threading/rw_spin_lock.h
index a915e677e8..64a241bb6b 100644
--- a/library/cpp/yt/threading/rw_spin_lock.h
+++ b/library/cpp/yt/threading/rw_spin_lock.h
@@ -16,8 +16,23 @@ namespace NYT::NThreading {
//! Single-writer multiple-readers spin lock.
/*!
- * Reader-side calls are pretty cheap.
- * The lock is unfair.
+ * Reader-side acquires are pretty cheap, and readers don't spin unless writers
+ * are present.
+ *
+ * The lock is unfair, but writers are prioritized over readers, that is,
+ * if AcquireWriter() is called at some time, then some writer
+ * (not necessarily the same one that called AcquireWriter) will succeed
+ * in the next time. This is implemented by an additional flag "WriterReady",
+ * that writers set on arrival. No readers can proceed until this flag is reset.
+ *
+ * WARNING: You probably should not use this lock if forks are possible: see
+ * fork_aware_rw_spin_lock.h for a proper fork-safe lock which does the housekeeping for you.
+ *
+ * WARNING: This lock is not recursive: you can't call AcquireReader() twice in the same
+ * thread, as that may lead to a deadlock. For the same reason you shouldn't do WaitFor or any
+ * other context switch under lock.
+ *
+ * See tla+/spinlock.tla for the formally verified lock's properties.
*/
class TReaderWriterSpinLock
: public TSpinLockBase
@@ -29,18 +44,26 @@ public:
/*!
* Optimized for the case of read-intensive workloads.
* Cheap (just one atomic increment and no spinning if no writers are present).
- * Don't use this call if forks are possible: forking at some
+ *
+ * WARNING: Don't use this call if forks are possible: forking at some
* intermediate point inside #AcquireReader may corrupt the lock state and
- * leave lock forever stuck for the child process.
+ * leave the lock stuck forever for the child process.
+ *
+ * WARNING: The lock is not recursive/reentrant, i.e. it assumes that no thread calls
+ * AcquireReader() if the reader is already acquired for it.
*/
void AcquireReader() noexcept;
//! Acquires the reader lock.
/*!
* A more expensive version of #AcquireReader (includes at least
* one atomic load and CAS; also may spin even if just readers are present).
+ *
* In contrast to #AcquireReader, this method can be used in the presence of forks.
- * Note that fork-friendliness alone does not provide fork-safety: additional
- * actions must be performed to release the lock after a fork.
+ *
+ * WARNING: fork-friendliness alone does not provide fork-safety: additional
+ * actions must be performed to release the lock after a fork. This means you
+ * probably should NOT use this lock in the presence of forks, consider
+ * fork_aware_rw_spin_lock.h instead as a proper fork-safe lock.
*/
void AcquireReaderForkFriendly() noexcept;
//! Tries acquiring the reader lock; see #AcquireReader.
@@ -94,10 +117,12 @@ private:
using TValue = ui32;
static constexpr TValue UnlockedValue = 0;
static constexpr TValue WriterMask = 1;
- static constexpr TValue ReaderDelta = 2;
+ static constexpr TValue WriterReadyMask = 2;
+ static constexpr TValue ReaderDelta = 4;
std::atomic<TValue> Value_ = UnlockedValue;
+ bool TryAcquireWriterWithExpectedValue(TValue expected) noexcept;
bool TryAndTryAcquireReader() noexcept;
bool TryAndTryAcquireWriter() noexcept;
diff --git a/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp b/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp
new file mode 100644
index 0000000000..653772604c
--- /dev/null
+++ b/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp
@@ -0,0 +1,56 @@
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <library/cpp/yt/threading/rw_spin_lock.h>
+
+#include <util/thread/pool.h>
+
+#include <latch>
+#include <thread>
+
+namespace NYT::NThreading {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TReaderWriterSpinLockTest, WriterPriority)
+{
+ int readerThreads = 10;
+ std::latch latch(readerThreads + 1);
+ std::atomic<size_t> finishedCount = {0};
+
+ TReaderWriterSpinLock lock;
+
+ volatile std::atomic<ui32> x = {0};
+
+ auto readerTask = [&latch, &lock, &finishedCount, &x] () {
+ latch.arrive_and_wait();
+ while (true) {
+ {
+ auto guard = ReaderGuard(lock);
+ // do some stuff
+ for (ui32 i = 0; i < 10'000u; ++i) {
+ x.fetch_add(i);
+ }
+ }
+ if (finishedCount.fetch_add(1) > 20'000) {
+ break;
+ }
+ }
+ };
+
+ auto readerPool = CreateThreadPool(readerThreads);
+ for (int i = 0; i < readerThreads; ++i) {
+ readerPool->SafeAddFunc(readerTask);
+ }
+
+ latch.arrive_and_wait();
+ while (finishedCount.load() == 0);
+ auto guard = WriterGuard(lock);
+ EXPECT_LE(finishedCount.load(), 1'000u);
+ DoNotOptimizeAway(x);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT::NConcurrency
diff --git a/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp b/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp
new file mode 100644
index 0000000000..26e58fff74
--- /dev/null
+++ b/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp
@@ -0,0 +1,160 @@
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <library/cpp/yt/threading/rw_spin_lock.h>
+#include <library/cpp/yt/threading/fork_aware_spin_lock.h>
+
+#include <util/thread/pool.h>
+
+#include <sys/wait.h>
+
+namespace NYT::NThreading {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TReaderWriterSpinLockTest, ForkFriendlyness)
+{
+ std::atomic<bool> stopped = {false};
+ YT_DECLARE_SPIN_LOCK(TReaderWriterSpinLock, lock);
+
+ auto readerTask = [&lock, &stopped] () {
+ while (!stopped.load()) {
+ ForkFriendlyReaderGuard(lock);
+ }
+ };
+
+ auto tryReaderTask = [&lock, &stopped] () {
+ while (!stopped.load()) {
+ // NB(pavook): TryAcquire instead of Acquire to minimize checks.
+ bool acquired = lock.TryAcquireReaderForkFriendly();
+ if (acquired) {
+ lock.ReleaseReader();
+ }
+ }
+ };
+
+ auto tryWriterTask = [&lock, &stopped] () {
+ while (!stopped.load()) {
+ Sleep(TDuration::MicroSeconds(1));
+ bool acquired = lock.TryAcquireWriter();
+ if (acquired) {
+ lock.ReleaseWriter();
+ }
+ }
+ };
+
+ auto writerTask = [&lock, &stopped] () {
+ while (!stopped.load()) {
+ Sleep(TDuration::MicroSeconds(1));
+ WriterGuard(lock);
+ }
+ };
+
+ int readerCount = 20;
+ int writerCount = 10;
+
+ auto reader = CreateThreadPool(readerCount);
+ auto writer = CreateThreadPool(writerCount);
+
+ for (int i = 0; i < readerCount / 2; ++i) {
+ reader->SafeAddFunc(readerTask);
+ reader->SafeAddFunc(tryReaderTask);
+ }
+ for (int i = 0; i < writerCount / 2; ++i) {
+ writer->SafeAddFunc(writerTask);
+ writer->SafeAddFunc(tryWriterTask);
+ }
+
+ // And let the chaos begin!
+ int forkCount = 2000;
+ for (int iter = 1; iter <= forkCount; ++iter) {
+ pid_t pid;
+ {
+ auto guard = WriterGuard(lock);
+ pid = fork();
+ }
+
+ YT_VERIFY(pid >= 0);
+
+ // NB(pavook): check different orders to maximize chaos.
+ if (iter % 2 == 0) {
+ ReaderGuard(lock);
+ }
+ WriterGuard(lock);
+ ReaderGuard(lock);
+ if (pid == 0) {
+ // NB(pavook): thread pools are no longer with us.
+ _exit(0);
+ }
+ }
+
+ for (int i = 1; i <= forkCount; ++i) {
+ int status;
+ YT_VERIFY(waitpid(0, &status, 0) > 0);
+ YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0);
+ }
+
+ stopped.store(true);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TForkAwareSpinLockTest, ForkSafety)
+{
+ std::atomic<bool> stopped = {false};
+ YT_DECLARE_SPIN_LOCK(TForkAwareSpinLock, lock);
+
+ auto acquireTask = [&lock, &stopped] () {
+ while (!stopped.load()) {
+ Guard(lock);
+ }
+ };
+
+ // NB(pavook): TryAcquire instead of Acquire to minimize checks.
+ auto tryAcquireTask = [&lock, &stopped] () {
+ while (!stopped.load()) {
+ bool acquired = lock.TryAcquire();
+ if (acquired) {
+ lock.Release();
+ }
+ }
+ };
+
+ int workerCount = 20;
+
+ auto worker = CreateThreadPool(workerCount);
+
+ for (int i = 0; i < workerCount / 2; ++i) {
+ worker->SafeAddFunc(acquireTask);
+ worker->SafeAddFunc(tryAcquireTask);
+ }
+
+ // And let the chaos begin!
+ int forkCount = 2000;
+ for (int iter = 1; iter <= forkCount; ++iter) {
+ pid_t pid = fork();
+
+ YT_VERIFY(pid >= 0);
+
+ Guard(lock);
+ Guard(lock);
+
+ if (pid == 0) {
+ // NB(pavook): thread pools are no longer with us.
+ _exit(0);
+ }
+ }
+
+ for (int i = 1; i <= forkCount; ++i) {
+ int status;
+ YT_VERIFY(waitpid(0, &status, 0) > 0);
+ YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0);
+ }
+
+ stopped.store(true);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT::NConcurrency
diff --git a/library/cpp/yt/threading/unittests/ya.make b/library/cpp/yt/threading/unittests/ya.make
index ef9b5d2995..da006012c0 100644
--- a/library/cpp/yt/threading/unittests/ya.make
+++ b/library/cpp/yt/threading/unittests/ya.make
@@ -5,9 +5,14 @@ INCLUDE(${ARCADIA_ROOT}/library/cpp/yt/ya_cpp.make.inc)
SRCS(
count_down_latch_ut.cpp
recursive_spin_lock_ut.cpp
+ rw_spin_lock_ut.cpp
spin_wait_ut.cpp
)
+IF (NOT OS_WINDOWS)
+ SRC(spin_lock_fork_ut.cpp)
+ENDIF()
+
PEERDIR(
library/cpp/yt/assert
library/cpp/yt/threading
diff --git a/library/cpp/yt/threading/writer_starving_rw_spin_lock-inl.h b/library/cpp/yt/threading/writer_starving_rw_spin_lock-inl.h
new file mode 100644
index 0000000000..cf8bde715c
--- /dev/null
+++ b/library/cpp/yt/threading/writer_starving_rw_spin_lock-inl.h
@@ -0,0 +1,101 @@
+#pragma once
+#ifndef WRITER_STARVING_RW_SPIN_LOCK_INL_H_
+#error "Direct inclusion of this file is not allowed, include rw_spin_lock.h"
+// For the sake of sane code completion.
+#include "writer_starving_rw_spin_lock.h"
+#endif
+#undef WRITER_STARVING_RW_SPIN_LOCK_INL_H_
+
+#include "spin_wait.h"
+
+namespace NYT::NThreading {
+
+////////////////////////////////////////////////////////////////////////////////
+
+inline void TWriterStarvingRWSpinLock::AcquireReader() noexcept
+{
+ if (TryAcquireReader()) {
+ return;
+ }
+ AcquireReaderSlow();
+}
+
+inline void TWriterStarvingRWSpinLock::ReleaseReader() noexcept
+{
+ auto prevValue = Value_.fetch_sub(ReaderDelta, std::memory_order::release);
+ Y_ASSERT((prevValue & ~WriterMask) != 0);
+ NDetail::RecordSpinLockReleased();
+}
+
+inline void TWriterStarvingRWSpinLock::AcquireWriter() noexcept
+{
+ if (TryAcquireWriter()) {
+ return;
+ }
+ AcquireWriterSlow();
+}
+
+inline void TWriterStarvingRWSpinLock::ReleaseWriter() noexcept
+{
+ auto prevValue = Value_.fetch_and(~WriterMask, std::memory_order::release);
+ Y_ASSERT(prevValue & WriterMask);
+ NDetail::RecordSpinLockReleased();
+}
+
+inline bool TWriterStarvingRWSpinLock::IsLocked() const noexcept
+{
+ return Value_.load() != UnlockedValue;
+}
+
+inline bool TWriterStarvingRWSpinLock::IsLockedByReader() const noexcept
+{
+ return Value_.load() >= ReaderDelta;
+}
+
+inline bool TWriterStarvingRWSpinLock::IsLockedByWriter() const noexcept
+{
+ return (Value_.load() & WriterMask) != 0;
+}
+
+inline bool TWriterStarvingRWSpinLock::TryAcquireReader() noexcept
+{
+ auto oldValue = Value_.fetch_add(ReaderDelta, std::memory_order::acquire);
+ if ((oldValue & WriterMask) != 0) {
+ Value_.fetch_sub(ReaderDelta, std::memory_order::relaxed);
+ return false;
+ }
+ NDetail::RecordSpinLockAcquired();
+ return true;
+}
+
+inline bool TWriterStarvingRWSpinLock::TryAndTryAcquireReader() noexcept
+{
+ auto oldValue = Value_.load(std::memory_order::relaxed);
+ if ((oldValue & WriterMask) != 0) {
+ return false;
+ }
+ return TryAcquireReader();
+}
+
+inline bool TWriterStarvingRWSpinLock::TryAcquireWriter() noexcept
+{
+ auto expected = UnlockedValue;
+
+ bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire);
+ NDetail::RecordSpinLockAcquired(acquired);
+ return acquired;
+}
+
+inline bool TWriterStarvingRWSpinLock::TryAndTryAcquireWriter() noexcept
+{
+ auto oldValue = Value_.load(std::memory_order::relaxed);
+ if (oldValue != UnlockedValue) {
+ return false;
+ }
+ return TryAcquireWriter();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NThreading
+
diff --git a/library/cpp/yt/threading/writer_starving_rw_spin_lock.cpp b/library/cpp/yt/threading/writer_starving_rw_spin_lock.cpp
new file mode 100644
index 0000000000..74c9f59db1
--- /dev/null
+++ b/library/cpp/yt/threading/writer_starving_rw_spin_lock.cpp
@@ -0,0 +1,25 @@
+#include "writer_starving_rw_spin_lock.h"
+
+namespace NYT::NThreading {
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TWriterStarvingRWSpinLock::AcquireReaderSlow() noexcept
+{
+ TSpinWait spinWait(Location_, ESpinLockActivityKind::Read);
+ while (!TryAndTryAcquireReader()) {
+ spinWait.Wait();
+ }
+}
+
+void TWriterStarvingRWSpinLock::AcquireWriterSlow() noexcept
+{
+ TSpinWait spinWait(Location_, ESpinLockActivityKind::Write);
+ while (!TryAndTryAcquireWriter()) {
+ spinWait.Wait();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NThreading
diff --git a/library/cpp/yt/threading/writer_starving_rw_spin_lock.h b/library/cpp/yt/threading/writer_starving_rw_spin_lock.h
new file mode 100644
index 0000000000..8a456afe21
--- /dev/null
+++ b/library/cpp/yt/threading/writer_starving_rw_spin_lock.h
@@ -0,0 +1,115 @@
+#pragma once
+
+#include "public.h"
+#include "rw_spin_lock.h"
+#include "spin_lock_base.h"
+#include "spin_lock_count.h"
+
+#include <library/cpp/yt/memory/public.h>
+
+#include <util/system/rwlock.h>
+
+#include <atomic>
+
+namespace NYT::NThreading {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// TODO(pavook): deprecate it.
+
+//! Single-writer multiple-readers spin lock.
+/*!
+ * Reader-side calls are pretty cheap.
+ * WARNING: The lock is unfair, and readers can starve writers. See rw_spin_lock.h for a writer-prioritized lock.
+ * WARNING: Never use the bare lock if forks are possible: see fork_aware_rw_spin_lock.h for a fork-safe lock.
+ * Unlike rw_spin_lock.h, reader-side is reentrant here: it is possible to acquire the **reader** lock multiple times
+ * even in the single thread.
+ * This doesn't mean you should do it: in fact, you shouldn't: use separate locks for separate entities.
+ * If you see this class in your code, try migrating to the proper rw_spin_lock.h after ensuring you don't rely on
+ * reentrant locking.
+ */
+class TWriterStarvingRWSpinLock
+ : public TSpinLockBase
+{
+public:
+ using TSpinLockBase::TSpinLockBase;
+
+ //! Acquires the reader lock.
+ /*!
+ * Optimized for the case of read-intensive workloads.
+ * Cheap (just one atomic increment and no spinning if no writers are present).
+ * Don't use this call if forks are possible: forking at some
+ * intermediate point inside #AcquireReader may corrupt the lock state and
+ * leave lock forever stuck for the child process.
+ */
+ void AcquireReader() noexcept;
+ //! Tries acquiring the reader lock; see #AcquireReader.
+ //! Returns |true| on success.
+ bool TryAcquireReader() noexcept;
+ //! Releases the reader lock.
+ /*!
+ * Cheap (just one atomic decrement).
+ */
+ void ReleaseReader() noexcept;
+
+ //! Acquires the writer lock.
+ /*!
+ * Rather cheap (just one CAS).
+ */
+ void AcquireWriter() noexcept;
+ //! Tries acquiring the writer lock; see #AcquireWriter.
+ //! Returns |true| on success.
+ bool TryAcquireWriter() noexcept;
+ //! Releases the writer lock.
+ /*!
+ * Cheap (just one atomic store).
+ */
+ void ReleaseWriter() noexcept;
+
+ //! Returns true if the lock is taken (either by a reader or writer).
+ /*!
+ * This is inherently racy.
+ * Only use for debugging and diagnostic purposes.
+ */
+ bool IsLocked() const noexcept;
+
+ //! Returns true if the lock is taken by reader.
+ /*!
+ * This is inherently racy.
+ * Only use for debugging and diagnostic purposes.
+ */
+ bool IsLockedByReader() const noexcept;
+
+ //! Returns true if the lock is taken by writer.
+ /*!
+ * This is inherently racy.
+ * Only use for debugging and diagnostic purposes.
+ */
+ bool IsLockedByWriter() const noexcept;
+
+private:
+ using TValue = ui32;
+ static constexpr TValue UnlockedValue = 0;
+ static constexpr TValue WriterMask = 1;
+ static constexpr TValue ReaderDelta = 2;
+
+ std::atomic<TValue> Value_ = UnlockedValue;
+
+
+ bool TryAndTryAcquireReader() noexcept;
+ bool TryAndTryAcquireWriter() noexcept;
+
+ void AcquireReaderSlow() noexcept;
+ void AcquireWriterSlow() noexcept;
+};
+
+REGISTER_TRACKED_SPIN_LOCK_CLASS(TWriterStarvingRWSpinLock)
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NThreading
+
+#define WRITER_STARVING_RW_SPIN_LOCK_INL_H_
+#include "writer_starving_rw_spin_lock-inl.h"
+#undef WRITER_STARVING_RW_SPIN_LOCK_INL_H_
+
diff --git a/library/cpp/yt/threading/ya.make b/library/cpp/yt/threading/ya.make
index cc11e7974e..d25f0a7068 100644
--- a/library/cpp/yt/threading/ya.make
+++ b/library/cpp/yt/threading/ya.make
@@ -18,6 +18,7 @@ SRCS(
spin_lock.cpp
spin_wait.cpp
spin_wait_hook.cpp
+ writer_starving_rw_spin_lock.cpp
)
PEERDIR(