| author | Alexander Smirnov <alex@ydb.tech> | 2025-04-18 00:51:50 +0000 |
|---|---|---|
| committer | Alexander Smirnov <alex@ydb.tech> | 2025-04-18 00:51:50 +0000 |
| commit | fcf98cbcba210753db1754ca6e28c295c535ffbb (patch) | |
| tree | 1b529bd303f9c788e4f398933d7deb47cfd8c3b2 | /library/cpp |
| parent | 1f62eb9b72a10d093a2acd13c434a8a94fa695bc (diff) | |
| parent | 8e5325590b3037c576e7f9981903f5112e181ffe (diff) | |
| download | ydb-fcf98cbcba210753db1754ca6e28c295c535ffbb.tar.gz | |
Merge branch 'rightlib' into merge-libs-250418-0050
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/containers/dense_hash/dense_hash.h | 10 |
-rw-r--r-- | library/cpp/monlib/dynamic_counters/page.cpp | 91 |
-rw-r--r-- | library/cpp/tld/tlds-alpha-by-domain.txt | 2 |
-rw-r--r-- | library/cpp/yt/threading/atomic_object.h | 2 |
-rw-r--r-- | library/cpp/yt/threading/rw_spin_lock-inl.h | 37 |
-rw-r--r-- | library/cpp/yt/threading/rw_spin_lock.h | 39 |
-rw-r--r-- | library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp | 56 |
-rw-r--r-- | library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp | 160 |
-rw-r--r-- | library/cpp/yt/threading/unittests/ya.make | 5 |
-rw-r--r-- | library/cpp/yt/threading/writer_starving_rw_spin_lock-inl.h | 101 |
-rw-r--r-- | library/cpp/yt/threading/writer_starving_rw_spin_lock.cpp | 25 |
-rw-r--r-- | library/cpp/yt/threading/writer_starving_rw_spin_lock.h | 115 |
-rw-r--r-- | library/cpp/yt/threading/ya.make | 1 |
13 files changed, 593 insertions, 51 deletions
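
Outside yt/threading, the main functional change below is in monlib's dynamic_counters page.cpp: when the request carries no path info, navigation parameters may now come from the query string, with '@'-prefixed control parameters (@format, @name_label, @private) separated from ordinary name=value subgroup selectors. The following is a minimal standalone sketch of that parsing scheme, using plain std::string_view in place of the Arcadia TStringBuf/StringSplitter helpers; TParsedQuery and ParseCountersQuery are illustrative names, not part of the library.

```cpp
#include <iostream>
#include <optional>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Illustrative model of the query-string handling added in page.cpp:
// '@'-prefixed pairs are control parameters, everything else is a
// subgroup selector of the form name=value.
struct TParsedQuery {
    std::optional<std::string> Format;     // from @format
    std::optional<std::string> NameLabel;  // from @name_label
    bool Private = false;                  // from @private
    std::vector<std::pair<std::string, std::string>> Selectors;
};

TParsedQuery ParseCountersQuery(std::string_view params) {
    TParsedQuery result;
    while (!params.empty()) {
        auto amp = params.find('&');
        std::string_view part = params.substr(0, amp);
        params = (amp == std::string_view::npos) ? std::string_view{} : params.substr(amp + 1);
        if (part.empty()) {
            continue;
        }
        auto eq = part.find('=');
        std::string_view name = part.substr(0, eq);
        std::string_view value = (eq == std::string_view::npos) ? std::string_view{} : part.substr(eq + 1);
        if (name.starts_with('@')) {
            if (name == "@format") {
                result.Format = std::string(value);
            } else if (name == "@name_label") {
                result.NameLabel = std::string(value);
            } else if (name == "@private") {
                result.Private = true;  // presence alone enables private visibility
            }
        } else {
            result.Selectors.emplace_back(std::string(name), std::string(value));
        }
    }
    return result;
}

int main() {
    auto q = ParseCountersQuery("counters=ydb&subsystem=grpc&@format=json&@private");
    std::cout << q.Selectors.size() << " selectors, format="
              << q.Format.value_or("(none)") << ", private=" << q.Private << "\n";
}
```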
diff --git a/library/cpp/containers/dense_hash/dense_hash.h b/library/cpp/containers/dense_hash/dense_hash.h index 739479c25a..b5feb16eef 100644 --- a/library/cpp/containers/dense_hash/dense_hash.h +++ b/library/cpp/containers/dense_hash/dense_hash.h @@ -168,14 +168,10 @@ public: } else { initSize = FastClp2(initSize); } - BucketMask = initSize - 1; + Buckets.clear(); + BucketMask = 0; NumFilled = 0; - TVector<value_type> tmp; - for (size_type i = 0; i < initSize; ++i) { - tmp.emplace_back(EmptyMarker, mapped_type{}); - } - tmp.swap(Buckets); - GrowThreshold = Max<size_type>(1, initSize * MaxLoadFactor / 100) - 1; + Grow(initSize); } template <class K> diff --git a/library/cpp/monlib/dynamic_counters/page.cpp b/library/cpp/monlib/dynamic_counters/page.cpp index 5cd750026f..73b1309d81 100644 --- a/library/cpp/monlib/dynamic_counters/page.cpp +++ b/library/cpp/monlib/dynamic_counters/page.cpp @@ -4,6 +4,7 @@ #include <library/cpp/monlib/service/pages/templates.h> #include <library/cpp/string_utils/quote/quote.h> +#include <util/string/builder.h> #include <util/string/split.h> #include <util/system/tls.h> @@ -26,6 +27,19 @@ TMaybe<EFormat> ParseFormat(TStringBuf str) { } } +namespace { + +TStringBuf GetParams(NMonitoring::IMonHttpRequest& request) { + TStringBuf uri = request.GetUri(); + TStringBuf params = uri.After('?'); + if (params.Size() == uri.Size()) { + params.Clear(); + } + return params; +} + +} + void TDynamicCountersPage::Output(NMonitoring::IMonHttpRequest& request) { if (OutputCallback) { OutputCallback(); @@ -37,28 +51,51 @@ void TDynamicCountersPage::Output(NMonitoring::IMonHttpRequest& request) { }; TVector<TStringBuf> parts; - StringSplitter(request.GetPathInfo()) - .Split('/') - .SkipEmpty() - .Collect(&parts); - - TMaybe<EFormat> format = !parts.empty() ? ParseFormat(parts.back()) : Nothing(); - if (format) { - parts.pop_back(); - } + TMaybe<EFormat> format; + TStringBuf params = GetParams(request); + + if (request.GetPathInfo().empty() && !params.empty()) { + StringSplitter(params).Split('&').SkipEmpty().Consume([&](TStringBuf part) { + TStringBuf name; + TStringBuf value; + part.Split('=', name, value); + if (name.StartsWith("@")) { + if (name == "@format") { + format = ParseFormat(value); + } else if (name == "@name_label") { + nameLabel = value; + } else if (name == "@private") { + visibility = TCountableBase::EVisibility::Private; + } + } else { + parts.push_back(part); + } + return true; + }); + } else { + StringSplitter(request.GetPathInfo()) + .Split('/') + .SkipEmpty() + .Collect(&parts); + + format = !parts.empty() ? 
ParseFormat(parts.back()) : Nothing(); + if (format) { + parts.pop_back(); + } - if (!parts.empty() && parts.back().StartsWith(TStringBuf("name_label="))) { - TVector<TString> labels; - StringSplitter(parts.back()).Split('=').SkipEmpty().Collect(&labels); - if (labels.size() == 2U) { - nameLabel = labels.back(); + if (!parts.empty() && parts.back().StartsWith(TStringBuf("name_label="))) { + TVector<TString> labels; + StringSplitter(parts.back()).Split('=').SkipEmpty().Collect(&labels); + if (labels.size() == 2U) { + nameLabel = labels.back(); + } + parts.pop_back(); } - parts.pop_back(); - } - if (!parts.empty() && parts.back() == TStringBuf("private")) { - visibility = TCountableBase::EVisibility::Private; - parts.pop_back(); + if (!parts.empty() && parts.back() == TStringBuf("private")) { + visibility = TCountableBase::EVisibility::Private; + parts.pop_back(); + } } auto counters = Counters; @@ -121,9 +158,15 @@ void TDynamicCountersPage::HandleAbsentSubgroup(IMonHttpRequest& request) { void TDynamicCountersPage::BeforePre(IMonHttpRequest& request) { IOutputStream& out = request.Output(); + TStringBuf params = GetParams(request); + TStringBuilder base; + base << Path << '?'; + if (!params.empty()) { + base << params << '&'; + } HTML(out) { DIV() { - out << "<a href='" << request.GetPath() << "/json'>Counters as JSON</a>"; + out << "<a href='" << base << "@format=json'>Counters as JSON</a>"; out << " for Solomon"; } @@ -133,9 +176,11 @@ void TDynamicCountersPage::BeforePre(IMonHttpRequest& request) { UL() { currentCounters->EnumerateSubgroups([&](const TString& name, const TString& value) { LI() { - TString pathPart = name + "=" + value; - Quote(pathPart, ""); - out << "\n<a href='" << request.GetPath() << "/" << pathPart << "'>" << name << " " << value << "</a>"; + auto escName = name; + auto escValue = value; + Quote(escName); + Quote(escValue); + out << "\n<a href='" << base << escName << '=' << escValue << "'>" << name << " " << value << "</a>"; } }); } diff --git a/library/cpp/tld/tlds-alpha-by-domain.txt b/library/cpp/tld/tlds-alpha-by-domain.txt index 3353c0add6..88d9cd720d 100644 --- a/library/cpp/tld/tlds-alpha-by-domain.txt +++ b/library/cpp/tld/tlds-alpha-by-domain.txt @@ -1,4 +1,4 @@ -# Version 2025041300, Last Updated Sun Apr 13 07:07:01 2025 UTC +# Version 2025041502, Last Updated Wed Apr 16 07:07:01 2025 UTC AAA AARP ABB diff --git a/library/cpp/yt/threading/atomic_object.h b/library/cpp/yt/threading/atomic_object.h index 8b642c0f4f..a77ade0a00 100644 --- a/library/cpp/yt/threading/atomic_object.h +++ b/library/cpp/yt/threading/atomic_object.h @@ -1,6 +1,6 @@ #pragma once -#include <library/cpp/yt/threading/rw_spin_lock.h> +#include <library/cpp/yt/threading/writer_starving_rw_spin_lock.h> #include <concepts> diff --git a/library/cpp/yt/threading/rw_spin_lock-inl.h b/library/cpp/yt/threading/rw_spin_lock-inl.h index 779de1b64a..0a31b1d9de 100644 --- a/library/cpp/yt/threading/rw_spin_lock-inl.h +++ b/library/cpp/yt/threading/rw_spin_lock-inl.h @@ -31,7 +31,7 @@ inline void TReaderWriterSpinLock::AcquireReaderForkFriendly() noexcept inline void TReaderWriterSpinLock::ReleaseReader() noexcept { auto prevValue = Value_.fetch_sub(ReaderDelta, std::memory_order::release); - Y_ASSERT((prevValue & ~WriterMask) != 0); + Y_ASSERT((prevValue & ~(WriterMask | WriterReadyMask)) != 0); NDetail::RecordSpinLockReleased(); } @@ -45,14 +45,14 @@ inline void TReaderWriterSpinLock::AcquireWriter() noexcept inline void TReaderWriterSpinLock::ReleaseWriter() noexcept { - auto prevValue = 
Value_.fetch_and(~WriterMask, std::memory_order::release); + auto prevValue = Value_.fetch_and(~(WriterMask | WriterReadyMask), std::memory_order::release); Y_ASSERT(prevValue & WriterMask); NDetail::RecordSpinLockReleased(); } inline bool TReaderWriterSpinLock::IsLocked() const noexcept { - return Value_.load() != UnlockedValue; + return (Value_.load() & ~WriterReadyMask) != 0; } inline bool TReaderWriterSpinLock::IsLockedByReader() const noexcept @@ -68,7 +68,7 @@ inline bool TReaderWriterSpinLock::IsLockedByWriter() const noexcept inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept { auto oldValue = Value_.fetch_add(ReaderDelta, std::memory_order::acquire); - if ((oldValue & WriterMask) != 0) { + if ((oldValue & (WriterMask | WriterReadyMask)) != 0) { Value_.fetch_sub(ReaderDelta, std::memory_order::relaxed); return false; } @@ -79,7 +79,7 @@ inline bool TReaderWriterSpinLock::TryAcquireReader() noexcept inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept { auto oldValue = Value_.load(std::memory_order::relaxed); - if ((oldValue & WriterMask) != 0) { + if ((oldValue & (WriterMask | WriterReadyMask)) != 0) { return false; } return TryAcquireReader(); @@ -88,7 +88,7 @@ inline bool TReaderWriterSpinLock::TryAndTryAcquireReader() noexcept inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept { auto oldValue = Value_.load(std::memory_order::relaxed); - if ((oldValue & WriterMask) != 0) { + if ((oldValue & (WriterMask | WriterReadyMask)) != 0) { return false; } auto newValue = oldValue + ReaderDelta; @@ -98,22 +98,35 @@ inline bool TReaderWriterSpinLock::TryAcquireReaderForkFriendly() noexcept return acquired; } -inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept +inline bool TReaderWriterSpinLock::TryAcquireWriterWithExpectedValue(TValue expected) noexcept { - auto expected = UnlockedValue; - - bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire); + bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire); NDetail::RecordSpinLockAcquired(acquired); return acquired; } +inline bool TReaderWriterSpinLock::TryAcquireWriter() noexcept +{ + // NB(pavook): we cannot expect writer ready to be set, as this method + // might be called without indicating writer readiness and we cannot + // indicate readiness on the hot path. This means that code calling + // TryAcquireWriter will spin against code calling AcquireWriter. + return TryAcquireWriterWithExpectedValue(UnlockedValue); +} + inline bool TReaderWriterSpinLock::TryAndTryAcquireWriter() noexcept { auto oldValue = Value_.load(std::memory_order::relaxed); - if (oldValue != UnlockedValue) { + + if ((oldValue & WriterReadyMask) == 0) { + oldValue = Value_.fetch_or(WriterReadyMask, std::memory_order::relaxed); + } + + if ((oldValue & (~WriterReadyMask)) != 0) { return false; } - return TryAcquireWriter(); + + return TryAcquireWriterWithExpectedValue(WriterReadyMask); } //////////////////////////////////////////////////////////////////////////////// diff --git a/library/cpp/yt/threading/rw_spin_lock.h b/library/cpp/yt/threading/rw_spin_lock.h index a915e677e8..64a241bb6b 100644 --- a/library/cpp/yt/threading/rw_spin_lock.h +++ b/library/cpp/yt/threading/rw_spin_lock.h @@ -16,8 +16,23 @@ namespace NYT::NThreading { //! Single-writer multiple-readers spin lock. /*! - * Reader-side calls are pretty cheap. - * The lock is unfair. 
+ * Reader-side acquires are pretty cheap, and readers don't spin unless writers + * are present. + * + * The lock is unfair, but writers are prioritized over readers, that is, + * if AcquireWriter() is called at some time, then some writer + * (not necessarily the same one that called AcquireWriter) will succeed + * in the next time. This is implemented by an additional flag "WriterReady", + * that writers set on arrival. No readers can proceed until this flag is reset. + * + * WARNING: You probably should not use this lock if forks are possible: see + * fork_aware_rw_spin_lock.h for a proper fork-safe lock which does the housekeeping for you. + * + * WARNING: This lock is not recursive: you can't call AcquireReader() twice in the same + * thread, as that may lead to a deadlock. For the same reason you shouldn't do WaitFor or any + * other context switch under lock. + * + * See tla+/spinlock.tla for the formally verified lock's properties. */ class TReaderWriterSpinLock : public TSpinLockBase @@ -29,18 +44,26 @@ public: /*! * Optimized for the case of read-intensive workloads. * Cheap (just one atomic increment and no spinning if no writers are present). - * Don't use this call if forks are possible: forking at some + * + * WARNING: Don't use this call if forks are possible: forking at some * intermediate point inside #AcquireReader may corrupt the lock state and - * leave lock forever stuck for the child process. + * leave the lock stuck forever for the child process. + * + * WARNING: The lock is not recursive/reentrant, i.e. it assumes that no thread calls + * AcquireReader() if the reader is already acquired for it. */ void AcquireReader() noexcept; //! Acquires the reader lock. /*! * A more expensive version of #AcquireReader (includes at least * one atomic load and CAS; also may spin even if just readers are present). + * * In contrast to #AcquireReader, this method can be used in the presence of forks. - * Note that fork-friendliness alone does not provide fork-safety: additional - * actions must be performed to release the lock after a fork. + * + * WARNING: fork-friendliness alone does not provide fork-safety: additional + * actions must be performed to release the lock after a fork. This means you + * probably should NOT use this lock in the presence of forks, consider + * fork_aware_rw_spin_lock.h instead as a proper fork-safe lock. */ void AcquireReaderForkFriendly() noexcept; //! Tries acquiring the reader lock; see #AcquireReader. 
@@ -94,10 +117,12 @@ private: using TValue = ui32; static constexpr TValue UnlockedValue = 0; static constexpr TValue WriterMask = 1; - static constexpr TValue ReaderDelta = 2; + static constexpr TValue WriterReadyMask = 2; + static constexpr TValue ReaderDelta = 4; std::atomic<TValue> Value_ = UnlockedValue; + bool TryAcquireWriterWithExpectedValue(TValue expected) noexcept; bool TryAndTryAcquireReader() noexcept; bool TryAndTryAcquireWriter() noexcept; diff --git a/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp b/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp new file mode 100644 index 0000000000..653772604c --- /dev/null +++ b/library/cpp/yt/threading/unittests/rw_spin_lock_ut.cpp @@ -0,0 +1,56 @@ +#include <library/cpp/testing/gtest/gtest.h> + +#include <library/cpp/yt/threading/rw_spin_lock.h> + +#include <util/thread/pool.h> + +#include <latch> +#include <thread> + +namespace NYT::NThreading { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TReaderWriterSpinLockTest, WriterPriority) +{ + int readerThreads = 10; + std::latch latch(readerThreads + 1); + std::atomic<size_t> finishedCount = {0}; + + TReaderWriterSpinLock lock; + + volatile std::atomic<ui32> x = {0}; + + auto readerTask = [&latch, &lock, &finishedCount, &x] () { + latch.arrive_and_wait(); + while (true) { + { + auto guard = ReaderGuard(lock); + // do some stuff + for (ui32 i = 0; i < 10'000u; ++i) { + x.fetch_add(i); + } + } + if (finishedCount.fetch_add(1) > 20'000) { + break; + } + } + }; + + auto readerPool = CreateThreadPool(readerThreads); + for (int i = 0; i < readerThreads; ++i) { + readerPool->SafeAddFunc(readerTask); + } + + latch.arrive_and_wait(); + while (finishedCount.load() == 0); + auto guard = WriterGuard(lock); + EXPECT_LE(finishedCount.load(), 1'000u); + DoNotOptimizeAway(x); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT::NConcurrency diff --git a/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp b/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp new file mode 100644 index 0000000000..26e58fff74 --- /dev/null +++ b/library/cpp/yt/threading/unittests/spin_lock_fork_ut.cpp @@ -0,0 +1,160 @@ +#include <library/cpp/testing/gtest/gtest.h> + +#include <library/cpp/yt/threading/rw_spin_lock.h> +#include <library/cpp/yt/threading/fork_aware_spin_lock.h> + +#include <util/thread/pool.h> + +#include <sys/wait.h> + +namespace NYT::NThreading { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TReaderWriterSpinLockTest, ForkFriendlyness) +{ + std::atomic<bool> stopped = {false}; + YT_DECLARE_SPIN_LOCK(TReaderWriterSpinLock, lock); + + auto readerTask = [&lock, &stopped] () { + while (!stopped.load()) { + ForkFriendlyReaderGuard(lock); + } + }; + + auto tryReaderTask = [&lock, &stopped] () { + while (!stopped.load()) { + // NB(pavook): TryAcquire instead of Acquire to minimize checks. 
+ bool acquired = lock.TryAcquireReaderForkFriendly(); + if (acquired) { + lock.ReleaseReader(); + } + } + }; + + auto tryWriterTask = [&lock, &stopped] () { + while (!stopped.load()) { + Sleep(TDuration::MicroSeconds(1)); + bool acquired = lock.TryAcquireWriter(); + if (acquired) { + lock.ReleaseWriter(); + } + } + }; + + auto writerTask = [&lock, &stopped] () { + while (!stopped.load()) { + Sleep(TDuration::MicroSeconds(1)); + WriterGuard(lock); + } + }; + + int readerCount = 20; + int writerCount = 10; + + auto reader = CreateThreadPool(readerCount); + auto writer = CreateThreadPool(writerCount); + + for (int i = 0; i < readerCount / 2; ++i) { + reader->SafeAddFunc(readerTask); + reader->SafeAddFunc(tryReaderTask); + } + for (int i = 0; i < writerCount / 2; ++i) { + writer->SafeAddFunc(writerTask); + writer->SafeAddFunc(tryWriterTask); + } + + // And let the chaos begin! + int forkCount = 2000; + for (int iter = 1; iter <= forkCount; ++iter) { + pid_t pid; + { + auto guard = WriterGuard(lock); + pid = fork(); + } + + YT_VERIFY(pid >= 0); + + // NB(pavook): check different orders to maximize chaos. + if (iter % 2 == 0) { + ReaderGuard(lock); + } + WriterGuard(lock); + ReaderGuard(lock); + if (pid == 0) { + // NB(pavook): thread pools are no longer with us. + _exit(0); + } + } + + for (int i = 1; i <= forkCount; ++i) { + int status; + YT_VERIFY(waitpid(0, &status, 0) > 0); + YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0); + } + + stopped.store(true); +} + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TForkAwareSpinLockTest, ForkSafety) +{ + std::atomic<bool> stopped = {false}; + YT_DECLARE_SPIN_LOCK(TForkAwareSpinLock, lock); + + auto acquireTask = [&lock, &stopped] () { + while (!stopped.load()) { + Guard(lock); + } + }; + + // NB(pavook): TryAcquire instead of Acquire to minimize checks. + auto tryAcquireTask = [&lock, &stopped] () { + while (!stopped.load()) { + bool acquired = lock.TryAcquire(); + if (acquired) { + lock.Release(); + } + } + }; + + int workerCount = 20; + + auto worker = CreateThreadPool(workerCount); + + for (int i = 0; i < workerCount / 2; ++i) { + worker->SafeAddFunc(acquireTask); + worker->SafeAddFunc(tryAcquireTask); + } + + // And let the chaos begin! + int forkCount = 2000; + for (int iter = 1; iter <= forkCount; ++iter) { + pid_t pid = fork(); + + YT_VERIFY(pid >= 0); + + Guard(lock); + Guard(lock); + + if (pid == 0) { + // NB(pavook): thread pools are no longer with us. 
+ _exit(0); + } + } + + for (int i = 1; i <= forkCount; ++i) { + int status; + YT_VERIFY(waitpid(0, &status, 0) > 0); + YT_VERIFY(WIFEXITED(status) && WEXITSTATUS(status) == 0); + } + + stopped.store(true); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT::NConcurrency diff --git a/library/cpp/yt/threading/unittests/ya.make b/library/cpp/yt/threading/unittests/ya.make index ef9b5d2995..da006012c0 100644 --- a/library/cpp/yt/threading/unittests/ya.make +++ b/library/cpp/yt/threading/unittests/ya.make @@ -5,9 +5,14 @@ INCLUDE(${ARCADIA_ROOT}/library/cpp/yt/ya_cpp.make.inc) SRCS( count_down_latch_ut.cpp recursive_spin_lock_ut.cpp + rw_spin_lock_ut.cpp spin_wait_ut.cpp ) +IF (NOT OS_WINDOWS) + SRC(spin_lock_fork_ut.cpp) +ENDIF() + PEERDIR( library/cpp/yt/assert library/cpp/yt/threading diff --git a/library/cpp/yt/threading/writer_starving_rw_spin_lock-inl.h b/library/cpp/yt/threading/writer_starving_rw_spin_lock-inl.h new file mode 100644 index 0000000000..cf8bde715c --- /dev/null +++ b/library/cpp/yt/threading/writer_starving_rw_spin_lock-inl.h @@ -0,0 +1,101 @@ +#pragma once +#ifndef WRITER_STARVING_RW_SPIN_LOCK_INL_H_ +#error "Direct inclusion of this file is not allowed, include rw_spin_lock.h" +// For the sake of sane code completion. +#include "writer_starving_rw_spin_lock.h" +#endif +#undef WRITER_STARVING_RW_SPIN_LOCK_INL_H_ + +#include "spin_wait.h" + +namespace NYT::NThreading { + +//////////////////////////////////////////////////////////////////////////////// + +inline void TWriterStarvingRWSpinLock::AcquireReader() noexcept +{ + if (TryAcquireReader()) { + return; + } + AcquireReaderSlow(); +} + +inline void TWriterStarvingRWSpinLock::ReleaseReader() noexcept +{ + auto prevValue = Value_.fetch_sub(ReaderDelta, std::memory_order::release); + Y_ASSERT((prevValue & ~WriterMask) != 0); + NDetail::RecordSpinLockReleased(); +} + +inline void TWriterStarvingRWSpinLock::AcquireWriter() noexcept +{ + if (TryAcquireWriter()) { + return; + } + AcquireWriterSlow(); +} + +inline void TWriterStarvingRWSpinLock::ReleaseWriter() noexcept +{ + auto prevValue = Value_.fetch_and(~WriterMask, std::memory_order::release); + Y_ASSERT(prevValue & WriterMask); + NDetail::RecordSpinLockReleased(); +} + +inline bool TWriterStarvingRWSpinLock::IsLocked() const noexcept +{ + return Value_.load() != UnlockedValue; +} + +inline bool TWriterStarvingRWSpinLock::IsLockedByReader() const noexcept +{ + return Value_.load() >= ReaderDelta; +} + +inline bool TWriterStarvingRWSpinLock::IsLockedByWriter() const noexcept +{ + return (Value_.load() & WriterMask) != 0; +} + +inline bool TWriterStarvingRWSpinLock::TryAcquireReader() noexcept +{ + auto oldValue = Value_.fetch_add(ReaderDelta, std::memory_order::acquire); + if ((oldValue & WriterMask) != 0) { + Value_.fetch_sub(ReaderDelta, std::memory_order::relaxed); + return false; + } + NDetail::RecordSpinLockAcquired(); + return true; +} + +inline bool TWriterStarvingRWSpinLock::TryAndTryAcquireReader() noexcept +{ + auto oldValue = Value_.load(std::memory_order::relaxed); + if ((oldValue & WriterMask) != 0) { + return false; + } + return TryAcquireReader(); +} + +inline bool TWriterStarvingRWSpinLock::TryAcquireWriter() noexcept +{ + auto expected = UnlockedValue; + + bool acquired = Value_.compare_exchange_weak(expected, WriterMask, std::memory_order::acquire); + NDetail::RecordSpinLockAcquired(acquired); + return acquired; +} + +inline bool TWriterStarvingRWSpinLock::TryAndTryAcquireWriter() 
noexcept +{ + auto oldValue = Value_.load(std::memory_order::relaxed); + if (oldValue != UnlockedValue) { + return false; + } + return TryAcquireWriter(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NThreading + diff --git a/library/cpp/yt/threading/writer_starving_rw_spin_lock.cpp b/library/cpp/yt/threading/writer_starving_rw_spin_lock.cpp new file mode 100644 index 0000000000..74c9f59db1 --- /dev/null +++ b/library/cpp/yt/threading/writer_starving_rw_spin_lock.cpp @@ -0,0 +1,25 @@ +#include "writer_starving_rw_spin_lock.h" + +namespace NYT::NThreading { + +//////////////////////////////////////////////////////////////////////////////// + +void TWriterStarvingRWSpinLock::AcquireReaderSlow() noexcept +{ + TSpinWait spinWait(Location_, ESpinLockActivityKind::Read); + while (!TryAndTryAcquireReader()) { + spinWait.Wait(); + } +} + +void TWriterStarvingRWSpinLock::AcquireWriterSlow() noexcept +{ + TSpinWait spinWait(Location_, ESpinLockActivityKind::Write); + while (!TryAndTryAcquireWriter()) { + spinWait.Wait(); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NThreading diff --git a/library/cpp/yt/threading/writer_starving_rw_spin_lock.h b/library/cpp/yt/threading/writer_starving_rw_spin_lock.h new file mode 100644 index 0000000000..8a456afe21 --- /dev/null +++ b/library/cpp/yt/threading/writer_starving_rw_spin_lock.h @@ -0,0 +1,115 @@ +#pragma once + +#include "public.h" +#include "rw_spin_lock.h" +#include "spin_lock_base.h" +#include "spin_lock_count.h" + +#include <library/cpp/yt/memory/public.h> + +#include <util/system/rwlock.h> + +#include <atomic> + +namespace NYT::NThreading { + +//////////////////////////////////////////////////////////////////////////////// + +// TODO(pavook): deprecate it. + +//! Single-writer multiple-readers spin lock. +/*! + * Reader-side calls are pretty cheap. + * WARNING: The lock is unfair, and readers can starve writers. See rw_spin_lock.h for a writer-prioritized lock. + * WARNING: Never use the bare lock if forks are possible: see fork_aware_rw_spin_lock.h for a fork-safe lock. + * Unlike rw_spin_lock.h, reader-side is reentrant here: it is possible to acquire the **reader** lock multiple times + * even in the single thread. + * This doesn't mean you should do it: in fact, you shouldn't: use separate locks for separate entities. + * If you see this class in your code, try migrating to the proper rw_spin_lock.h after ensuring you don't rely on + * reentrant locking. + */ +class TWriterStarvingRWSpinLock + : public TSpinLockBase +{ +public: + using TSpinLockBase::TSpinLockBase; + + //! Acquires the reader lock. + /*! + * Optimized for the case of read-intensive workloads. + * Cheap (just one atomic increment and no spinning if no writers are present). + * Don't use this call if forks are possible: forking at some + * intermediate point inside #AcquireReader may corrupt the lock state and + * leave lock forever stuck for the child process. + */ + void AcquireReader() noexcept; + //! Tries acquiring the reader lock; see #AcquireReader. + //! Returns |true| on success. + bool TryAcquireReader() noexcept; + //! Releases the reader lock. + /*! + * Cheap (just one atomic decrement). + */ + void ReleaseReader() noexcept; + + //! Acquires the writer lock. + /*! + * Rather cheap (just one CAS). + */ + void AcquireWriter() noexcept; + //! Tries acquiring the writer lock; see #AcquireWriter. + //! Returns |true| on success. 
+ bool TryAcquireWriter() noexcept; + //! Releases the writer lock. + /*! + * Cheap (just one atomic store). + */ + void ReleaseWriter() noexcept; + + //! Returns true if the lock is taken (either by a reader or writer). + /*! + * This is inherently racy. + * Only use for debugging and diagnostic purposes. + */ + bool IsLocked() const noexcept; + + //! Returns true if the lock is taken by reader. + /*! + * This is inherently racy. + * Only use for debugging and diagnostic purposes. + */ + bool IsLockedByReader() const noexcept; + + //! Returns true if the lock is taken by writer. + /*! + * This is inherently racy. + * Only use for debugging and diagnostic purposes. + */ + bool IsLockedByWriter() const noexcept; + +private: + using TValue = ui32; + static constexpr TValue UnlockedValue = 0; + static constexpr TValue WriterMask = 1; + static constexpr TValue ReaderDelta = 2; + + std::atomic<TValue> Value_ = UnlockedValue; + + + bool TryAndTryAcquireReader() noexcept; + bool TryAndTryAcquireWriter() noexcept; + + void AcquireReaderSlow() noexcept; + void AcquireWriterSlow() noexcept; +}; + +REGISTER_TRACKED_SPIN_LOCK_CLASS(TWriterStarvingRWSpinLock) + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NThreading + +#define WRITER_STARVING_RW_SPIN_LOCK_INL_H_ +#include "writer_starving_rw_spin_lock-inl.h" +#undef WRITER_STARVING_RW_SPIN_LOCK_INL_H_ + diff --git a/library/cpp/yt/threading/ya.make b/library/cpp/yt/threading/ya.make index cc11e7974e..d25f0a7068 100644 --- a/library/cpp/yt/threading/ya.make +++ b/library/cpp/yt/threading/ya.make @@ -18,6 +18,7 @@ SRCS( spin_lock.cpp spin_wait.cpp spin_wait_hook.cpp + writer_starving_rw_spin_lock.cpp ) PEERDIR( |
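
As context for the rw_spin_lock.h changes above: the lock word grows from {writer bit, reader count} to {writer bit, writer-ready bit, reader count} (WriterMask = 1, WriterReadyMask = 2, ReaderDelta = 4), so a waiting writer announces itself and newly arriving readers back off until it has entered and left. Below is a compact single-file model of that layout for illustration only; the class and method names are made up here, and it mirrors the TryAndTryAcquireReader/TryAndTryAcquireWriter paths from the diff rather than the full library implementation (no spin-wait, no fork handling, no lock tracking).

```cpp
#include <atomic>
#include <cstdint>

// Sketch of the writer-prioritized RW spin lock word layout from the diff:
// bit 0 = a writer holds the lock, bit 1 = a writer is waiting ("ready"),
// bits 2.. = reader count. Readers refuse to enter while either low bit is set.
class TWriterPriorityRWSpinLockModel {
public:
    bool TryAcquireReader() noexcept {
        uint32_t old = Value_.fetch_add(ReaderDelta, std::memory_order_acquire);
        if ((old & (WriterMask | WriterReadyMask)) != 0) {
            // A writer is active or waiting: roll back the optimistic increment.
            Value_.fetch_sub(ReaderDelta, std::memory_order_relaxed);
            return false;
        }
        return true;
    }

    void ReleaseReader() noexcept {
        Value_.fetch_sub(ReaderDelta, std::memory_order_release);
    }

    // Mirrors TryAndTryAcquireWriter in the diff: announce readiness first,
    // then CAS from "only the ready flag set" to "writer bit set".
    bool TryAcquireWriter() noexcept {
        uint32_t old = Value_.load(std::memory_order_relaxed);
        if ((old & WriterReadyMask) == 0) {
            // Announce the writer so that new readers stop entering.
            old = Value_.fetch_or(WriterReadyMask, std::memory_order_relaxed);
        }
        if ((old & ~WriterReadyMask) != 0) {
            return false;  // readers still inside, or another writer holds the lock
        }
        uint32_t expected = WriterReadyMask;
        return Value_.compare_exchange_weak(expected, WriterMask, std::memory_order_acquire);
    }

    void ReleaseWriter() noexcept {
        // Clears both the writer bit and the ready flag, as in the diff.
        Value_.fetch_and(static_cast<uint32_t>(~(WriterMask | WriterReadyMask)),
                         std::memory_order_release);
    }

private:
    static constexpr uint32_t WriterMask = 1;
    static constexpr uint32_t WriterReadyMask = 2;
    static constexpr uint32_t ReaderDelta = 4;

    std::atomic<uint32_t> Value_{0};
};

int main() {
    TWriterPriorityRWSpinLockModel lock;

    bool gotReader = lock.TryAcquireReader();     // succeeds: no writer around
    bool gotWriter = lock.TryAcquireWriter();     // fails, but announces the writer
    bool secondReader = lock.TryAcquireReader();  // fails: an announced writer has priority

    if (gotReader) {
        lock.ReleaseReader();
    }
    while (!lock.TryAcquireWriter()) {            // acquires once the reader is gone
    }
    lock.ReleaseWriter();

    (void)gotWriter;
    (void)secondReader;
}
```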