From e6cbfdef0d1ac4e826362c7af965e1c4176f74c0 Mon Sep 17 00:00:00 2001 From: maxpogrebnyak Date: Wed, 24 Jun 2026 07:59:35 +0300 Subject: Add zstd codec on hamster MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Добавлена поддержка HTTP-сжатия zstd в apphost http_adapter и включен эксперимент для WEB@hamster. Для этого зарегистрирован кодек zstd в общей HTTP-библиотеке, добавлен отдельный hamster-конфиг с приоритетом zstd. commit_hash:b959243e6a6508f93bd9920c2cc445fa012c8247 --- library/cpp/http/io/compression.cpp | 2 ++ library/cpp/http/io/ya.make | 1 + 2 files changed, 3 insertions(+) (limited to 'library/cpp') diff --git a/library/cpp/http/io/compression.cpp b/library/cpp/http/io/compression.cpp index 8fa1f62ae69..c68a4e976b6 100644 --- a/library/cpp/http/io/compression.cpp +++ b/library/cpp/http/io/compression.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -22,6 +23,7 @@ TCompressionCodecFactory::TCompressionCodecFactory() { Add("gzip", gzip, [](auto s) { return MakeHolder(s, ZLib::GZip); }); Add("deflate", gzip, [](auto s) { return MakeHolder(s, ZLib::ZLib); }); Add("br", [](auto s) { return MakeHolder(s); }, [](auto s) { return MakeHolder(s, 4); }); + Add("zstd", [](auto s) { return MakeHolder(s); }, [](auto s) { return MakeHolder(s, 3); }); Add("x-gzip", gzip, [](auto s) { return MakeHolder(s, ZLib::GZip); }); Add("x-deflate", gzip, [](auto s) { return MakeHolder(s, ZLib::ZLib); }); diff --git a/library/cpp/http/io/ya.make b/library/cpp/http/io/ya.make index 873e2f34e34..7ca76f44d92 100644 --- a/library/cpp/http/io/ya.make +++ b/library/cpp/http/io/ya.make @@ -13,6 +13,7 @@ PEERDIR( library/cpp/streams/brotli library/cpp/streams/bzip2 library/cpp/streams/lzma + library/cpp/streams/zstd ) SRCS( -- cgit v1.3 From 7493b7938546308fc8ead81e411e05c73b1ce9e4 Mon Sep 17 00:00:00 2001 From: babenko Date: Wed, 24 Jun 2026 11:20:18 +0300 Subject: Add ExactRefCountedCast and TRef::Contains Some 
handy helpers. `ExactRefCountedCast(p)`: exact-type downcast for `New()`-allocated objects. `New()` builds a final `TRefCountedWrapper`, so this casts to the wrapper and upcasts back to `T*`; being final, the `dynamic_cast` lowers to a single `type_info` compare (~2ns vs ~20ns for the is-a path). `TRef::Contains(other)`: true iff other's range lies within this range. commit_hash:138e8719b8ecbd953437b81380e54f736db029ef --- library/cpp/yt/memory/exact_ref_counted_cast-inl.h | 46 ++++++++++ library/cpp/yt/memory/exact_ref_counted_cast.h | 36 ++++++++ library/cpp/yt/memory/ref-inl.h | 5 + library/cpp/yt/memory/ref.h | 3 + .../memory/unittests/exact_ref_counted_cast_ut.cpp | 102 +++++++++++++++++++++ library/cpp/yt/memory/unittests/ya.make | 1 + library/cpp/yt/memory/ya.make | 6 ++ 7 files changed, 199 insertions(+) create mode 100644 library/cpp/yt/memory/exact_ref_counted_cast-inl.h create mode 100644 library/cpp/yt/memory/exact_ref_counted_cast.h create mode 100644 library/cpp/yt/memory/unittests/exact_ref_counted_cast_ut.cpp (limited to 'library/cpp') diff --git a/library/cpp/yt/memory/exact_ref_counted_cast-inl.h b/library/cpp/yt/memory/exact_ref_counted_cast-inl.h new file mode 100644 index 00000000000..71163e26e0a --- /dev/null +++ b/library/cpp/yt/memory/exact_ref_counted_cast-inl.h @@ -0,0 +1,46 @@ +#ifndef EXACT_REF_COUNTED_CAST_INL_H_ +#error "Direct inclusion of this file is not allowed, include exact_ref_counted_cast.h" +// For the sake of sane code completion. 
+#include "exact_ref_counted_cast.h" +#endif + +#include "new.h" + +#include + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +template +std::conditional_t, const T, T>* ExactRefCountedCast(U* ptr) noexcept +{ + static_assert(std::is_polymorphic_v, + "ExactRefCountedCast requires a polymorphic operand type"); + static_assert(std::is_base_of_v, + "ExactRefCountedCast target must derive from TRefCountedBase"); + static_assert(std::is_final_v>, + "TRefCountedWrapper must stay final for this cast to remain O(1)"); + + using TWrapper = std::conditional_t, const TRefCountedWrapper, TRefCountedWrapper>; + // Wrapper is final, so dynamic_cast lowers to one type_info compare. + return dynamic_cast(ptr); +} + +template +std::conditional_t, const T, T>& ExactRefCountedCast(U& ref) +{ + static_assert(std::is_polymorphic_v, + "ExactRefCountedCast requires a polymorphic operand type"); + static_assert(std::is_base_of_v, + "ExactRefCountedCast target must derive from TRefCountedBase"); + static_assert(std::is_final_v>, + "TRefCountedWrapper must stay final for this cast to remain O(1)"); + + using TWrapper = std::conditional_t, const TRefCountedWrapper, TRefCountedWrapper>; + return dynamic_cast(ref); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/library/cpp/yt/memory/exact_ref_counted_cast.h b/library/cpp/yt/memory/exact_ref_counted_cast.h new file mode 100644 index 00000000000..3b186103f3a --- /dev/null +++ b/library/cpp/yt/memory/exact_ref_counted_cast.h @@ -0,0 +1,36 @@ +#pragma once + +#include "ref_counted.h" + +#include + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +//! A downcast that succeeds iff the object was allocated as *exactly* |New()| +//! (not a subclass, not some other allocation path). +/*! 
+ * |New()| does not construct a |T| but a final |TRefCountedWrapper| + * deriving from |T|, so |T| is never a leaf and |dynamic_cast| takes the + * slow is-a path. This casts to the wrapper instead and upcasts back to |T*|; + * since the wrapper is final, dynamic_cast lowers to one type_info compare -- + * O(1), as cheap on a miss as on a hit. + * + * Pointer form returns |nullptr| on a miss (or null operand); reference form + * throws |std::bad_cast|. Constness is preserved. |T| must derive from + * |TRefCountedBase| and |U| must be polymorphic. Requires RTTI. + */ +template +[[nodiscard]] std::conditional_t, const T, T>* ExactRefCountedCast(U* ptr) noexcept; + +template +[[nodiscard]] std::conditional_t, const T, T>& ExactRefCountedCast(U& ref); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT + +#define EXACT_REF_COUNTED_CAST_INL_H_ +#include "exact_ref_counted_cast-inl.h" +#undef EXACT_REF_COUNTED_CAST_INL_H_ diff --git a/library/cpp/yt/memory/ref-inl.h b/library/cpp/yt/memory/ref-inl.h index 632a98f130d..5c6dc11e0a2 100644 --- a/library/cpp/yt/memory/ref-inl.h +++ b/library/cpp/yt/memory/ref-inl.h @@ -65,6 +65,11 @@ Y_FORCE_INLINE TRef TRef::Slice(size_t startOffset, size_t endOffset) const return TRef(Begin() + startOffset, endOffset - startOffset); } +Y_FORCE_INLINE bool TRef::Contains(TRef other) const +{ + return other.Begin() >= Begin() && other.End() <= End(); +} + //////////////////////////////////////////////////////////////////////////////// Y_FORCE_INLINE TMutableRef::TMutableRef(void* data, size_t size) diff --git a/library/cpp/yt/memory/ref.h b/library/cpp/yt/memory/ref.h index a6c6c4f2260..1fc451f50aa 100644 --- a/library/cpp/yt/memory/ref.h +++ b/library/cpp/yt/memory/ref.h @@ -54,6 +54,9 @@ public: //! Creates a TRef for a part of existing range. TRef Slice(size_t startOffset, size_t endOffset) const; + //! Returns |true| if #other's range lies entirely within this range. 
+ bool Contains(TRef other) const; + //! Compares the content for bitwise equality. static bool AreBitwiseEqual(TRef lhs, TRef rhs); }; diff --git a/library/cpp/yt/memory/unittests/exact_ref_counted_cast_ut.cpp b/library/cpp/yt/memory/unittests/exact_ref_counted_cast_ut.cpp new file mode 100644 index 00000000000..75cce328dc0 --- /dev/null +++ b/library/cpp/yt/memory/unittests/exact_ref_counted_cast_ut.cpp @@ -0,0 +1,102 @@ +#include + +#include +#include +#include + +#include +#include + +namespace NYT { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +struct TBase + : public TRefCounted +{ }; + +struct TDerived + : public TBase +{ + int Tag = 42; +}; + +struct TMoreDerived + : public TDerived +{ }; + +//////////////////////////////////////////////////////////////////////////////// +// Pointer form. + +TEST(TExactRefCountedCastTest, ExactMatch) +{ + auto object = New(); + TBase* base = object.Get(); + EXPECT_EQ(ExactRefCountedCast(base), object.Get()); + EXPECT_EQ(ExactRefCountedCast(base)->Tag, 42); +} + +TEST(TExactRefCountedCastTest, SubclassDoesNotMatch) +{ + auto object = New(); + TBase* base = object.Get(); + // Object was New(); an exact probe for TDerived must miss. + EXPECT_EQ(ExactRefCountedCast(base), nullptr); + // ...but the exact type matches. + EXPECT_EQ(ExactRefCountedCast(base), object.Get()); +} + +TEST(TExactRefCountedCastTest, BaseDoesNotMatch) +{ + auto object = New(); + TBase* base = object.Get(); + // It is exactly a TDerived, not a TBase. 
+ EXPECT_EQ(ExactRefCountedCast(base), nullptr); +} + +TEST(TExactRefCountedCastTest, Null) +{ + TBase* base = nullptr; + EXPECT_EQ(ExactRefCountedCast(base), nullptr); +} + +TEST(TExactRefCountedCastTest, ConstnessPreserved) +{ + auto object = New(); + const TBase* base = object.Get(); + auto* result = ExactRefCountedCast(base); + static_assert(std::is_same_v); + EXPECT_EQ(result, object.Get()); +} + +//////////////////////////////////////////////////////////////////////////////// +// Reference form. + +TEST(TExactRefCountedCastTest, RefExactMatch) +{ + auto object = New(); + TBase& base = *object; + EXPECT_EQ(&ExactRefCountedCast(base), object.Get()); +} + +TEST(TExactRefCountedCastTest, RefSubclassThrows) +{ + auto object = New(); + TBase& base = *object; + EXPECT_THROW((void)ExactRefCountedCast(base), std::bad_cast); + EXPECT_EQ(&ExactRefCountedCast(base), object.Get()); +} + +TEST(TExactRefCountedCastTest, RefConstnessPreserved) +{ + auto object = New(); + const TBase& base = *object; + static_assert(std::is_same_v(base)), const TDerived&>); + EXPECT_EQ(&ExactRefCountedCast(base), object.Get()); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT diff --git a/library/cpp/yt/memory/unittests/ya.make b/library/cpp/yt/memory/unittests/ya.make index c3abcbefbe5..c545bc0d41e 100644 --- a/library/cpp/yt/memory/unittests/ya.make +++ b/library/cpp/yt/memory/unittests/ya.make @@ -9,6 +9,7 @@ SRCS( chunked_memory_pool_allocator_ut.cpp chunked_memory_pool_output_ut.cpp erased_storage_ut.cpp + exact_ref_counted_cast_ut.cpp free_list_ut.cpp function_view_ut.cpp intrusive_ptr_ut.cpp diff --git a/library/cpp/yt/memory/ya.make b/library/cpp/yt/memory/ya.make index 37b05048b76..de4cc766061 100644 --- a/library/cpp/yt/memory/ya.make +++ b/library/cpp/yt/memory/ya.make @@ -47,6 +47,12 @@ CHECK_DEPENDENT_DIRS( END() +IF (NOT OPENSOURCE) + RECURSE( + benchmark + ) +ENDIF() + RECURSE_FOR_TESTS( unittests ) -- 
cgit v1.3 From a224436a8d395602cd14b8e5aa3cecb466ba126f Mon Sep 17 00:00:00 2001 From: babenko Date: Wed, 24 Jun 2026 13:14:54 +0300 Subject: YT-28458: Add rseq-backed hot sensors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename `percpu.{cpp,h}` to `per_cpu_sensor_impl.{cpp,h}` and update all includers. Add an rseq-backed implementation of the hot per-CPU counter, time counter and gauge (`rseq_sensor_impl.{h,cpp}`): each update commits to the calling CPU's shard lock-free via an rseq critical section (`library/cpp/yt/rseq` `AddPerCpu`/`StorePerCpu`) — no atomic and no lock on the fast path. The shard array is sized to `NRseq::GetCpuCount()` and folded into the sensor allocation (`NewWithExtraSpace`); reads aggregate with `LoadPerCpu`. The gauge keeps last-writer-wins semantics. Linux-only. The existing `TPerCpu{Counter,TimeCounter,Gauge}` stay (`per_cpu_sensor_impl.{h,cpp}`) as the atomic sharded fallback. The two are interchangeable and chosen per sensor at construction in `TSolomonRegistry`, so the hot Increment/Update path carries no dispatch: * The rseq fast path is **off by default**; opt in via `singletons/solomon_registry/enable_rseq`. Even when on, a hot sensor uses it only in a process where the kernel rseq area sits at a fixed thread-pointer offset (tcmalloc/glibc-owned), per the rseq library's runtime safety probe (`NRseq::IsPerCpuFastPathSupported`). Everything else — notably a `dlopen`'d YQL UDF whose `__rseq_abi` lands in dynamically allocated TLS — uses the atomic sharded sensors. * `TSolomonRegistry` is a reconfigurable singleton (`solomon_registry`) with an `enable_rseq` knob (default false), settable in static config and updatable via dynamic config. Off Linux the rseq sensors do not exist and the registry uses the atomic sharded sensors for hot requests. The per-CPU summary is unchanged (TTscp + spinlock). Unit tests cover the atomic sensors, the rseq sensors (Linux-only), and the simple sensors. 
The controller-agent memory-watchdog integration tests are made resilient to the (core-count-dependent) per-sensor footprint. Benchmark (hot per-CPU path, 64-core host), atomic sharded vs rseq: | Benchmark | atomic | rseq | speedup | | --- | --- | --- | --- | | BM_PerCpuCounter, threads:1 | 30.7 ns | 3.6 ns | ~8.5x | | BM_PerCpuCounter, threads:16 | 30.9 ns | 4.4 ns | ~7x | | BM_PerCpuGauge, threads:1 | 32.8 ns | 12.4 ns | ~2.6x | | BM_PerCpuGauge, threads:16 | 32.7 ns | 12.5 ns | ~2.5x | commit_hash:8c633d31f03b2cc862ed2217ae08342bf42adc52 --- library/cpp/yt/rseq/rseq.cpp | 8 +- library/cpp/yt/rseq/rseq.h | 20 +- library/cpp/yt/rseq/unittests/per_cpu_ut.cpp | 7 +- yt/yt/core/rpc/overload_controller.cpp | 2 +- yt/yt/library/profiling/per_cpu_sensor_impl.cpp | 134 +++++++++++ yt/yt/library/profiling/per_cpu_sensor_impl.h | 126 ++++++++++ yt/yt/library/profiling/percpu.cpp | 134 ----------- yt/yt/library/profiling/percpu.h | 120 --------- yt/yt/library/profiling/rseq_sensor_impl-inl.h | 48 ++++ yt/yt/library/profiling/rseq_sensor_impl.cpp | 88 +++++++ yt/yt/library/profiling/rseq_sensor_impl.h | 138 +++++++++++ yt/yt/library/profiling/sensor_impl.cpp | 268 --------------------- yt/yt/library/profiling/sensor_impl.h | 135 ----------- yt/yt/library/profiling/simple_sensor_impl.cpp | 268 +++++++++++++++++++++ yt/yt/library/profiling/simple_sensor_impl.h | 135 +++++++++++ yt/yt/library/profiling/solomon/config.cpp | 26 ++ yt/yt/library/profiling/solomon/config.h | 33 +++ .../solomon/configure_solomon_registry.cpp | 41 ++++ yt/yt/library/profiling/solomon/helpers.cpp | 15 +- yt/yt/library/profiling/solomon/public.h | 2 + yt/yt/library/profiling/solomon/registry.cpp | 94 ++++++-- yt/yt/library/profiling/solomon/registry.h | 7 + yt/yt/library/profiling/solomon/sensor_set.h | 2 +- yt/yt/library/profiling/solomon/ya.make | 1 + .../profiling/unittests/per_cpu_sensor_impl_ut.cpp | 71 ++++++ .../profiling/unittests/rseq_sensor_impl_ut.cpp | 73 ++++++ 
.../profiling/unittests/simple_sensor_impl_ut.cpp | 72 ++++++ yt/yt/library/profiling/unittests/ya.make | 9 + yt/yt/library/profiling/ya.make | 14 +- 29 files changed, 1405 insertions(+), 686 deletions(-) create mode 100644 yt/yt/library/profiling/per_cpu_sensor_impl.cpp create mode 100644 yt/yt/library/profiling/per_cpu_sensor_impl.h delete mode 100644 yt/yt/library/profiling/percpu.cpp delete mode 100644 yt/yt/library/profiling/percpu.h create mode 100644 yt/yt/library/profiling/rseq_sensor_impl-inl.h create mode 100644 yt/yt/library/profiling/rseq_sensor_impl.cpp create mode 100644 yt/yt/library/profiling/rseq_sensor_impl.h delete mode 100644 yt/yt/library/profiling/sensor_impl.cpp delete mode 100644 yt/yt/library/profiling/sensor_impl.h create mode 100644 yt/yt/library/profiling/simple_sensor_impl.cpp create mode 100644 yt/yt/library/profiling/simple_sensor_impl.h create mode 100644 yt/yt/library/profiling/solomon/configure_solomon_registry.cpp create mode 100644 yt/yt/library/profiling/unittests/per_cpu_sensor_impl_ut.cpp create mode 100644 yt/yt/library/profiling/unittests/rseq_sensor_impl_ut.cpp create mode 100644 yt/yt/library/profiling/unittests/simple_sensor_impl_ut.cpp (limited to 'library/cpp') diff --git a/library/cpp/yt/rseq/rseq.cpp b/library/cpp/yt/rseq/rseq.cpp index f8321cbb0bf..12b5a291d8d 100644 --- a/library/cpp/yt/rseq/rseq.cpp +++ b/library/cpp/yt/rseq/rseq.cpp @@ -133,7 +133,7 @@ YT_STATIC_INITIALIZER({ // the static TLS block, incl. tcmalloc) rather than a dlopen'd module's dynamically allocated // TLS, where the offset is valid only on the thread that computed it. Compares addresses // without dereferencing the suspect offset, so it is safe even when the offset is bogus. See -// IsPerCpuFastPathSafe. +// IsPerCpuFastPathSupported. 
YT_PREVENT_TLS_CACHING bool ValidateFastPathOnFreshThread() { if (OwnsRegistration) { @@ -177,7 +177,7 @@ YT_PREVENT_TLS_CACHING bool EnsureCurrentThreadRegistered() return ReadField(CpuIdFieldOffset) >= 0; } -bool IsPerCpuFastPathSafe() +bool IsPerCpuFastPathSupported() { // Decided once, lazily, on a freshly spawned thread (the check is meaningful only off the // thread that computed the offset) and cached -- cost is one thread spawn at first use. @@ -205,14 +205,12 @@ bool IsPerCpuFastPathSafe() #else // YT_RSEQ_AVAILABLE -#include "per_cpu.h" - namespace NYT::NRseq { //////////////////////////////////////////////////////////////////////////////// // No rseq fast path on this platform; hot sensors use the atomic fallback. -bool IsPerCpuFastPathSafe() +bool IsPerCpuFastPathSupported() { return false; } diff --git a/library/cpp/yt/rseq/rseq.h b/library/cpp/yt/rseq/rseq.h index cfea5101c29..b821e34047a 100644 --- a/library/cpp/yt/rseq/rseq.h +++ b/library/cpp/yt/rseq/rseq.h @@ -6,6 +6,24 @@ #define YT_RSEQ_AVAILABLE #endif +namespace NYT::NRseq { + +//////////////////////////////////////////////////////////////////////////////// + +//! Returns whether the per-CPU rseq fast path is safe to use in this process. +/*! + * The fast path reads the rseq area at a thread-pointer offset cached at startup, which is + * sound only when __rseq_abi sits at a fixed offset from the thread pointer (a glibc-owned + * area or the static TLS block, incl. tcmalloc) -- not when it lands in a dlopen'd module's + * dynamically allocated TLS. Returns false there (and where there is no fast path) so callers + * fall back to atomics. Decided once on a spawned thread and cached: one spawn at first use. + */ +bool IsPerCpuFastPathSupported(); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRseq + #ifdef YT_RSEQ_AVAILABLE #include @@ -16,7 +34,7 @@ namespace NYT::NRseq { //! 
Byte offset from the thread pointer to the rseq area's cpu_id field (glibc's area when //! glibc registers rseq, otherwise our own). A fixed offset across threads only when the area -//! is glibc-owned or in the static TLS block; #IsPerCpuFastPathSafe probes this and gates the +//! is glibc-owned or in the static TLS block; #IsPerCpuFastPathSupported probes this and gates the //! fast path. NB: 0 until our startup initializer runs, but nothing reads rseq before then. extern std::ptrdiff_t CpuIdFieldOffset; diff --git a/library/cpp/yt/rseq/unittests/per_cpu_ut.cpp b/library/cpp/yt/rseq/unittests/per_cpu_ut.cpp index f742c8589de..27588aee75e 100644 --- a/library/cpp/yt/rseq/unittests/per_cpu_ut.cpp +++ b/library/cpp/yt/rseq/unittests/per_cpu_ut.cpp @@ -1,6 +1,7 @@ #include #include +#include #include @@ -58,12 +59,12 @@ TEST(TPerCpuRseqTest, CpuCountIsSane) EXPECT_LE(GetCpuCount(), 1 << 20); } -TEST(TPerCpuRseqTest, FastPathSafetyIsStable) +TEST(TPerCpuRseqTest, FastPathSupportIsStable) { // The probe spawns a thread on first use and caches its verdict, so repeated calls must // agree. We avoid asserting a specific value: it depends on kernel rseq support. 
- bool safe = IsPerCpuFastPathSafe(); - EXPECT_EQ(safe, IsPerCpuFastPathSafe()); + bool supported = IsPerCpuFastPathSupported(); + EXPECT_EQ(supported, IsPerCpuFastPathSupported()); } TEST(TPerCpuRseqTest, ParsePossibleCpuCount) diff --git a/yt/yt/core/rpc/overload_controller.cpp b/yt/yt/core/rpc/overload_controller.cpp index 24faa7511ae..0ce55afcb9e 100644 --- a/yt/yt/core/rpc/overload_controller.cpp +++ b/yt/yt/core/rpc/overload_controller.cpp @@ -3,7 +3,7 @@ #include "config.h" #include "private.h" -#include +#include #include #include diff --git a/yt/yt/library/profiling/per_cpu_sensor_impl.cpp b/yt/yt/library/profiling/per_cpu_sensor_impl.cpp new file mode 100644 index 00000000000..dcdf47c0990 --- /dev/null +++ b/yt/yt/library/profiling/per_cpu_sensor_impl.cpp @@ -0,0 +1,134 @@ +#include "per_cpu_sensor_impl.h" + +#include "summary.h" + +#include + +namespace NYT::NProfiling { + +//////////////////////////////////////////////////////////////////////////////// + +void TPerCpuCounter::Increment(i64 delta) +{ + auto tscp = TTscp::Get(); + Shards_[tscp.ProcessorId].Value.fetch_add(delta, std::memory_order::relaxed); +} + +i64 TPerCpuCounter::GetValue() +{ + i64 total = 0; + for (const auto& shard : Shards_) { + total += shard.Value.load(); + } + return total; +} + +//////////////////////////////////////////////////////////////////////////////// + +void TPerCpuTimeCounter::Add(TDuration delta) +{ + auto tscp = TTscp::Get(); + Shards_[tscp.ProcessorId].Value.fetch_add(delta.GetValue(), std::memory_order::relaxed); +} + +TDuration TPerCpuTimeCounter::GetValue() +{ + TDuration total = TDuration::Zero(); + for (const auto& shard : Shards_) { + total += TDuration::FromValue(shard.Value.load()); + } + return total; +} + +//////////////////////////////////////////////////////////////////////////////// + +__int128 TPerCpuGauge::TWrite::Pack() +{ + static_assert(sizeof(TWrite) == 16); + + __int128 i; + memcpy(&i, this, 16); + return i; +} + +TPerCpuGauge::TWrite 
TPerCpuGauge::TWrite::Unpack(__int128 i) +{ + TWrite w; + memcpy(&w, &i, 16); + return w; +} + +void TPerCpuGauge::Update(double value) +{ + auto tscp = TTscp::Get(); + + TWrite write{value, tscp.Instant}; +#ifdef __clang__ + Shards_[tscp.ProcessorId].Value.store(write.Pack(), std::memory_order::relaxed); +#else + auto guard = Guard(Shards_[tscp.ProcessorId].Lock); + Shards_[tscp.ProcessorId].Value = write; +#endif +} + +double TPerCpuGauge::GetValue() +{ + double lastValue = 0.0; + TCpuInstant maxTimestamp = 0; + + for (const auto& shard : Shards_) { +#ifdef __clang__ + auto write = TWrite::Unpack(shard.Value.load()); +#else + auto guard = Guard(shard.Lock); + auto write = shard.Value; +#endif + + if (write.Timestamp > maxTimestamp) { + maxTimestamp = write.Timestamp; + lastValue = write.Value; + } + } + + return lastValue; +} + +//////////////////////////////////////////////////////////////////////////////// + +template +void TPerCpuSummary::Record(T value) +{ + auto tscp = TTscp::Get(); + auto guard = Guard(Shards_[tscp.ProcessorId].Lock); + Shards_[tscp.ProcessorId].Value.Record(value); +} + +template +TSummarySnapshot TPerCpuSummary::GetSummary() +{ + TSummarySnapshot value; + for (const auto& shard : Shards_) { + auto guard = Guard(shard.Lock); + value += shard.Value; + } + return value; +} + +template +TSummarySnapshot TPerCpuSummary::GetSummaryAndReset() +{ + TSummarySnapshot value; + for (auto& shard : Shards_) { + auto guard = Guard(shard.Lock); + value += shard.Value; + shard.Value = {}; + } + return value; +} + +template class TPerCpuSummary; +template class TPerCpuSummary; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/per_cpu_sensor_impl.h b/yt/yt/library/profiling/per_cpu_sensor_impl.h new file mode 100644 index 00000000000..16e2ab1ec3e --- /dev/null +++ b/yt/yt/library/profiling/per_cpu_sensor_impl.h @@ -0,0 +1,126 @@ +#pragma once + +#include 
"impl.h" +#include "summary.h" + +#include + +#include + +namespace NYT::NProfiling { + +//////////////////////////////////////////////////////////////////////////////// + +// The sharded sensors below index a fixed per-processor array by TTscp::Get() and update +// the shard with a plain atomic. They are the default hot implementation and the fallback +// everywhere the rseq fast path is not enabled; the rseq-backed counterparts live in +// yt/yt/library/profiling/rseq and are selected at construction time (see registry.cpp). + +class TPerCpuCounter + : public ICounter +{ +public: + void Increment(i64 delta) override; + + i64 GetValue() override; + +private: + struct alignas(CacheLineSize) TShard + { + std::atomic Value = 0; + }; + + std::array Shards_; +}; + +static_assert(sizeof(TPerCpuCounter) == 64 + 64 * 64); + +//////////////////////////////////////////////////////////////////////////////// + +class TPerCpuTimeCounter + : public ITimeCounter +{ +public: + void Add(TDuration delta) override; + + TDuration GetValue() override; + +private: + struct alignas(CacheLineSize) TShard + { + std::atomic Value = 0; + }; + + std::array Shards_; +}; + +static_assert(sizeof(TPerCpuTimeCounter) == 64 + 64 * 64); + +//////////////////////////////////////////////////////////////////////////////// + +class TPerCpuGauge + : public IGauge +{ +public: + void Update(double value) override; + + double GetValue() override; + +private: + struct TWrite + { + double Value; + TCpuInstant Timestamp; + + __int128 Pack(); + static TWrite Unpack(__int128 i); + }; + + struct alignas(CacheLineSize) TShard + { +#ifdef __clang__ + std::atomic<__int128> Value = 0; +#else + TSpinLock Lock; + TWrite Value; +#endif + }; + +#ifdef __clang__ + static_assert(std::atomic::is_always_lock_free); +#endif + + std::array Shards_; +}; + +// One cache line larger than the counters: IGauge inherits TRefCounted virtually. 
+static_assert(sizeof(TPerCpuGauge) == 2 * 64 + 64 * 64); + +//////////////////////////////////////////////////////////////////////////////// + +template +class TPerCpuSummary + : public ISummaryBase +{ +public: + void Record(T value) override; + + TSummarySnapshot GetSummary() override; + TSummarySnapshot GetSummaryAndReset() override; + +private: + struct alignas(CacheLineSize) TShard + { + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock); + TSummarySnapshot Value; + }; + + std::array Shards_; +}; + +DEFINE_REFCOUNTED_TYPE(TPerCpuSummary) +DEFINE_REFCOUNTED_TYPE(TPerCpuSummary) + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/percpu.cpp b/yt/yt/library/profiling/percpu.cpp deleted file mode 100644 index 788f22bbea1..00000000000 --- a/yt/yt/library/profiling/percpu.cpp +++ /dev/null @@ -1,134 +0,0 @@ -#include "percpu.h" - -#include "summary.h" - -#include - -namespace NYT::NProfiling { - -//////////////////////////////////////////////////////////////////////////////// - -void TPerCpuCounter::Increment(i64 delta) -{ - auto tscp = TTscp::Get(); - Shards_[tscp.ProcessorId].Value.fetch_add(delta, std::memory_order::relaxed); -} - -i64 TPerCpuCounter::GetValue() -{ - i64 total = 0; - for (const auto& shard : Shards_) { - total += shard.Value.load(); - } - return total; -} - -//////////////////////////////////////////////////////////////////////////////// - -void TPerCpuTimeCounter::Add(TDuration delta) -{ - auto tscp = TTscp::Get(); - Shards_[tscp.ProcessorId].Value.fetch_add(delta.GetValue(), std::memory_order::relaxed); -} - -TDuration TPerCpuTimeCounter::GetValue() -{ - TDuration total = TDuration::Zero(); - for (const auto& shard : Shards_) { - total += TDuration::FromValue(shard.Value.load()); - } - return total; -} - -//////////////////////////////////////////////////////////////////////////////// - -__int128 TPerCpuGauge::TWrite::Pack() -{ - 
static_assert(sizeof(TWrite) == 16); - - __int128 i; - memcpy(&i, this, 16); - return i; -} - -TPerCpuGauge::TWrite TPerCpuGauge::TWrite::Unpack(__int128 i) -{ - TWrite w; - memcpy(&w, &i, 16); - return w; -} - -void TPerCpuGauge::Update(double value) -{ - auto tscp = TTscp::Get(); - - TWrite write{value, tscp.Instant}; -#ifdef __clang__ - Shards_[tscp.ProcessorId].Value.store(write.Pack(), std::memory_order::relaxed); -#else - auto guard = Guard(Shards_[tscp.ProcessorId].Lock); - Shards_[tscp.ProcessorId].Value = write; -#endif -} - -double TPerCpuGauge::GetValue() -{ - double lastValue = 0.0; - TCpuInstant maxTimestamp = 0; - - for (const auto& shard : Shards_) { -#ifdef __clang__ - auto write = TWrite::Unpack(shard.Value.load()); -#else - auto guard = Guard(shard.Lock); - auto write = shard.Value; -#endif - - if (write.Timestamp > maxTimestamp) { - maxTimestamp = write.Timestamp; - lastValue = write.Value; - } - } - - return lastValue; -} - -//////////////////////////////////////////////////////////////////////////////// - -template -void TPerCpuSummary::Record(T value) -{ - auto tscp = TTscp::Get(); - auto guard = Guard(Shards_[tscp.ProcessorId].Lock); - Shards_[tscp.ProcessorId].Value.Record(value); -} - -template -TSummarySnapshot TPerCpuSummary::GetSummary() -{ - TSummarySnapshot value; - for (const auto& shard : Shards_) { - auto guard = Guard(shard.Lock); - value += shard.Value; - } - return value; -} - -template -TSummarySnapshot TPerCpuSummary::GetSummaryAndReset() -{ - TSummarySnapshot value; - for (auto& shard : Shards_) { - auto guard = Guard(shard.Lock); - value += shard.Value; - shard.Value = {}; - } - return value; -} - -template class TPerCpuSummary; -template class TPerCpuSummary; - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/percpu.h b/yt/yt/library/profiling/percpu.h deleted file mode 100644 index 770f92c5ffa..00000000000 --- 
a/yt/yt/library/profiling/percpu.h +++ /dev/null @@ -1,120 +0,0 @@ -#pragma once - -#include "impl.h" -#include "summary.h" - -#include - -#include - -namespace NYT::NProfiling { - -//////////////////////////////////////////////////////////////////////////////// - -class TPerCpuCounter - : public ICounter -{ -public: - void Increment(i64 delta) override; - - i64 GetValue() override; - -private: - struct alignas(CacheLineSize) TShard - { - std::atomic Value = 0; - }; - - std::array Shards_; -}; - -static_assert(sizeof(TPerCpuCounter) == 64 + 64 * 64); - -//////////////////////////////////////////////////////////////////////////////// - -class TPerCpuTimeCounter - : public ITimeCounter -{ -public: - void Add(TDuration delta) override; - - TDuration GetValue() override; - -private: - struct alignas(CacheLineSize) TShard - { - std::atomic Value = 0; - }; - - std::array Shards_; -}; - -static_assert(sizeof(TPerCpuCounter) == 64 + 64 * 64); - -//////////////////////////////////////////////////////////////////////////////// - -class TPerCpuGauge - : public IGauge -{ -public: - void Update(double value) override; - - double GetValue() override; - -private: - struct TWrite - { - double Value; - TCpuInstant Timestamp; - - __int128 Pack(); - static TWrite Unpack(__int128 i); - }; - - struct alignas(CacheLineSize) TShard - { -#ifdef __clang__ - std::atomic<__int128> Value = 0; -#else - TSpinLock Lock; - TWrite Value; -#endif - }; - -#ifdef __clang__ - static_assert(std::atomic::is_always_lock_free); -#endif - - std::array Shards_; -}; - -static_assert(sizeof(TPerCpuCounter) == 64 + 64 * 64); - -//////////////////////////////////////////////////////////////////////////////// - -template -class TPerCpuSummary - : public ISummaryBase -{ -public: - void Record(T value) override; - - TSummarySnapshot GetSummary() override; - TSummarySnapshot GetSummaryAndReset() override; - -private: - struct alignas(CacheLineSize) TShard - { - YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock); - 
TSummarySnapshot Value; - }; - - std::array Shards_; -}; - -DEFINE_REFCOUNTED_TYPE(TPerCpuSummary) -DEFINE_REFCOUNTED_TYPE(TPerCpuSummary) - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/rseq_sensor_impl-inl.h b/yt/yt/library/profiling/rseq_sensor_impl-inl.h new file mode 100644 index 00000000000..167d35e0946 --- /dev/null +++ b/yt/yt/library/profiling/rseq_sensor_impl-inl.h @@ -0,0 +1,48 @@ +#ifndef RSEQ_SENSOR_IMPL_INL_H_ +#error "Direct inclusion of this file is not allowed, include rseq_sensor_impl.h" +// For the sake of sane code completion. +#include "rseq_sensor_impl.h" +#endif + +#include + +#include + +namespace NYT::NProfiling { + +//////////////////////////////////////////////////////////////////////////////// + +template +TIntrusivePtr TRseqCounterBase::Create() +{ + int shardCount = NRseq::GetCpuCount(); + return NewWithExtraSpace(shardCount * sizeof(TShard), shardCount); +} + +template +TRseqCounterBase::TRseqCounterBase(int shardCount) + : ShardCount_(shardCount) + , Shards_(static_cast(this->GetExtraSpacePtr())) +{ + for (int index = 0; index < ShardCount_; ++index) { + new (&Shards_[index]) TShard{}; + } +} + +template +TRseqCounterBase::~TRseqCounterBase() +{ + for (int index = 0; index < ShardCount_; ++index) { + Shards_[index].~TShard(); + } +} + +template +TMutableRange TRseqCounterBase::GetShards() +{ + return TMutableRange(Shards_, ShardCount_); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/rseq_sensor_impl.cpp b/yt/yt/library/profiling/rseq_sensor_impl.cpp new file mode 100644 index 00000000000..264a086cafe --- /dev/null +++ b/yt/yt/library/profiling/rseq_sensor_impl.cpp @@ -0,0 +1,88 @@ +#include "rseq_sensor_impl.h" + +#include + +#include + +#include + +namespace NYT::NProfiling { + 
+//////////////////////////////////////////////////////////////////////////////// + +TRseqCounter::TRseqCounter(int shardCount) + : TRseqCounterBase(shardCount) +{ } + +void TRseqCounter::Increment(i64 delta) +{ + NRseq::AddPerCpu(GetShards().Begin(), &TRseqCounterShard::Value, delta); +} + +i64 TRseqCounter::GetValue() +{ + i64 total = 0; + for (const auto& shard : GetShards()) { + total += __atomic_load_n(&shard.Value, __ATOMIC_RELAXED); + } + return total; +} + +//////////////////////////////////////////////////////////////////////////////// + +TRseqTimeCounter::TRseqTimeCounter(int shardCount) + : TRseqCounterBase(shardCount) +{ } + +void TRseqTimeCounter::Add(TDuration delta) +{ + NRseq::AddPerCpu(GetShards().Begin(), &TRseqTimeCounterShard::Value, delta.GetValue()); +} + +TDuration TRseqTimeCounter::GetValue() +{ + auto total = TDuration::Zero(); + for (const auto& shard : GetShards()) { + total += TDuration::FromValue(__atomic_load_n(&shard.Value, __ATOMIC_RELAXED)); + } + return total; +} + +//////////////////////////////////////////////////////////////////////////////// + +TRseqGauge::TRseqGauge(int shardCount) + : TRseqCounterBase(shardCount) +{ } + +void TRseqGauge::Update(double value) +{ + NRseq::StorePerCpu(GetShards().Begin(), &TRseqGaugeShard::Value, TRseqGaugeWrite{value, GetApproximateCpuInstant()}); +} + +double TRseqGauge::GetValue() +{ + double lastValue = 0.0; + TCpuInstant maxTimestamp = 0; + + for (const auto& shard : GetShards()) { + // Read the two 8-byte halves with relaxed atomic loads, matching how the fallback + // path stores them (StorePerCpu uses paired relaxed stores there). The rseq fast + // path commits all 16 bytes at once, so a reader may still observe a torn + // (value, timestamp) -- the documented last-writer-wins tradeoff a gauge tolerates. 
+ const auto* halves = reinterpret_cast(&shard.Value); + auto write = __builtin_bit_cast(TRseqGaugeWrite, std::array{ + __atomic_load_n(&halves[0], __ATOMIC_RELAXED), + __atomic_load_n(&halves[1], __ATOMIC_RELAXED), + }); + if (write.Timestamp > maxTimestamp) { + maxTimestamp = write.Timestamp; + lastValue = write.Value; + } + } + + return lastValue; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/rseq_sensor_impl.h b/yt/yt/library/profiling/rseq_sensor_impl.h new file mode 100644 index 00000000000..4e1a9ef95a3 --- /dev/null +++ b/yt/yt/library/profiling/rseq_sensor_impl.h @@ -0,0 +1,138 @@ +#pragma once + +#include "impl.h" + +#include + +#include +#include +#include + +namespace NYT::NProfiling { + +//////////////////////////////////////////////////////////////////////////////// + +// rseq-backed hot sensors. Each update commits to the calling CPU's shard lock-free via an +// rseq critical section (see library/cpp/yt/rseq) -- no atomic and no lock on the fast +// path. The per-CPU shard array (one slot per NRseq::GetCpuCount()) is allocated inline +// with the object via NewWithExtraSpace, so a sensor costs a single allocation; the object +// is cache-line aligned so the trailing shards are too. rseq is Linux-only, so this header +// is included (and rseq_sensor_impl.cpp is built) only on Linux; the sensors are +// constructed (via Create) only when the rseq fast path is enabled (see +// TSolomonRegistry::IsRseqEnabled), with the atomic sharded counterparts in +// per_cpu_sensor_impl.h used otherwise. 
+ +//////////////////////////////////////////////////////////////////////////////// + +struct alignas(CacheLineSize) TRseqCounterShard +{ + i64 Value = 0; +}; + +static_assert(sizeof(TRseqCounterShard) == CacheLineSize); + +struct alignas(CacheLineSize) TRseqTimeCounterShard +{ + TDuration::TValue Value = 0; +}; + +static_assert(sizeof(TRseqTimeCounterShard) == CacheLineSize); + +struct TRseqGaugeWrite +{ + double Value = 0; + TCpuInstant Timestamp = 0; +}; + +static_assert(sizeof(TRseqGaugeWrite) == 16); + +struct alignas(CacheLineSize) TRseqGaugeShard +{ + TRseqGaugeWrite Value; +}; + +static_assert(sizeof(TRseqGaugeShard) == CacheLineSize); + +//////////////////////////////////////////////////////////////////////////////// + +// Owns the inline per-CPU shard array shared by the rseq sensors below. The shards live in +// the extra space allocated alongside the object (NewWithExtraSpace); the base inherits +// TWithExtraSpace, locates the shards itself and exposes them through GetShards(). +// CRTP on the derived sensor so the inherited Create() allocates the leaf type. 
+template +class TRseqCounterBase + : public TWithExtraSpace +{ +public: + static TIntrusivePtr Create(); + +protected: + const int ShardCount_; + + explicit TRseqCounterBase(int shardCount); + ~TRseqCounterBase(); + + TMutableRange GetShards(); + +private: + TShard* const Shards_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class alignas(CacheLineSize) TRseqCounter + : public ICounter + , public TRseqCounterBase +{ +public: + void Increment(i64 delta) override; + + i64 GetValue() override; + +private: + explicit TRseqCounter(int shardCount); + + DECLARE_NEW_FRIEND() +}; + +//////////////////////////////////////////////////////////////////////////////// + +class alignas(CacheLineSize) TRseqTimeCounter + : public ITimeCounter + , public TRseqCounterBase +{ +public: + void Add(TDuration delta) override; + + TDuration GetValue() override; + +private: + explicit TRseqTimeCounter(int shardCount); + + DECLARE_NEW_FRIEND() +}; + +//////////////////////////////////////////////////////////////////////////////// + +class alignas(CacheLineSize) TRseqGauge + : public IGauge + , public TRseqCounterBase +{ +public: + void Update(double value) override; + + double GetValue() override; + +private: + explicit TRseqGauge(int shardCount); + + DECLARE_NEW_FRIEND() +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NProfiling + +#define RSEQ_SENSOR_IMPL_INL_H_ +#include "rseq_sensor_impl-inl.h" +#undef RSEQ_SENSOR_IMPL_INL_H_ diff --git a/yt/yt/library/profiling/sensor_impl.cpp b/yt/yt/library/profiling/sensor_impl.cpp deleted file mode 100644 index 7aade0c63bc..00000000000 --- a/yt/yt/library/profiling/sensor_impl.cpp +++ /dev/null @@ -1,268 +0,0 @@ -#include "sensor_impl.h" - -#include - -#include - -namespace NYT::NProfiling { - -//////////////////////////////////////////////////////////////////////////////// - -DEFINE_REFCOUNTED_TYPE(TSimpleCounter) 
-DEFINE_REFCOUNTED_TYPE(TSimpleGauge) - -//////////////////////////////////////////////////////////////////////////////// - -void TSimpleGauge::Update(double value) -{ - Value_.store(value, std::memory_order::relaxed); -} - -double TSimpleGauge::GetValue() -{ - return Value_.load(std::memory_order::relaxed); -} - -void TSimpleGauge::Record(double /*value*/) -{ - YT_UNIMPLEMENTED(); -} - -TSummarySnapshot TSimpleGauge::GetSummary() -{ - TSummarySnapshot summary; - summary.Record(GetValue()); - return summary; -} - -TSummarySnapshot TSimpleGauge::GetSummaryAndReset() -{ - return GetSummary(); -} - -//////////////////////////////////////////////////////////////////////////////// - -void TSimpleTimeGauge::Update(TDuration value) -{ - Value_.store(value.GetValue(), std::memory_order::relaxed); -} - -TDuration TSimpleTimeGauge::GetValue() -{ - return TDuration::FromValue(Value_.load(std::memory_order::relaxed)); -} - -void TSimpleTimeGauge::Record(TDuration /*value*/) -{ - YT_UNIMPLEMENTED(); -} - -TSummarySnapshot TSimpleTimeGauge::GetSummary() -{ - TSummarySnapshot summary; - summary.Record(GetValue()); - return summary; -} - -TSummarySnapshot TSimpleTimeGauge::GetSummaryAndReset() -{ - return GetSummary(); -} - -//////////////////////////////////////////////////////////////////////////////// - -void TSimpleCounter::Increment(i64 delta) -{ - YT_VERIFY(delta >= 0); - Value_.fetch_add(delta, std::memory_order::relaxed); -} - -i64 TSimpleCounter::GetValue() -{ - return Value_.load(std::memory_order::relaxed); -} - -//////////////////////////////////////////////////////////////////////////////// - -void TSimpleTimeCounter::Add(TDuration delta) -{ - Value_.fetch_add(delta.GetValue(), std::memory_order::relaxed); -} - -TDuration TSimpleTimeCounter::GetValue() -{ - return TDuration::FromValue(Value_.load(std::memory_order::relaxed)); -} - -//////////////////////////////////////////////////////////////////////////////// - -template -void TSimpleSummary::Record(T value) -{ - 
auto guard = Guard(Lock_); - Value_.Record(value); -} - -template -TSummarySnapshot TSimpleSummary::GetSummary() -{ - auto guard = Guard(Lock_); - return Value_; -} - -template -TSummarySnapshot TSimpleSummary::GetSummaryAndReset() -{ - auto guard = Guard(Lock_); - - auto value = Value_; - Value_ = {}; - return value; -} - -template class TSimpleSummary; -template class TSimpleSummary; - -//////////////////////////////////////////////////////////////////////////////// - -constexpr int MaxBinCount = 65; - -static auto GenericBucketBounds() -{ - std::array result; - - for (int index = 0; index <= 6; ++index) { - result[index] = 1ull << index; - } - - for (int index = 7; index < 10; ++index) { - result[index] = 1000ull >> (10 - index); - } - - for (int index = 10; index < MaxBinCount; ++index) { - result[index] = 1000 * result[index - 10]; - } - - return result; -} - -std::vector GenerateGenericBucketBounds() -{ - // BEWARE: Changing this variable will lead to master snapshots becoming invalid. 
- constexpr int MaxHistogramBinCount = 38; - std::vector result; - - auto genericBounds = GenericBucketBounds(); - result.reserve(MaxHistogramBinCount); - - for (int i = 0; i < std::ssize(genericBounds) && i < MaxHistogramBinCount; ++i) { - result.push_back(genericBounds[i]); - } - - return result; -} - -static std::vector BucketBounds(const TSensorOptions& options) -{ - if (!options.HistogramBounds.empty()) { - return options.HistogramBounds; - } - - std::vector bounds; - if (!options.TimeHistogramBounds.empty()) { - for (auto b : options.TimeHistogramBounds) { - bounds.push_back(b.SecondsFloat()); - } - return bounds; - } - - if (options.HistogramMin.Zero() && options.HistogramMax.Zero()) { - return {}; - } - - for (auto bound : GenericBucketBounds()) { - auto duration = TDuration::FromValue(bound); - if (options.HistogramMin <= duration && duration <= options.HistogramMax) { - bounds.push_back(duration.SecondsFloat()); - } - } - - return bounds; -} - -THistogram::THistogram(const TSensorOptions& options) - : Bounds_(BucketBounds(options)) - , Buckets_(Bounds_.size() + 1) -{ - YT_VERIFY(!Bounds_.empty()); - YT_VERIFY(Bounds_.size() <= MaxBinCount); -} - -void THistogram::Record(TDuration value) -{ - auto it = std::lower_bound(Bounds_.begin(), Bounds_.end(), value.SecondsFloat()); - Buckets_[it - Bounds_.begin()].fetch_add(1, std::memory_order::relaxed); -} - -void THistogram::Add(double value, int count) noexcept -{ - auto it = std::lower_bound(Bounds_.begin(), Bounds_.end(), value); - Buckets_[it - Bounds_.begin()].fetch_add(count, std::memory_order::relaxed); -} - -void THistogram::Remove(double value, int count) noexcept -{ - auto it = std::lower_bound(Bounds_.begin(), Bounds_.end(), value); - Buckets_[it - Bounds_.begin()].fetch_sub(count, std::memory_order::relaxed); -} - -void THistogram::Reset() noexcept -{ - for (int i = 0; i < std::ssize(Buckets_); ++i) { - Buckets_[i] = 0; - } -} - -THistogramSnapshot THistogram::GetSnapshot(bool reset) -{ - 
THistogramSnapshot snapshot; - snapshot.Bounds = Bounds_; - snapshot.Values.resize(Buckets_.size()); - - for (int i = 0; i < std::ssize(Buckets_); ++i) { - if (!reset) { - snapshot.Values[i] = Buckets_[i].load(std::memory_order::relaxed); - } else { - snapshot.Values[i] = Buckets_[i].exchange(0, std::memory_order::relaxed); - } - } - - return snapshot; -} - -void THistogram::LoadSnapshot(THistogramSnapshot snapshot) -{ - for (int i = 0; i < std::ssize(snapshot.Bounds); ++i) { - YT_VERIFY(Bounds_[i] == snapshot.Bounds[i]); - } - - YT_VERIFY(std::ssize(Buckets_) == std::ssize(Bounds_) + 1); - - for (int i = 0; i < std::ssize(snapshot.Values); ++i) { - Buckets_[i].store(snapshot.Values[i], std::memory_order::relaxed); - } -} - -TSummarySnapshot THistogram::GetSummary() -{ - YT_UNIMPLEMENTED(); -} - -TSummarySnapshot THistogram::GetSummaryAndReset() -{ - YT_UNIMPLEMENTED(); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/sensor_impl.h b/yt/yt/library/profiling/sensor_impl.h deleted file mode 100644 index 0343bf7b3d5..00000000000 --- a/yt/yt/library/profiling/sensor_impl.h +++ /dev/null @@ -1,135 +0,0 @@ -#pragma once - -#include -#include -#include - -#include - -namespace NYT::NProfiling { - -//////////////////////////////////////////////////////////////////////////////// - -class TSimpleGauge - : public IGauge - , public ISummary -{ -public: - void Update(double value) override; - - double GetValue() override; - - void Record(double value) override; - - TSummarySnapshot GetSummary() override; - TSummarySnapshot GetSummaryAndReset() override; - -private: - std::atomic Value_ = 0.0; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TSimpleTimeGauge - : public ITimeGauge - , public ITimer -{ -public: - void Update(TDuration value) override; - - TDuration GetValue() override; - - void Record(TDuration value) 
override; - - TSummarySnapshot GetSummary() override; - TSummarySnapshot GetSummaryAndReset() override; - -private: - std::atomic Value_ = 0.0; -}; - -//////////////////////////////////////////////////////////////////////////////// - -class TSimpleCounter - : public ICounter -{ -public: - void Increment(i64 delta) override; - - i64 GetValue() override; - -private: - std::atomic Value_ = 0; -}; - -static_assert(sizeof(TSimpleCounter) == 32); - -//////////////////////////////////////////////////////////////////////////////// - -class TSimpleTimeCounter - : public ITimeCounter -{ -public: - void Add(TDuration delta) override; - - TDuration GetValue() override; - -private: - std::atomic Value_ = 0; -}; - -static_assert(sizeof(TSimpleTimeCounter) == 32); - -//////////////////////////////////////////////////////////////////////////////// - -template -class TSimpleSummary - : public ISummaryBase -{ -public: - void Record(T value) override; - - TSummarySnapshot GetSummary() override; - TSummarySnapshot GetSummaryAndReset() override; - -private: - YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_); - TSummarySnapshot Value_; -}; - -//////////////////////////////////////////////////////////////////////////////// - -DECLARE_REFCOUNTED_CLASS(THistogram) - -std::vector GenerateGenericBucketBounds(); - -class THistogram - : public ISummaryBase - , public IHistogram -{ -public: - explicit THistogram(const TSensorOptions& options); - - void Record(TDuration value) override; - - void Add(double value, int count) noexcept override; - void Remove(double value, int count) noexcept override; - void Reset() noexcept override; - - THistogramSnapshot GetSnapshot(bool reset) override; - void LoadSnapshot(THistogramSnapshot snapshot) override; - -private: - std::vector Bounds_; - std::vector> Buckets_; - - // These two methods are not used. 
- TSummarySnapshot GetSummary() override; - TSummarySnapshot GetSummaryAndReset() override; -}; - -DEFINE_REFCOUNTED_TYPE(THistogram) - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/simple_sensor_impl.cpp b/yt/yt/library/profiling/simple_sensor_impl.cpp new file mode 100644 index 00000000000..d10fe817218 --- /dev/null +++ b/yt/yt/library/profiling/simple_sensor_impl.cpp @@ -0,0 +1,268 @@ +#include "simple_sensor_impl.h" + +#include + +#include + +namespace NYT::NProfiling { + +//////////////////////////////////////////////////////////////////////////////// + +DEFINE_REFCOUNTED_TYPE(TSimpleCounter) +DEFINE_REFCOUNTED_TYPE(TSimpleGauge) + +//////////////////////////////////////////////////////////////////////////////// + +void TSimpleGauge::Update(double value) +{ + Value_.store(value, std::memory_order::relaxed); +} + +double TSimpleGauge::GetValue() +{ + return Value_.load(std::memory_order::relaxed); +} + +void TSimpleGauge::Record(double /*value*/) +{ + YT_UNIMPLEMENTED(); +} + +TSummarySnapshot TSimpleGauge::GetSummary() +{ + TSummarySnapshot summary; + summary.Record(GetValue()); + return summary; +} + +TSummarySnapshot TSimpleGauge::GetSummaryAndReset() +{ + return GetSummary(); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TSimpleTimeGauge::Update(TDuration value) +{ + Value_.store(value.GetValue(), std::memory_order::relaxed); +} + +TDuration TSimpleTimeGauge::GetValue() +{ + return TDuration::FromValue(Value_.load(std::memory_order::relaxed)); +} + +void TSimpleTimeGauge::Record(TDuration /*value*/) +{ + YT_UNIMPLEMENTED(); +} + +TSummarySnapshot TSimpleTimeGauge::GetSummary() +{ + TSummarySnapshot summary; + summary.Record(GetValue()); + return summary; +} + +TSummarySnapshot TSimpleTimeGauge::GetSummaryAndReset() +{ + return GetSummary(); +} + 
+//////////////////////////////////////////////////////////////////////////////// + +void TSimpleCounter::Increment(i64 delta) +{ + YT_VERIFY(delta >= 0); + Value_.fetch_add(delta, std::memory_order::relaxed); +} + +i64 TSimpleCounter::GetValue() +{ + return Value_.load(std::memory_order::relaxed); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TSimpleTimeCounter::Add(TDuration delta) +{ + Value_.fetch_add(delta.GetValue(), std::memory_order::relaxed); +} + +TDuration TSimpleTimeCounter::GetValue() +{ + return TDuration::FromValue(Value_.load(std::memory_order::relaxed)); +} + +//////////////////////////////////////////////////////////////////////////////// + +template +void TSimpleSummary::Record(T value) +{ + auto guard = Guard(Lock_); + Value_.Record(value); +} + +template +TSummarySnapshot TSimpleSummary::GetSummary() +{ + auto guard = Guard(Lock_); + return Value_; +} + +template +TSummarySnapshot TSimpleSummary::GetSummaryAndReset() +{ + auto guard = Guard(Lock_); + + auto value = Value_; + Value_ = {}; + return value; +} + +template class TSimpleSummary; +template class TSimpleSummary; + +//////////////////////////////////////////////////////////////////////////////// + +constexpr int MaxBinCount = 65; + +static auto GenericBucketBounds() +{ + std::array result; + + for (int index = 0; index <= 6; ++index) { + result[index] = 1ull << index; + } + + for (int index = 7; index < 10; ++index) { + result[index] = 1000ull >> (10 - index); + } + + for (int index = 10; index < MaxBinCount; ++index) { + result[index] = 1000 * result[index - 10]; + } + + return result; +} + +std::vector GenerateGenericBucketBounds() +{ + // BEWARE: Changing this variable will lead to master snapshots becoming invalid. 
+ constexpr int MaxHistogramBinCount = 38; + std::vector result; + + auto genericBounds = GenericBucketBounds(); + result.reserve(MaxHistogramBinCount); + + for (int i = 0; i < std::ssize(genericBounds) && i < MaxHistogramBinCount; ++i) { + result.push_back(genericBounds[i]); + } + + return result; +} + +static std::vector BucketBounds(const TSensorOptions& options) +{ + if (!options.HistogramBounds.empty()) { + return options.HistogramBounds; + } + + std::vector bounds; + if (!options.TimeHistogramBounds.empty()) { + for (auto b : options.TimeHistogramBounds) { + bounds.push_back(b.SecondsFloat()); + } + return bounds; + } + + if (options.HistogramMin.Zero() && options.HistogramMax.Zero()) { + return {}; + } + + for (auto bound : GenericBucketBounds()) { + auto duration = TDuration::FromValue(bound); + if (options.HistogramMin <= duration && duration <= options.HistogramMax) { + bounds.push_back(duration.SecondsFloat()); + } + } + + return bounds; +} + +THistogram::THistogram(const TSensorOptions& options) + : Bounds_(BucketBounds(options)) + , Buckets_(Bounds_.size() + 1) +{ + YT_VERIFY(!Bounds_.empty()); + YT_VERIFY(Bounds_.size() <= MaxBinCount); +} + +void THistogram::Record(TDuration value) +{ + auto it = std::lower_bound(Bounds_.begin(), Bounds_.end(), value.SecondsFloat()); + Buckets_[it - Bounds_.begin()].fetch_add(1, std::memory_order::relaxed); +} + +void THistogram::Add(double value, int count) noexcept +{ + auto it = std::lower_bound(Bounds_.begin(), Bounds_.end(), value); + Buckets_[it - Bounds_.begin()].fetch_add(count, std::memory_order::relaxed); +} + +void THistogram::Remove(double value, int count) noexcept +{ + auto it = std::lower_bound(Bounds_.begin(), Bounds_.end(), value); + Buckets_[it - Bounds_.begin()].fetch_sub(count, std::memory_order::relaxed); +} + +void THistogram::Reset() noexcept +{ + for (int i = 0; i < std::ssize(Buckets_); ++i) { + Buckets_[i] = 0; + } +} + +THistogramSnapshot THistogram::GetSnapshot(bool reset) +{ + 
THistogramSnapshot snapshot; + snapshot.Bounds = Bounds_; + snapshot.Values.resize(Buckets_.size()); + + for (int i = 0; i < std::ssize(Buckets_); ++i) { + if (!reset) { + snapshot.Values[i] = Buckets_[i].load(std::memory_order::relaxed); + } else { + snapshot.Values[i] = Buckets_[i].exchange(0, std::memory_order::relaxed); + } + } + + return snapshot; +} + +void THistogram::LoadSnapshot(THistogramSnapshot snapshot) +{ + for (int i = 0; i < std::ssize(snapshot.Bounds); ++i) { + YT_VERIFY(Bounds_[i] == snapshot.Bounds[i]); + } + + YT_VERIFY(std::ssize(Buckets_) == std::ssize(Bounds_) + 1); + + for (int i = 0; i < std::ssize(snapshot.Values); ++i) { + Buckets_[i].store(snapshot.Values[i], std::memory_order::relaxed); + } +} + +TSummarySnapshot THistogram::GetSummary() +{ + YT_UNIMPLEMENTED(); +} + +TSummarySnapshot THistogram::GetSummaryAndReset() +{ + YT_UNIMPLEMENTED(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/simple_sensor_impl.h b/yt/yt/library/profiling/simple_sensor_impl.h new file mode 100644 index 00000000000..0343bf7b3d5 --- /dev/null +++ b/yt/yt/library/profiling/simple_sensor_impl.h @@ -0,0 +1,135 @@ +#pragma once + +#include +#include +#include + +#include + +namespace NYT::NProfiling { + +//////////////////////////////////////////////////////////////////////////////// + +class TSimpleGauge + : public IGauge + , public ISummary +{ +public: + void Update(double value) override; + + double GetValue() override; + + void Record(double value) override; + + TSummarySnapshot GetSummary() override; + TSummarySnapshot GetSummaryAndReset() override; + +private: + std::atomic Value_ = 0.0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TSimpleTimeGauge + : public ITimeGauge + , public ITimer +{ +public: + void Update(TDuration value) override; + + TDuration GetValue() override; + + void 
Record(TDuration value) override; + + TSummarySnapshot GetSummary() override; + TSummarySnapshot GetSummaryAndReset() override; + +private: + std::atomic Value_ = 0.0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TSimpleCounter + : public ICounter +{ +public: + void Increment(i64 delta) override; + + i64 GetValue() override; + +private: + std::atomic Value_ = 0; +}; + +static_assert(sizeof(TSimpleCounter) == 32); + +//////////////////////////////////////////////////////////////////////////////// + +class TSimpleTimeCounter + : public ITimeCounter +{ +public: + void Add(TDuration delta) override; + + TDuration GetValue() override; + +private: + std::atomic Value_ = 0; +}; + +static_assert(sizeof(TSimpleTimeCounter) == 32); + +//////////////////////////////////////////////////////////////////////////////// + +template +class TSimpleSummary + : public ISummaryBase +{ +public: + void Record(T value) override; + + TSummarySnapshot GetSummary() override; + TSummarySnapshot GetSummaryAndReset() override; + +private: + YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_); + TSummarySnapshot Value_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +DECLARE_REFCOUNTED_CLASS(THistogram) + +std::vector GenerateGenericBucketBounds(); + +class THistogram + : public ISummaryBase + , public IHistogram +{ +public: + explicit THistogram(const TSensorOptions& options); + + void Record(TDuration value) override; + + void Add(double value, int count) noexcept override; + void Remove(double value, int count) noexcept override; + void Reset() noexcept override; + + THistogramSnapshot GetSnapshot(bool reset) override; + void LoadSnapshot(THistogramSnapshot snapshot) override; + +private: + std::vector Bounds_; + std::vector> Buckets_; + + // These two methods are not used. 
+ TSummarySnapshot GetSummary() override; + TSummarySnapshot GetSummaryAndReset() override; +}; + +DEFINE_REFCOUNTED_TYPE(THistogram) + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/solomon/config.cpp b/yt/yt/library/profiling/solomon/config.cpp index 830abd19c48..b8f0fad637c 100644 --- a/yt/yt/library/profiling/solomon/config.cpp +++ b/yt/yt/library/profiling/solomon/config.cpp @@ -6,6 +6,32 @@ namespace NYT::NProfiling { //////////////////////////////////////////////////////////////////////////////// +TSolomonRegistryConfigPtr TSolomonRegistryConfig::ApplyDynamic( + const TSolomonRegistryDynamicConfigPtr& dynamicConfig) const +{ + auto result = New(); + result->EnableRseq = EnableRseq; + NYTree::UpdateYsonStructField(result->EnableRseq, dynamicConfig->EnableRseq); + result->Postprocess(); + return result; +} + +void TSolomonRegistryConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("enable_rseq", &TThis::EnableRseq) + .Default(false); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TSolomonRegistryDynamicConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("enable_rseq", &TThis::EnableRseq) + .Default(); +} + +//////////////////////////////////////////////////////////////////////////////// + void TShardConfig::Register(TRegistrar registrar) { registrar.Parameter("filter", &TThis::Filter) diff --git a/yt/yt/library/profiling/solomon/config.h b/yt/yt/library/profiling/solomon/config.h index a84459a95cf..e28aa099ede 100644 --- a/yt/yt/library/profiling/solomon/config.h +++ b/yt/yt/library/profiling/solomon/config.h @@ -26,6 +26,39 @@ DEFINE_REFCOUNTED_TYPE(TShardConfig) //////////////////////////////////////////////////////////////////////////////// +struct TSolomonRegistryConfig + : public NYTree::TYsonStruct +{ + //! Enables the lock-free rseq fast path for hot sensors. 
Off by default; even when on, a + //! hot sensor uses it only if a runtime safety probe passes (see + //! TSolomonRegistry::IsRseqEnabled). The choice is read when a hot sensor is constructed. + bool EnableRseq; + + TSolomonRegistryConfigPtr ApplyDynamic(const TSolomonRegistryDynamicConfigPtr& dynamicConfig) const; + + REGISTER_YSON_STRUCT(TSolomonRegistryConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TSolomonRegistryConfig) + +//////////////////////////////////////////////////////////////////////////////// + +struct TSolomonRegistryDynamicConfig + : public NYTree::TYsonStruct +{ + std::optional EnableRseq; + + REGISTER_YSON_STRUCT(TSolomonRegistryDynamicConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TSolomonRegistryDynamicConfig) + +//////////////////////////////////////////////////////////////////////////////// + struct TSolomonExporterConfig : public NYTree::TYsonStruct { diff --git a/yt/yt/library/profiling/solomon/configure_solomon_registry.cpp b/yt/yt/library/profiling/solomon/configure_solomon_registry.cpp new file mode 100644 index 00000000000..f1090cc987b --- /dev/null +++ b/yt/yt/library/profiling/solomon/configure_solomon_registry.cpp @@ -0,0 +1,41 @@ +#include "registry.h" +#include "config.h" + +#include + +namespace NYT::NProfiling { + +using namespace NYTree; + +//////////////////////////////////////////////////////////////////////////////// + +void SetupSingletonConfigParameter(TYsonStructParameter& parameter) +{ + parameter.DefaultNew(); +} + +void SetupSingletonConfigParameter(TYsonStructParameter& parameter) +{ + parameter.DefaultNew(); +} + +void ConfigureSingleton(const TSolomonRegistryConfigPtr& config) +{ + TSolomonRegistry::Get()->Configure(config); +} + +void ReconfigureSingleton( + const TSolomonRegistryConfigPtr& config, + const TSolomonRegistryDynamicConfigPtr& dynamicConfig) +{ + TSolomonRegistry::Get()->Configure(config->ApplyDynamic(dynamicConfig)); +} + 
+YT_DEFINE_RECONFIGURABLE_SINGLETON( + "solomon_registry", + TSolomonRegistryConfig, + TSolomonRegistryDynamicConfig); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/solomon/helpers.cpp b/yt/yt/library/profiling/solomon/helpers.cpp index 9864f2fde55..f8f1ac91a71 100644 --- a/yt/yt/library/profiling/solomon/helpers.cpp +++ b/yt/yt/library/profiling/solomon/helpers.cpp @@ -3,8 +3,12 @@ #include "producer.h" #include "sensor_set.h" -#include -#include +#include +#include + +#ifdef __linux__ +#include +#endif #include @@ -97,6 +101,13 @@ i64 GetCountersBytesAlive() usage += tracker->GetBytesAlive(GetRefCountedTypeKey()); usage += tracker->GetBytesAlive(GetRefCountedTypeKey()); + // The rseq-backed hot counters and gauge exist only on Linux (rseq_sensor_impl.h). +#ifdef __linux__ + usage += tracker->GetBytesAlive(GetRefCountedTypeKey()); + usage += tracker->GetBytesAlive(GetRefCountedTypeKey()); + usage += tracker->GetBytesAlive(GetRefCountedTypeKey()); +#endif + usage += tracker->GetBytesAlive(GetRefCountedTypeKey>()); usage += tracker->GetBytesAlive(GetRefCountedTypeKey>()); usage += tracker->GetBytesAlive(GetRefCountedTypeKey>()); diff --git a/yt/yt/library/profiling/solomon/public.h b/yt/yt/library/profiling/solomon/public.h index 2439b45bd8b..4eb3c098d01 100644 --- a/yt/yt/library/profiling/solomon/public.h +++ b/yt/yt/library/profiling/solomon/public.h @@ -12,6 +12,8 @@ namespace NYT::NProfiling { DECLARE_REFCOUNTED_STRUCT(TShardConfig) DECLARE_REFCOUNTED_STRUCT(TSolomonExporterConfig) DECLARE_REFCOUNTED_STRUCT(TSolomonProxyConfig) +DECLARE_REFCOUNTED_STRUCT(TSolomonRegistryConfig) +DECLARE_REFCOUNTED_STRUCT(TSolomonRegistryDynamicConfig) DECLARE_REFCOUNTED_CLASS(TSolomonExporter) YT_DECLARE_TYPEID(TSolomonExporter) diff --git a/yt/yt/library/profiling/solomon/registry.cpp b/yt/yt/library/profiling/solomon/registry.cpp index 5f44950e99b..c1509818721 100644 
--- a/yt/yt/library/profiling/solomon/registry.cpp +++ b/yt/yt/library/profiling/solomon/registry.cpp @@ -1,7 +1,15 @@ #include "registry.h" -#include -#include +#include "config.h" + +#include +#include + +#ifdef __linux__ +#include + +#include +#endif #include @@ -15,20 +23,77 @@ using namespace NYTree; //////////////////////////////////////////////////////////////////////////////// +// The rseq-backed hot sensors are Linux-only (rseq_sensor_impl.h); off Linux the +// alias is a dummy (never instantiated -- the rseq branch is compiled out below). +#ifdef __linux__ +using TPortableRseqCounter = TRseqCounter; +using TPortableRseqTimeCounter = TRseqTimeCounter; +using TPortableRseqGauge = TRseqGauge; +#else +using TPortableRseqCounter = TPerCpuCounter; +using TPortableRseqTimeCounter = TPerCpuTimeCounter; +using TPortableRseqGauge = TPerCpuGauge; +#endif + +//////////////////////////////////////////////////////////////////////////////// + TSolomonRegistry::TSolomonRegistry() = default; +void TSolomonRegistry::Configure(const TSolomonRegistryConfigPtr& config) +{ + // Use the rseq fast path only when it is both requested and supported in this process: a + // runtime probe confirms the cached thread-pointer offset is stable across threads -- not + // so when our __rseq_abi lands in a dlopen'd module's dynamically allocated TLS. 
+ bool enabled = config->EnableRseq; +#ifdef __linux__ + enabled = enabled && NRseq::IsPerCpuFastPathSupported(); +#else + enabled = false; +#endif + RseqEnabled_.store(enabled, std::memory_order::relaxed); +} + +bool TSolomonRegistry::IsRseqEnabled() const +{ + return RseqEnabled_.load(std::memory_order::relaxed); +} + template TIntrusivePtr SelectImpl(bool hot, const TFn& fn) { - if (!hot) { - auto counter = New(); - fn(counter); - return counter; - } else { - auto counter = New(); - fn(counter); - return counter; - } + auto sensor = [&] () -> TIntrusivePtr { + if (!hot) { + return New(); + } + return New(); + }(); + fn(sensor); + return sensor; +} + +// Like SelectImpl, but the hot sensor is chosen at construction time between the rseq fast +// path (when enabled and available) and the atomic sharded fallback. The choice is made +// once per sensor here, so the hot Increment/Update path carries no dispatch. +template +TIntrusivePtr SelectFastImpl(bool hot, bool rseqEnabled, const TFn& fn) +{ + auto sensor = [&] () -> TIntrusivePtr { + if (!hot) { + return New(); + } +#ifdef __linux__ + if (rseqEnabled) { + // The rseq sensors fold their per-CPU shard array into the object allocation + // (NewWithExtraSpace), so they are built via a factory rather than New. 
+ return TRseq::Create(); + } +#else + Y_UNUSED(rseqEnabled); +#endif + return New(); + }(); + fn(sensor); + return sensor; } ICounterPtr TSolomonRegistry::RegisterCounter( @@ -36,7 +101,7 @@ ICounterPtr TSolomonRegistry::RegisterCounter( const TTagSet& tags, TSensorOptions options) { - return SelectImpl(options.Hot, [&, this] (const auto& counter) { + return SelectFastImpl(options.Hot, IsRseqEnabled(), [&, this] (const auto& counter) { DoRegister([this, name = std::string(name), tags, options = std::move(options), counter] { auto reader = [ptr = counter.Get()] { return ptr->GetValue(); @@ -53,8 +118,9 @@ ITimeCounterPtr TSolomonRegistry::RegisterTimeCounter( const TTagSet& tags, TSensorOptions options) { - return SelectImpl( + return SelectFastImpl( options.Hot, + IsRseqEnabled(), [&, this] (const auto& counter) { DoRegister([this, name = std::string(name), tags, options = std::move(options), counter] { auto set = FindSet(name, options); @@ -68,7 +134,7 @@ IGaugePtr TSolomonRegistry::RegisterGauge( const TTagSet& tags, TSensorOptions options) { - return SelectImpl(options.Hot, [&, this] (const auto& gauge) { + return SelectFastImpl(options.Hot, IsRseqEnabled(), [&, this] (const auto& gauge) { if (options.DisableDefault) { gauge->Update(std::numeric_limits::quiet_NaN()); } diff --git a/yt/yt/library/profiling/solomon/registry.h b/yt/yt/library/profiling/solomon/registry.h index 61ebd16c965..3a927829e1b 100644 --- a/yt/yt/library/profiling/solomon/registry.h +++ b/yt/yt/library/profiling/solomon/registry.h @@ -122,6 +122,12 @@ public: static TSolomonRegistryPtr Get(); + void Configure(const TSolomonRegistryConfigPtr& config); + + //! Returns whether hot sensors use the rseq fast path: rseq is enabled in config and the + //! runtime safety probe passed. 
+ bool IsRseqEnabled() const; + void Disable(); void SetDynamicTags(std::vector dynamicTags); @@ -167,6 +173,7 @@ private: std::vector DynamicTags_; std::atomic Disabled_ = false; + std::atomic RseqEnabled_ = false; TMpscStack> RegistrationQueue_; template diff --git a/yt/yt/library/profiling/solomon/sensor_set.h b/yt/yt/library/profiling/solomon/sensor_set.h index 2f04230b820..2e46e274c50 100644 --- a/yt/yt/library/profiling/solomon/sensor_set.h +++ b/yt/yt/library/profiling/solomon/sensor_set.h @@ -3,7 +3,7 @@ #include "cube.h" #include "tag_registry.h" -#include +#include #include #include diff --git a/yt/yt/library/profiling/solomon/ya.make b/yt/yt/library/profiling/solomon/ya.make index 5fc4700c8ca..7fecaf4dee1 100644 --- a/yt/yt/library/profiling/solomon/ya.make +++ b/yt/yt/library/profiling/solomon/ya.make @@ -4,6 +4,7 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) SRCS( config.cpp + GLOBAL configure_solomon_registry.cpp cube.cpp encoder.cpp exporter.cpp diff --git a/yt/yt/library/profiling/unittests/per_cpu_sensor_impl_ut.cpp b/yt/yt/library/profiling/unittests/per_cpu_sensor_impl_ut.cpp new file mode 100644 index 00000000000..76638293798 --- /dev/null +++ b/yt/yt/library/profiling/unittests/per_cpu_sensor_impl_ut.cpp @@ -0,0 +1,71 @@ +#include + +#include + +#include + +#include + +#include +#include +#include + +namespace NYT::NProfiling { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +// The atomic sharded sensors (per_cpu_sensor_impl.h). The rseq-backed counterparts are +// covered, with the same checks, by rseq_sensor_impl_ut.cpp. 
+ +TEST(TPerCpuSensorTest, CounterAccumulates) +{ + auto counter = New(); + counter->Increment(1'000'000'000'000LL); + counter->Increment(-7); + counter->Increment(-1'000'000'000'000LL); + EXPECT_EQ(counter->GetValue(), -7); +} + +TEST(TPerCpuSensorTest, CounterConcurrentNoLostUpdates) +{ + auto counter = New(); + + int threadCount = std::max(4, std::thread::hardware_concurrency()); + constexpr i64 IterationCount = 1'000'000; + + std::vector threads; + for (int index = 0; index < threadCount; ++index) { + threads.emplace_back([&] { + for (i64 i = 0; i < IterationCount; ++i) { + counter->Increment(1); + } + }); + } + for (auto& thread : threads) { + thread.join(); + } + + EXPECT_EQ(counter->GetValue(), static_cast(threadCount) * IterationCount); +} + +TEST(TPerCpuSensorTest, TimeCounterAccumulates) +{ + auto counter = New(); + counter->Add(TDuration::MicroSeconds(10)); + counter->Add(TDuration::MicroSeconds(5)); + EXPECT_EQ(counter->GetValue(), TDuration::MicroSeconds(15)); +} + +TEST(TPerCpuSensorTest, GaugePublishesLastValue) +{ + auto gauge = New(); + gauge->Update(1.0); + gauge->Update(42.0); + EXPECT_EQ(gauge->GetValue(), 42.0); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/unittests/rseq_sensor_impl_ut.cpp b/yt/yt/library/profiling/unittests/rseq_sensor_impl_ut.cpp new file mode 100644 index 00000000000..1a9535c30e9 --- /dev/null +++ b/yt/yt/library/profiling/unittests/rseq_sensor_impl_ut.cpp @@ -0,0 +1,73 @@ +#include + +#include + +#include + +#include + +#include +#include +#include + +namespace NYT::NProfiling { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +// The rseq-backed sharded sensors (rseq_sensor_impl.h, Linux-only). Mirrors +// per_cpu_sensor_impl_ut.cpp so the two interchangeable hot implementations stay in sync. 
+ +TEST(TRseqSensorTest, CounterAccumulates) +{ + auto counter = TRseqCounter::Create(); + counter->Increment(1'000'000'000'000LL); + counter->Increment(-7); + counter->Increment(-1'000'000'000'000LL); + EXPECT_EQ(counter->GetValue(), -7); +} + +// The core guarantee: across many threads (which the scheduler migrates between CPUs, +// exercising rseq aborts/restarts), not a single increment is lost. +TEST(TRseqSensorTest, CounterConcurrentNoLostUpdates) +{ + auto counter = TRseqCounter::Create(); + + int threadCount = std::max(4, std::thread::hardware_concurrency()); + constexpr i64 IterationCount = 1'000'000; + + std::vector threads; + for (int index = 0; index < threadCount; ++index) { + threads.emplace_back([&] { + for (i64 i = 0; i < IterationCount; ++i) { + counter->Increment(1); + } + }); + } + for (auto& thread : threads) { + thread.join(); + } + + EXPECT_EQ(counter->GetValue(), static_cast(threadCount) * IterationCount); +} + +TEST(TRseqSensorTest, TimeCounterAccumulates) +{ + auto counter = TRseqTimeCounter::Create(); + counter->Add(TDuration::MicroSeconds(10)); + counter->Add(TDuration::MicroSeconds(5)); + EXPECT_EQ(counter->GetValue(), TDuration::MicroSeconds(15)); +} + +TEST(TRseqSensorTest, GaugePublishesLastValue) +{ + auto gauge = TRseqGauge::Create(); + gauge->Update(1.0); + gauge->Update(42.0); + EXPECT_EQ(gauge->GetValue(), 42.0); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/unittests/simple_sensor_impl_ut.cpp b/yt/yt/library/profiling/unittests/simple_sensor_impl_ut.cpp new file mode 100644 index 00000000000..3574d3cd251 --- /dev/null +++ b/yt/yt/library/profiling/unittests/simple_sensor_impl_ut.cpp @@ -0,0 +1,72 @@ +#include + +#include + +#include + +#include + +#include +#include +#include + +namespace NYT::NProfiling { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + 
+// The simple, single-atomic sensors (simple_sensor_impl.h) used for non-hot sensors and as the +// off-Linux hot fallback. + +// TSimpleCounter is monotonic (it verifies delta >= 0), unlike the sharded counters. +TEST(TSimpleSensorTest, CounterAccumulates) +{ + auto counter = New(); + counter->Increment(3); + counter->Increment(1'000'000'000'000LL); + counter->Increment(7); + EXPECT_EQ(counter->GetValue(), 1'000'000'000'010LL); +} + +TEST(TSimpleSensorTest, CounterConcurrentNoLostUpdates) +{ + auto counter = New(); + + int threadCount = std::max(4, std::thread::hardware_concurrency()); + constexpr i64 IterationCount = 1'000'000; + + std::vector threads; + for (int index = 0; index < threadCount; ++index) { + threads.emplace_back([&] { + for (i64 i = 0; i < IterationCount; ++i) { + counter->Increment(1); + } + }); + } + for (auto& thread : threads) { + thread.join(); + } + + EXPECT_EQ(counter->GetValue(), static_cast(threadCount) * IterationCount); +} + +TEST(TSimpleSensorTest, TimeCounterAccumulates) +{ + auto counter = New(); + counter->Add(TDuration::MicroSeconds(10)); + counter->Add(TDuration::MicroSeconds(5)); + EXPECT_EQ(counter->GetValue(), TDuration::MicroSeconds(15)); +} + +TEST(TSimpleSensorTest, GaugePublishesLastValue) +{ + auto gauge = New(); + gauge->Update(1.0); + gauge->Update(42.0); + EXPECT_EQ(gauge->GetValue(), 42.0); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT::NProfiling diff --git a/yt/yt/library/profiling/unittests/ya.make b/yt/yt/library/profiling/unittests/ya.make index c58e443b23e..8d88ca13985 100644 --- a/yt/yt/library/profiling/unittests/ya.make +++ b/yt/yt/library/profiling/unittests/ya.make @@ -7,6 +7,8 @@ IF (OS_LINUX) ENDIF() SRCS( + per_cpu_sensor_impl_ut.cpp + simple_sensor_impl_ut.cpp sensor_ut.cpp sensor_service_ut.cpp name_conflicts_ut.cpp @@ -19,6 +21,13 @@ SRCS( exporter_ut.cpp ) +# The rseq-backed sensors exist only on Linux (see 
rseq_sensor_impl.h). +IF (OS_LINUX) + SRCS( + rseq_sensor_impl_ut.cpp + ) +ENDIF() + INCLUDE(${ARCADIA_ROOT}/yt/opensource.inc) PEERDIR( diff --git a/yt/yt/library/profiling/ya.make b/yt/yt/library/profiling/ya.make index 732a9fb3cd2..6adec566ac2 100644 --- a/yt/yt/library/profiling/ya.make +++ b/yt/yt/library/profiling/ya.make @@ -5,10 +5,10 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) SRCS( histogram_snapshot.cpp impl.cpp - percpu.cpp + per_cpu_sensor_impl.cpp producer.cpp sensor.cpp - sensor_impl.cpp + simple_sensor_impl.cpp tag.cpp testing.cpp ) @@ -22,6 +22,16 @@ PEERDIR( library/cpp/yt/threading ) +# The rseq-backed sensors use library/cpp/yt/rseq, which is Linux-only. +IF (OS_LINUX) + SRCS( + rseq_sensor_impl.cpp + ) + PEERDIR( + library/cpp/yt/rseq + ) +ENDIF() + END() RECURSE( -- cgit v1.3 From fa6e382a237dc16728baa8a4b3616ba902683120 Mon Sep 17 00:00:00 2001 From: robot-ratatosk Date: Thu, 25 Jun 2026 02:47:10 +0300 Subject: New version of the tld SKIP_CHECK SKIP_REVIEW commit_hash:1f26f20bb31b7321a5f24277d10d6bcd814ea442 --- library/cpp/tld/tlds-alpha-by-domain.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'library/cpp') diff --git a/library/cpp/tld/tlds-alpha-by-domain.txt b/library/cpp/tld/tlds-alpha-by-domain.txt index 91c361781e3..d14bb7480c9 100644 --- a/library/cpp/tld/tlds-alpha-by-domain.txt +++ b/library/cpp/tld/tlds-alpha-by-domain.txt @@ -1,4 +1,4 @@ -# Version 2026061800, Last Updated Thu Jun 18 07:07:01 2026 UTC +# Version 2026062302, Last Updated Wed Jun 24 07:07:01 2026 UTC AAA AARP ABB -- cgit v1.3 From ba2fd21ade4a0de6a6f530a7c33eafc8198dc1c7 Mon Sep 17 00:00:00 2001 From: uranix Date: Thu, 25 Jun 2026 21:43:30 +0300 Subject: Revert commit rXXXXXX, Add zstd codec on hamster commit_hash:8ba59395f9d9a69e5d6ebc82ab5ed0765f142f39 --- library/cpp/http/io/compression.cpp | 2 -- library/cpp/http/io/ya.make | 1 - 2 files changed, 3 deletions(-) (limited to 'library/cpp') diff --git a/library/cpp/http/io/compression.cpp 
b/library/cpp/http/io/compression.cpp index c68a4e976b6..8fa1f62ae69 100644 --- a/library/cpp/http/io/compression.cpp +++ b/library/cpp/http/io/compression.cpp @@ -7,7 +7,6 @@ #include #include #include -#include #include #include @@ -23,7 +22,6 @@ TCompressionCodecFactory::TCompressionCodecFactory() { Add("gzip", gzip, [](auto s) { return MakeHolder(s, ZLib::GZip); }); Add("deflate", gzip, [](auto s) { return MakeHolder(s, ZLib::ZLib); }); Add("br", [](auto s) { return MakeHolder(s); }, [](auto s) { return MakeHolder(s, 4); }); - Add("zstd", [](auto s) { return MakeHolder(s); }, [](auto s) { return MakeHolder(s, 3); }); Add("x-gzip", gzip, [](auto s) { return MakeHolder(s, ZLib::GZip); }); Add("x-deflate", gzip, [](auto s) { return MakeHolder(s, ZLib::ZLib); }); diff --git a/library/cpp/http/io/ya.make b/library/cpp/http/io/ya.make index 7ca76f44d92..873e2f34e34 100644 --- a/library/cpp/http/io/ya.make +++ b/library/cpp/http/io/ya.make @@ -13,7 +13,6 @@ PEERDIR( library/cpp/streams/brotli library/cpp/streams/bzip2 library/cpp/streams/lzma - library/cpp/streams/zstd ) SRCS( -- cgit v1.3