summaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorbabenko <[email protected]>2026-06-19 14:27:43 +0300
committerbabenko <[email protected]>2026-06-19 15:12:00 +0300
commit89c0e29c8f9ba29ecdc736fefda87286482ac213 (patch)
tree1adfbcd839240d8d0155771c6b775fa00a3e5f32 /library/cpp
parent824b32f6aab5c67b2d39288b1d229eb257f248f0 (diff)
Add lock-free per-CPU primitives to library/cpp/yt/rseq
Introduce AddPerCpu and StorePerCpu over an rseq-sharded per-CPU array. On the x86-64 Linux fast path the update is committed by a hand-rolled rseq critical section (non-atomic, migration-safe): addq for the 8-byte accumulate, movq / movdqu for the 8- or 16-byte store. The kernel restarts the sequence on preemption or migration, and only one thread runs on a CPU at a time, so no atomic or lock is needed. Off the fast path (other arches, no kernel rseq) the operation falls back to an atomic on the slot indexed by sched_getcpu(). A naturally-aligned 8-byte store is single-copy atomic on x86-64, so it is never observed torn; the 16-byte store may be, which is acceptable for a last-writer-wins gauge. commit_hash:6250f6e9e35cf3895ebafe0b534ec12cca50b03b
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/yt/rseq/per_cpu-inl.h340
-rw-r--r--library/cpp/yt/rseq/per_cpu.cpp112
-rw-r--r--library/cpp/yt/rseq/per_cpu.h76
-rw-r--r--library/cpp/yt/rseq/rseq.cpp7
-rw-r--r--library/cpp/yt/rseq/unittests/per_cpu_ut.cpp229
-rw-r--r--library/cpp/yt/rseq/unittests/ya.make14
-rw-r--r--library/cpp/yt/rseq/ya.make6
7 files changed, 781 insertions, 3 deletions
diff --git a/library/cpp/yt/rseq/per_cpu-inl.h b/library/cpp/yt/rseq/per_cpu-inl.h
new file mode 100644
index 00000000000..42d8dd299bc
--- /dev/null
+++ b/library/cpp/yt/rseq/per_cpu-inl.h
@@ -0,0 +1,340 @@
+#ifndef PER_CPU_INL_H_
+#error "Direct inclusion of this file is not allowed, include per_cpu.h"
+// For the sake of sane code completion.
+#include "per_cpu.h"
+#endif
+
+#include <library/cpp/yt/assert/assert.h>
+
+#include <util/system/compiler.h>
+
+#if defined(__x86_64__)
+#include <emmintrin.h> // __m128i -- used only by the x86 rseq fast-path 16-byte store
+#endif
+
+#include <array>
+#include <string_view>
+
+// The full rseq fast path (a per-CPU non-atomic add committed by an rseq critical
+// section) is implemented for x86-64 Linux only. Everywhere else AddPerCpu uses the
+// atomic fallback.
+#if defined(__x86_64__)
+ #include "rseq.h"
+ #define YT_RSEQ_PERCPU_FAST
+#endif
+
+namespace NYT::NRseq {
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Returns a CPU id in [0, GetCpuCount()) for the atomic fallback path.
+int GetFallbackCpuId();
+
+//! Parses a kernel CPU range list (e.g. "0-3,8-11") and returns the highest CPU id plus one,
+//! or -1 if the list contains no id. Exposed for testing; #GetCpuCount feeds it the
+//! /sys/devices/system/cpu/possible bitmap.
+int ParsePossibleCpuCount(std::string_view list);
+
+//! Cached #GetCpuCount value used by the fast path's bounds check (see per_cpu.cpp).
+/*!
+ * The fast path indexes the slot array by the raw rseq cpu_id and bounds-checks it against
+ * this value with a single unsigned compare (which also rejects a negative, unregistered
+ * cpu_id); an out-of-range cpu_id takes the clamped atomic fallback instead. Set by the
+ * first call to #GetCpuCount; defaults to 0 so every update falls back until the size is
+ * known -- callers must therefore size the array via #GetCpuCount before any update.
+ */
+extern constinit int CpuCount;
+
+//! Returns a pointer to slot |index| of a per-CPU array -- |base| advanced by
+//! |index * stride| bytes. The size_t cast keeps the byte offset from overflowing.
+template <class T>
+Y_FORCE_INLINE T* GetSlot(T* base, size_t stride, int index)
+{
+ using TByte = std::conditional_t<std::is_const_v<T>, const char, char>;
+ return reinterpret_cast<T*>(
+ reinterpret_cast<TByte*>(base) + static_cast<size_t>(index) * stride);
+}
+
+#ifdef YT_RSEQ_PERCPU_FAST
+
+//! *reinterpret_cast<i64*>(slot) += value, committed by an rseq critical section
+//! validated against |cpuId|.
+/*!
+ * Returns true on commit, false if the kernel aborted the sequence (caller retries).
+ */
+Y_FORCE_INLINE bool RseqCommitAdd8(void* slot, i64 value, int cpuId)
+{
+ // The kernel-managed struct rseq: cpu_id_start@0, cpu_id@4, rseq_cs@8. CpuIdFieldOffset
+ // is TP -> cpu_id, so the area starts 4 bytes earlier.
+ char* area = static_cast<char*>(__builtin_thread_pointer()) + CpuIdFieldOffset - 4;
+ __asm__ __volatile__ goto(
+ ".pushsection __rseq_cs, \"aw\"\n\t"
+ ".balign 32\n\t"
+ "1:\n\t"
+ ".long 0, 0\n\t" // version, flags
+ ".quad 2f, (3f - 2f), 4f\n\t" // start_ip, post_commit_offset, abort_ip
+ ".popsection\n\t"
+ "leaq 1b(%%rip), %%rax\n\t"
+ "movq %%rax, 8(%[area])\n\t" // area->rseq_cs = &descriptor
+ "2:\n\t" // start_ip
+ "cmpl %[cpuId], 4(%[area])\n\t" // if (area->cpu_id != cpuId) abort
+ "jnz 4f\n\t"
+ "addq %[value], (%[slot])\n\t" // commit: *slot += value (non-atomic)
+ "3:\n\t" // post_commit_ip
+ ".pushsection __rseq_failure, \"ax\"\n\t"
+ ".byte 0x0f, 0xb9, 0x3d\n\t" // ud1: makes the signature a valid instruction
+ ".long 0x53053053\n\t" // rseq signature (precedes abort_ip)
+ "4:\n\t" // abort_ip
+ "jmp %l[abort]\n\t"
+ ".popsection\n\t"
+ :
+ : [area] "r" (area), [slot] "r" (slot), [value] "r" (value), [cpuId] "r" (cpuId)
+ : "rax", "memory"
+ : abort);
+ return true;
+abort:
+ return false;
+}
+
+//! *reinterpret_cast<i64*>(slot) = value (non-atomic movq), committed by an rseq critical
+//! section validated against |cpuId|.
+/*!
+ * Returns true on commit, false if the kernel aborted the sequence (caller retries).
+ */
+Y_FORCE_INLINE bool RseqCommitStore8(void* slot, i64 value, int cpuId)
+{
+ char* area = static_cast<char*>(__builtin_thread_pointer()) + CpuIdFieldOffset - 4;
+ __asm__ __volatile__ goto(
+ ".pushsection __rseq_cs, \"aw\"\n\t"
+ ".balign 32\n\t"
+ "1:\n\t"
+ ".long 0, 0\n\t"
+ ".quad 2f, (3f - 2f), 4f\n\t"
+ ".popsection\n\t"
+ "leaq 1b(%%rip), %%rax\n\t"
+ "movq %%rax, 8(%[area])\n\t"
+ "2:\n\t"
+ "cmpl %[cpuId], 4(%[area])\n\t"
+ "jnz 4f\n\t"
+ "movq %[value], (%[slot])\n\t" // commit: 8-byte store (non-atomic)
+ "3:\n\t"
+ ".pushsection __rseq_failure, \"ax\"\n\t"
+ ".byte 0x0f, 0xb9, 0x3d\n\t"
+ ".long 0x53053053\n\t"
+ "4:\n\t"
+ "jmp %l[abort]\n\t"
+ ".popsection\n\t"
+ :
+ : [area] "r" (area), [slot] "r" (slot), [value] "r" (value), [cpuId] "r" (cpuId)
+ : "rax", "memory"
+ : abort);
+ return true;
+abort:
+ return false;
+}
+
+//! *reinterpret_cast<__m128i*>(slot) = value (single non-atomic movdqu), committed by an rseq
+//! critical section validated against |cpuId|.
+/*!
+ * Returns true on commit, false if the kernel aborted the sequence (caller retries). A single
+ * 16-byte instruction means an abort never leaves the slot half-written; a reader on another
+ * CPU may still observe the store torn mid-flight -- acceptable for a last-writer-wins gauge.
+ * This helper is x86-only (compiled under YT_RSEQ_PERCPU_FAST), so __m128i in the signature
+ * costs nothing off x86 and keeps the value in an xmm register for the movdqu.
+ */
+Y_FORCE_INLINE bool RseqCommitStore16(void* slot, __m128i value, int cpuId)
+{
+ char* area = static_cast<char*>(__builtin_thread_pointer()) + CpuIdFieldOffset - 4;
+ __asm__ __volatile__ goto(
+ ".pushsection __rseq_cs, \"aw\"\n\t"
+ ".balign 32\n\t"
+ "1:\n\t"
+ ".long 0, 0\n\t"
+ ".quad 2f, (3f - 2f), 4f\n\t"
+ ".popsection\n\t"
+ "leaq 1b(%%rip), %%rax\n\t"
+ "movq %%rax, 8(%[area])\n\t"
+ "2:\n\t"
+ "cmpl %[cpuId], 4(%[area])\n\t"
+ "jnz 4f\n\t"
+ "movdqu %[value], (%[slot])\n\t" // commit: 16-byte store (non-atomic)
+ "3:\n\t"
+ ".pushsection __rseq_failure, \"ax\"\n\t"
+ ".byte 0x0f, 0xb9, 0x3d\n\t"
+ ".long 0x53053053\n\t"
+ "4:\n\t"
+ "jmp %l[abort]\n\t"
+ ".popsection\n\t"
+ :
+ : [area] "r" (area), [slot] "r" (slot), [value] "x" (value), [cpuId] "r" (cpuId)
+ : "rax", "memory"
+ : abort);
+ return true;
+abort:
+ return false;
+}
+
+//! Runs |commit(slot, cpuId)| for the calling CPU under rseq, retrying on abort.
+/*!
+ * |commit| runs one rseq critical section (see the RseqCommit* helpers above) and returns
+ * true on commit, false if the kernel aborted it. Returns false when the fast path is
+ * unavailable -- the rseq cpu_id is not within [0, CpuCount) -- in which case nothing is
+ * written and the caller must use the fallback. The cpu_id is read unsigned, so the single
+ * |cpuId >= CpuCount| test also rejects an unregistered thread (whose cpu_id sentinel reads
+ * as ~0u) and a cpu_id beyond the slot array (reachable only when #GetCpuCount could not
+ * read an exact bound).
+ *
+ * Must be reached only after #GetCpuCount has run (see NDetail::CpuCount); callers satisfy
+ * this by sizing the slot array with #GetCpuCount. CpuCount defaults to 0, so every update
+ * falls back until the size is known.
+ */
+template <class TCommit>
+Y_FORCE_INLINE bool RunRseqPerCpu(void* base, size_t stride, TCommit commit)
+{
+ ui32 cpuId = ReadField<ui32>(CpuIdFieldOffset);
+ ui32 cpuCount = CpuCount;
+ if (cpuId >= cpuCount) [[unlikely]] {
+ // Fresh thread not yet rseq-registered (e.g. a build without tcmalloc): register once
+ // and re-read. If it stays out of range, fall back.
+ EnsureCurrentThreadRegistered();
+ cpuId = ReadField<ui32>(CpuIdFieldOffset);
+ if (cpuId >= cpuCount) [[unlikely]] {
+ return false;
+ }
+ }
+ for (;;) {
+ void* slot = GetSlot(base, stride, static_cast<int>(cpuId));
+ if (commit(slot, cpuId)) [[likely]] {
+ return true;
+ }
+ // Aborted (migration/preemption): re-read the CPU and re-validate before reusing it,
+ // since after a migration it may name an out-of-range CPU.
+ cpuId = ReadField<ui32>(CpuIdFieldOffset);
+ if (cpuId >= cpuCount) [[unlikely]] {
+ return false;
+ }
+ }
+}
+
+#endif // YT_RSEQ_PERCPU_FAST
+
+//! Relaxed atomic load of |slot| (the read side of #AddPerCpu / #StorePerCpu).
+template <class T>
+Y_FORCE_INLINE T AtomicLoad(const T* slot)
+{
+ return __atomic_load_n(slot, __ATOMIC_RELAXED);
+}
+
+template <class T>
+Y_FORCE_INLINE void AtomicAddPerCpu(T* base, size_t stride, T value)
+{
+ auto* slot = GetSlot(base, stride, GetFallbackCpuId());
+ __atomic_fetch_add(slot, value, __ATOMIC_RELAXED);
+}
+
+//! Stores |value| into the calling CPU's slot with relaxed atomic stores: one 8-byte store,
+//! or two for a 16-byte value (the CPU is resolved once). Each 8-byte store is single-copy
+//! atomic, but the two halves of a 16-byte value may be observed split -- a torn value
+//! matching the fast path, which the last-writer-wins gauge tolerates. |T| is bit-cast to
+//! ui64 halves, so any 8- or 16-byte trivially-copyable type (incl. __m128i) works on any
+//! arch.
+template <class T>
+ requires (sizeof(T) == 8 || sizeof(T) == 16)
+Y_FORCE_INLINE void AtomicStorePerCpu(T* base, size_t stride, T value)
+{
+ auto* slot = reinterpret_cast<ui64*>(GetSlot(base, stride, GetFallbackCpuId()));
+ if constexpr (sizeof(T) == 8) {
+ __atomic_store_n(slot, __builtin_bit_cast(ui64, value), __ATOMIC_RELAXED);
+ } else {
+ auto parts = __builtin_bit_cast(std::array<ui64, 2>, value);
+ __atomic_store_n(slot, parts[0], __ATOMIC_RELAXED);
+ __atomic_store_n(slot + 1, parts[1], __ATOMIC_RELAXED);
+ }
+}
+
+// base + stride implementations behind the public pointer-to-member API below.
+
+template <class T>
+ requires std::integral<T> && (sizeof(T) == 8)
+Y_FORCE_INLINE void AddPerCpuImpl(T* base, size_t stride, T value)
+{
+#ifdef YT_RSEQ_PERCPU_FAST
+ i64 delta = static_cast<i64>(value);
+ if (RunRseqPerCpu(base, stride, [&] (void* slot, int cpuId) {
+ return RseqCommitAdd8(slot, delta, cpuId);
+ })) [[likely]]
+ {
+ return;
+ }
+#endif
+ AtomicAddPerCpu(base, stride, value);
+}
+
+template <class T>
+ requires (sizeof(T) == 8 || sizeof(T) == 16) && std::is_trivially_copyable_v<T>
+Y_FORCE_INLINE void StorePerCpuImpl(T* base, size_t stride, T value)
+{
+#ifdef YT_RSEQ_PERCPU_FAST
+ if constexpr (sizeof(T) == 16) {
+ auto packed = __builtin_bit_cast(__m128i, value);
+ if (RunRseqPerCpu(base, stride, [&] (void* slot, int cpuId) {
+ return RseqCommitStore16(slot, packed, cpuId);
+ })) [[likely]]
+ {
+ return;
+ }
+ } else {
+ auto packed = __builtin_bit_cast(i64, value);
+ if (RunRseqPerCpu(base, stride, [&] (void* slot, int cpuId) {
+ return RseqCommitStore8(slot, packed, cpuId);
+ })) [[likely]]
+ {
+ return;
+ }
+ }
+#endif
+ AtomicStorePerCpu(base, stride, value);
+}
+
+template <class T>
+ requires std::integral<T> && (sizeof(T) == 8)
+Y_FORCE_INLINE T LoadPerCpuImpl(const T* base, size_t stride, int index)
+{
+ return AtomicLoad(GetSlot(base, stride, index));
+}
+
+} // namespace NDetail
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class TShard, class TValue>
+ requires std::integral<TValue> && (sizeof(TValue) == 8)
+Y_FORCE_INLINE void AddPerCpu(TShard* shards, TValue TShard::* field, TValue delta)
+{
+ static_assert(sizeof(TShard) % 8 == 0, "Shard size must be a multiple of 8");
+ NDetail::AddPerCpuImpl(&(shards[0].*field), sizeof(TShard), delta);
+}
+
+template <class TShard, class TValue>
+ requires (sizeof(TValue) == 8 || sizeof(TValue) == 16) && std::is_trivially_copyable_v<TValue>
+Y_FORCE_INLINE void StorePerCpu(TShard* shards, TValue TShard::* field, TValue value)
+{
+ static_assert(sizeof(TShard) % 8 == 0, "Shard size must be a multiple of 8");
+ NDetail::StorePerCpuImpl(&(shards[0].*field), sizeof(TShard), value);
+}
+
+template <class TShard, class TValue>
+ requires std::integral<TValue> && (sizeof(TValue) == 8)
+Y_FORCE_INLINE TValue LoadPerCpu(const TShard* shards, TValue TShard::* field, int index)
+{
+ static_assert(sizeof(TShard) % 8 == 0, "Shard size must be a multiple of 8");
+ return NDetail::LoadPerCpuImpl(&(shards[0].*field), sizeof(TShard), index);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NRseq
diff --git a/library/cpp/yt/rseq/per_cpu.cpp b/library/cpp/yt/rseq/per_cpu.cpp
new file mode 100644
index 00000000000..90f1fa87fb3
--- /dev/null
+++ b/library/cpp/yt/rseq/per_cpu.cpp
@@ -0,0 +1,112 @@
+#include "per_cpu.h"
+
+#if defined(__linux__)
+#include <sched.h>
+#include <unistd.h>
+
+#include <cstdio>
+#endif
+
+#include <mutex>
+
+namespace NYT::NRseq {
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace NDetail {
+
+int ParsePossibleCpuCount(std::string_view list)
+{
+ // The list enumerates CPU id ranges (e.g. "0-3,8-11"); the highest id + 1 is nr_cpu_ids,
+ // the exclusive upper bound for the rseq cpu_id. This differs from _SC_NPROCESSORS_CONF
+ // (a popcount) on sparse topologies: "0-3,8-11" yields 12 here but a count of 8.
+ int maxId = -1;
+ for (size_t index = 0; index < list.size();) {
+ if (list[index] < '0' || list[index] > '9') {
+ ++index;
+ continue;
+ }
+ int value = 0;
+ while (index < list.size() && list[index] >= '0' && list[index] <= '9') {
+ value = value * 10 + (list[index] - '0');
+ ++index;
+ }
+ if (value > maxId) {
+ maxId = value;
+ }
+ }
+ return maxId >= 0 ? maxId + 1 : -1;
+}
+
+#if defined(__linux__)
+
+//! Reads /sys/devices/system/cpu/possible and returns nr_cpu_ids, or -1 if it cannot be read.
+static int TryReadPossibleCpuCount()
+{
+ auto* file = std::fopen("/sys/devices/system/cpu/possible", "re");
+ if (!file) {
+ return -1;
+ }
+ char buffer[256] = {};
+ size_t size = std::fread(buffer, 1, sizeof(buffer) - 1, file);
+ std::fclose(file);
+ return ParsePossibleCpuCount(std::string_view(buffer, size));
+}
+
+#endif
+
+// Published by GetCpuCount(); see the declaration in per_cpu-inl.h. Defaults to 0 so the
+// fast path's bounds check sends every update to the safe atomic fallback until the size is
+// known.
+constinit int CpuCount = 0;
+
+int GetFallbackCpuId()
+{
+#if defined(__linux__)
+ int cpuId = ::sched_getcpu();
+ if (cpuId < 0) {
+ return 0;
+ }
+ int cpuCount = GetCpuCount();
+ // Defensive: keep the index in range even if a CPU came online beyond the configured
+ // count. On the fallback path slots are touched atomically, so a shared slot is safe.
+ return cpuId < cpuCount ? cpuId : cpuId % cpuCount;
+#else
+ return 0;
+#endif
+}
+
+} // namespace NDetail
+
+////////////////////////////////////////////////////////////////////////////////
+
+int GetCpuCount()
+{
+ static std::once_flag OnceFlag;
+ std::call_once(OnceFlag, [] {
+ int cpuCount = 1;
+#if defined(__linux__)
+ // The fast path indexes the slot array by the raw rseq cpu_id, so size to the highest
+ // CPU id the kernel can report plus one (nr_cpu_ids), not merely the number of CPUs;
+ // the possible-CPU bitmap gives this exact bound, and then every cpu_id is in range.
+ if (int possible = NDetail::TryReadPossibleCpuCount(); possible > 0) {
+ cpuCount = possible;
+ } else {
+ // Bitmap unavailable (e.g. /sys masked in a container): _SC_NPROCESSORS_CONF is a
+ // count, not a cpu_id bound, so on a sparse topology it may be smaller than some
+ // cpu_id. The fast path's bounds check then routes those CPUs to the clamped
+ // atomic fallback -- still memory-safe, though a clamped slot may mix atomic and
+ // non-atomic writes (at worst a lost counter update on such exotic setups).
+ int configured = static_cast<int>(::sysconf(_SC_NPROCESSORS_CONF));
+ cpuCount = configured > 0 ? configured : 1;
+ }
+#endif
+ // Publish for the fast-path bounds check before any update can index the array.
+ NDetail::CpuCount = cpuCount;
+ });
+ return NDetail::CpuCount;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NRseq
diff --git a/library/cpp/yt/rseq/per_cpu.h b/library/cpp/yt/rseq/per_cpu.h
new file mode 100644
index 00000000000..66fe8d02552
--- /dev/null
+++ b/library/cpp/yt/rseq/per_cpu.h
@@ -0,0 +1,76 @@
+#pragma once
+
+#include <util/system/types.h>
+
+#include <concepts>
+#include <cstddef>
+#include <type_traits>
+
+namespace NYT::NRseq {
+
+// The library is Linux-only: rseq is a Linux kernel feature and the build enforces OS_LINUX
+// (see ya.make); off the rseq fast path the primitives fall back to plain atomics.
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Number of shards a per-CPU array must provide -- one per CPU.
+/*!
+ * Equals nr_cpu_ids (highest possible CPU id + 1), from /sys/devices/system/cpu/possible so
+ * it covers offlined and hot-pluggable CPUs. The fast path indexes by the raw rseq cpu_id, so
+ * a plain count (e.g. _SC_NPROCESSORS_CONF) would undersize the array on sparse topologies --
+ * it is only the fallback when the bitmap is unreadable. Always >= 1; cached.
+ */
+int GetCpuCount();
+
+//! Adds |delta| to the calling CPU's slot of a per-CPU array of shards, lock-free.
+/*!
+ * |shards| is an array of GetCpuCount() |TShard| slots (typically cache-line padded); |field|
+ * selects the |TValue| to update. The stride is sizeof(TShard), which must be a multiple of 8
+ * (checked at compile time) so the field stays 8-byte aligned (for a tear-free RMW).
+ *
+ * Fast path (x86-64 Linux): a non-atomic read-modify-write committed by an rseq critical
+ * section -- no atomic, no lock; safe against preemption/migration (the kernel restarts it)
+ * and other threads (one thread per CPU). Otherwise (non-x86-64 Linux, or no kernel rseq): an
+ * atomic fetch_add. A process uses one path consistently, so the two never mix on a slot
+ * (except on exotic sparse topologies; see per_cpu.cpp).
+ *
+ * WARNING (fiber TLS): the fast path reads the thread pointer, so reach #AddPerCpu only via a
+ * non-inlinable, fiber-switch-free frame (a virtual call or YT_PREVENT_TLS_CACHING; see
+ * library/cpp/yt/misc/tls.h).
+ */
+template <class TShard, class TValue>
+ requires std::integral<TValue> && (sizeof(TValue) == 8)
+void AddPerCpu(TShard* shards, TValue TShard::* field, TValue delta);
+
+//! Stores |value| (8 or 16 bytes) into the calling CPU's slot, lock-free.
+/*!
+ * |shards| / |field| as in #AddPerCpu; |TValue| is an 8- or 16-byte trivially-copyable type.
+ *
+ * Fast path (x86-64 Linux): an rseq-committed store (movq for 8 bytes, movdqu for 16);
+ * otherwise relaxed atomic store(s). An 8-byte store is single-copy atomic, never torn; a
+ * 16-byte store is not atomic on either path, so a reader on another CPU may see the halves
+ * split -- fine for a last-writer-wins gauge.
+ *
+ * WARNING (fiber TLS): same contract as #AddPerCpu.
+ */
+template <class TShard, class TValue>
+ requires (sizeof(TValue) == 8 || sizeof(TValue) == 16) && std::is_trivially_copyable_v<TValue>
+void StorePerCpu(TShard* shards, TValue TShard::* field, TValue value);
+
+//! Relaxed atomic load of slot |index| -- the reader counterpart of #AddPerCpu.
+/*!
+ * |shards| / |field| as in #AddPerCpu; reads shards[index].*field for |index| in
+ * [0, GetCpuCount()). Not tied to the calling CPU (no rseq fast path); aggregate a counter by
+ * loading every slot and summing.
+ */
+template <class TShard, class TValue>
+ requires std::integral<TValue> && (sizeof(TValue) == 8)
+TValue LoadPerCpu(const TShard* shards, TValue TShard::* field, int index);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NRseq
+
+#define PER_CPU_INL_H_
+#include "per_cpu-inl.h"
+#undef PER_CPU_INL_H_
diff --git a/library/cpp/yt/rseq/rseq.cpp b/library/cpp/yt/rseq/rseq.cpp
index 9ebcb4f45ef..c2098d16d11 100644
--- a/library/cpp/yt/rseq/rseq.cpp
+++ b/library/cpp/yt/rseq/rseq.cpp
@@ -77,9 +77,10 @@ bool OwnsRegistration = false;
bool RegisterCurrentThread()
{
// flags = 0. We pass the shared signature and the standard size so that whoever of
- // {us, tcmalloc, librseq} runs first registers __rseq_abi and the rest get EBUSY,
- // which is success for our read-only use (we never run rseq critical sections, so the
- // signature only ever matters for matching this registration call).
+ // {us, tcmalloc, librseq} runs first registers __rseq_abi and the rest get EBUSY
+ // (success). The signature must be 0x53053053: it is the value emitted before abort_ip
+ // in the per_cpu rseq critical sections (see per_cpu-inl.h), and on a kernel abort the
+ // signature must match the registered one or the kernel delivers SIGSEGV.
if (::syscall(RseqSyscallNumber, &__rseq_abi, RseqRegistrationSize, 0u, RseqSignature) == 0) {
return true;
}
diff --git a/library/cpp/yt/rseq/unittests/per_cpu_ut.cpp b/library/cpp/yt/rseq/unittests/per_cpu_ut.cpp
new file mode 100644
index 00000000000..71e0adef604
--- /dev/null
+++ b/library/cpp/yt/rseq/unittests/per_cpu_ut.cpp
@@ -0,0 +1,229 @@
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <library/cpp/yt/rseq/per_cpu.h>
+
+#include <library/cpp/yt/memory/public.h>
+
+#include <util/system/types.h>
+
+#include <atomic>
+#include <iterator>
+#include <thread>
+#include <vector>
+
+namespace NYT::NRseq {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// A per-CPU i64 accumulator built on the rseq primitives, mirroring how a profiling
+// counter would use them.
+class TPerCpuI64
+{
+public:
+ TPerCpuI64()
+ : Slots_(GetCpuCount())
+ { }
+
+ void Add(i64 value)
+ {
+ AddPerCpu(Slots_.data(), &TSlot::Value, value);
+ }
+
+ i64 GetValue() const
+ {
+ i64 total = 0;
+ for (int index = 0; index < std::ssize(Slots_); ++index) {
+ total += LoadPerCpu(Slots_.data(), &TSlot::Value, index);
+ }
+ return total;
+ }
+
+private:
+ struct alignas(CacheLineSize) TSlot
+ {
+ i64 Value = 0;
+ };
+
+ static_assert(sizeof(TSlot) == CacheLineSize);
+
+ std::vector<TSlot> Slots_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+TEST(TPerCpuRseqTest, CpuCountIsSane)
+{
+ EXPECT_GE(GetCpuCount(), 1);
+ EXPECT_LE(GetCpuCount(), 1 << 20);
+}
+
+TEST(TPerCpuRseqTest, ParsePossibleCpuCount)
+{
+ using NDetail::ParsePossibleCpuCount;
+ EXPECT_EQ(ParsePossibleCpuCount("0"), 1);
+ EXPECT_EQ(ParsePossibleCpuCount("0-3"), 4);
+ EXPECT_EQ(ParsePossibleCpuCount("0-63"), 64);
+ // Sparse mask: the bound is the highest id + 1 (12), not the CPU popcount (8).
+ EXPECT_EQ(ParsePossibleCpuCount("0-3,8-11"), 12);
+ EXPECT_EQ(ParsePossibleCpuCount("0-3\n"), 4);
+ EXPECT_EQ(ParsePossibleCpuCount(""), -1);
+ EXPECT_EQ(ParsePossibleCpuCount("\n"), -1);
+}
+
+TEST(TPerCpuRseqTest, SingleThreadAccumulates)
+{
+ TPerCpuI64 counter;
+ constexpr i64 Iterations = 1'000'000;
+ for (i64 i = 0; i < Iterations; ++i) {
+ counter.Add(1);
+ }
+ EXPECT_EQ(counter.GetValue(), Iterations);
+}
+
+TEST(TPerCpuRseqTest, SingleThreadHandlesNegativeAndLargeDeltas)
+{
+ TPerCpuI64 counter;
+ counter.Add(1'000'000'000'000LL);
+ counter.Add(-7);
+ counter.Add(-1'000'000'000'000LL);
+ EXPECT_EQ(counter.GetValue(), -7);
+}
+
+// The core correctness guarantee: across many threads (which the scheduler migrates
+// between CPUs, exercising rseq aborts/restarts), not a single increment is lost.
+TEST(TPerCpuRseqTest, ConcurrentNoLostUpdates)
+{
+ TPerCpuI64 counter;
+
+ const int threadCount = std::max<int>(4, std::thread::hardware_concurrency());
+ constexpr i64 PerThread = 2'000'000;
+
+ std::atomic<bool> start{false};
+ std::vector<std::thread> threads;
+ for (int t = 0; t < threadCount; ++t) {
+ threads.emplace_back([&] {
+ while (!start.load(std::memory_order::acquire)) {
+ }
+ for (i64 i = 0; i < PerThread; ++i) {
+ counter.Add(1);
+ }
+ });
+ }
+ start.store(true, std::memory_order::release);
+ for (auto& thread : threads) {
+ thread.join();
+ }
+
+ EXPECT_EQ(counter.GetValue(), static_cast<i64>(threadCount) * PerThread);
+}
+
+// Independent counters updated concurrently must not interfere with each other.
+TEST(TPerCpuRseqTest, IndependentCountersDoNotInterfere)
+{
+ TPerCpuI64 a;
+ TPerCpuI64 b;
+
+ const int threadCount = std::max<int>(4, std::thread::hardware_concurrency());
+ constexpr i64 PerThread = 1'000'000;
+
+ std::vector<std::thread> threads;
+ for (int t = 0; t < threadCount; ++t) {
+ threads.emplace_back([&, t] {
+ for (i64 i = 0; i < PerThread; ++i) {
+ a.Add(1);
+ if (t % 2 == 0) {
+ b.Add(2);
+ }
+ }
+ });
+ }
+ for (auto& thread : threads) {
+ thread.join();
+ }
+
+ EXPECT_EQ(a.GetValue(), static_cast<i64>(threadCount) * PerThread);
+ EXPECT_EQ(b.GetValue(), static_cast<i64>((threadCount + 1) / 2) * PerThread * 2);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TPair
+{
+ ui64 A;
+ ui64 B;
+};
+
+struct alignas(CacheLineSize) TPairSlot
+{
+ TPair Value{};
+};
+
+static_assert(sizeof(TPairSlot) == CacheLineSize);
+
+TEST(TPerCpuRseqTest, StorePerCpuPublishesValue)
+{
+ std::vector<TPairSlot> slots(GetCpuCount());
+ constexpr ui64 Last = 100'000;
+ for (ui64 i = 1; i <= Last; ++i) {
+ StorePerCpu(slots.data(), &TPairSlot::Value, TPair{i, i});
+ }
+
+ bool foundLast = false;
+ for (const auto& slot : slots) {
+ // No store ever writes mismatched halves, so any populated slot must be consistent.
+ EXPECT_EQ(slot.Value.A, slot.Value.B);
+ if (slot.Value.A == Last) {
+ foundLast = true;
+ }
+ }
+ EXPECT_TRUE(foundLast);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct alignas(CacheLineSize) TWordSlot
+{
+ ui64 Value = 0;
+};
+
+static_assert(sizeof(TWordSlot) == CacheLineSize);
+
+TEST(TPerCpuRseqTest, StorePerCpu8PublishesValue)
+{
+ std::vector<TWordSlot> slots(GetCpuCount());
+ constexpr ui64 Last = 100'000;
+ for (ui64 i = 1; i <= Last; ++i) {
+ StorePerCpu(slots.data(), &TWordSlot::Value, i);
+ }
+
+ bool foundLast = false;
+ for (const auto& slot : slots) {
+ if (slot.Value == Last) {
+ foundLast = true;
+ }
+ }
+ EXPECT_TRUE(foundLast);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// LoadPerCpu must read exactly the requested slot (verifies the base + index * stride
+// addressing), independent of the calling CPU.
+TEST(TPerCpuRseqTest, LoadPerCpuReadsRequestedSlot)
+{
+ std::vector<TWordSlot> slots(GetCpuCount());
+ for (int index = 0; index < std::ssize(slots); ++index) {
+ slots[index].Value = static_cast<ui64>(index) * 100 + 1;
+ }
+ for (int index = 0; index < std::ssize(slots); ++index) {
+ EXPECT_EQ(
+ LoadPerCpu(slots.data(), &TWordSlot::Value, index),
+ static_cast<ui64>(index) * 100 + 1);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+} // namespace NYT::NRseq
diff --git a/library/cpp/yt/rseq/unittests/ya.make b/library/cpp/yt/rseq/unittests/ya.make
new file mode 100644
index 00000000000..ed82f7781e6
--- /dev/null
+++ b/library/cpp/yt/rseq/unittests/ya.make
@@ -0,0 +1,14 @@
+GTEST()
+
+INCLUDE(${ARCADIA_ROOT}/library/cpp/yt/ya_cpp.make.inc)
+
+SRCS(
+ per_cpu_ut.cpp
+)
+
+PEERDIR(
+ library/cpp/yt/memory
+ library/cpp/yt/rseq
+)
+
+END()
diff --git a/library/cpp/yt/rseq/ya.make b/library/cpp/yt/rseq/ya.make
index f0f55e17f49..dbe6491bf89 100644
--- a/library/cpp/yt/rseq/ya.make
+++ b/library/cpp/yt/rseq/ya.make
@@ -7,11 +7,17 @@ IF (NOT OS_LINUX)
ENDIF()
SRCS(
+ per_cpu.cpp
rseq.cpp
)
PEERDIR(
+ library/cpp/yt/assert
library/cpp/yt/misc
)
END()
+
+RECURSE_FOR_TESTS(
+ unittests
+)