diff options
| author | babenko <[email protected]> | 2026-06-19 14:27:43 +0300 |
|---|---|---|
| committer | babenko <[email protected]> | 2026-06-19 15:12:00 +0300 |
| commit | 89c0e29c8f9ba29ecdc736fefda87286482ac213 (patch) | |
| tree | 1adfbcd839240d8d0155771c6b775fa00a3e5f32 /library/cpp | |
| parent | 824b32f6aab5c67b2d39288b1d229eb257f248f0 (diff) | |
Add lock-free per-CPU primitives to library/cpp/yt/rseq
Introduce AddPerCpu and StorePerCpu, plus the LoadPerCpu reader, over an rseq-sharded per-CPU array.
On the x86-64 Linux fast path the update is committed by a hand-rolled
rseq critical section (non-atomic, migration-safe): addq for the 8-byte
accumulate, movq / movdqu for the 8- or 16-byte store. The kernel
restarts the sequence on preemption or migration, and only one thread
runs on a CPU at a time, so no atomic or lock is needed. Off the fast
path (other arches, no kernel rseq, or a cpu_id outside the sized array)
the operation falls back to relaxed atomics (a fetch_add, or one or two
8-byte stores) on the slot indexed by sched_getcpu().
A naturally-aligned 8-byte store is single-copy atomic on x86-64, so it
is never observed torn; the 16-byte store is not atomic on either path,
so a reader on another CPU may observe its two halves split, which is
acceptable for a last-writer-wins gauge.
commit_hash:6250f6e9e35cf3895ebafe0b534ec12cca50b03b
Diffstat (limited to 'library/cpp')
| -rw-r--r-- | library/cpp/yt/rseq/per_cpu-inl.h | 340 | ||||
| -rw-r--r-- | library/cpp/yt/rseq/per_cpu.cpp | 112 | ||||
| -rw-r--r-- | library/cpp/yt/rseq/per_cpu.h | 76 | ||||
| -rw-r--r-- | library/cpp/yt/rseq/rseq.cpp | 7 | ||||
| -rw-r--r-- | library/cpp/yt/rseq/unittests/per_cpu_ut.cpp | 229 | ||||
| -rw-r--r-- | library/cpp/yt/rseq/unittests/ya.make | 14 | ||||
| -rw-r--r-- | library/cpp/yt/rseq/ya.make | 6 |
7 files changed, 781 insertions, 3 deletions
diff --git a/library/cpp/yt/rseq/per_cpu-inl.h b/library/cpp/yt/rseq/per_cpu-inl.h new file mode 100644 index 00000000000..42d8dd299bc --- /dev/null +++ b/library/cpp/yt/rseq/per_cpu-inl.h @@ -0,0 +1,340 @@ +#ifndef PER_CPU_INL_H_ +#error "Direct inclusion of this file is not allowed, include per_cpu.h" +// For the sake of sane code completion. +#include "per_cpu.h" +#endif + +#include <library/cpp/yt/assert/assert.h> + +#include <util/system/compiler.h> + +#if defined(__x86_64__) +#include <emmintrin.h> // __m128i -- used only by the x86 rseq fast-path 16-byte store +#endif + +#include <array> +#include <string_view> + +// The full rseq fast path (a per-CPU non-atomic add committed by an rseq critical +// section) is implemented for x86-64 Linux only. Everywhere else AddPerCpu uses the +// atomic fallback. +#if defined(__x86_64__) + #include "rseq.h" + #define YT_RSEQ_PERCPU_FAST +#endif + +namespace NYT::NRseq { + +//////////////////////////////////////////////////////////////////////////////// + +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +//! Returns a CPU id in [0, GetCpuCount()) for the atomic fallback path. +int GetFallbackCpuId(); + +//! Parses a kernel CPU range list (e.g. "0-3,8-11") and returns the highest CPU id plus one, +//! or -1 if the list contains no id. Exposed for testing; #GetCpuCount feeds it the +//! /sys/devices/system/cpu/possible bitmap. +int ParsePossibleCpuCount(std::string_view list); + +//! Cached #GetCpuCount value used by the fast path's bounds check (see per_cpu.cpp). +/*! + * The fast path indexes the slot array by the raw rseq cpu_id and bounds-checks it against + * this value with a single unsigned compare (which also rejects a negative, unregistered + * cpu_id); an out-of-range cpu_id takes the clamped atomic fallback instead. 
Set by the + * first call to #GetCpuCount; defaults to 0 so every update falls back until the size is + * known -- callers must therefore size the array via #GetCpuCount before any update. + */ +extern constinit int CpuCount; + +//! Returns a pointer to slot |index| of a per-CPU array -- |base| advanced by +//! |index * stride| bytes. The size_t cast keeps the byte offset from overflowing. +template <class T> +Y_FORCE_INLINE T* GetSlot(T* base, size_t stride, int index) +{ + using TByte = std::conditional_t<std::is_const_v<T>, const char, char>; + return reinterpret_cast<T*>( + reinterpret_cast<TByte*>(base) + static_cast<size_t>(index) * stride); +} + +#ifdef YT_RSEQ_PERCPU_FAST + +//! *reinterpret_cast<i64*>(slot) += value, committed by an rseq critical section +//! validated against |cpuId|. +/*! + * Returns true on commit, false if the kernel aborted the sequence (caller retries). + */ +Y_FORCE_INLINE bool RseqCommitAdd8(void* slot, i64 value, int cpuId) +{ + // The kernel-managed struct rseq: cpu_id_start@0, cpu_id@4, rseq_cs@8. CpuIdFieldOffset + // is TP -> cpu_id, so the area starts 4 bytes earlier. 
+ char* area = static_cast<char*>(__builtin_thread_pointer()) + CpuIdFieldOffset - 4; + __asm__ __volatile__ goto( + ".pushsection __rseq_cs, \"aw\"\n\t" + ".balign 32\n\t" + "1:\n\t" + ".long 0, 0\n\t" // version, flags + ".quad 2f, (3f - 2f), 4f\n\t" // start_ip, post_commit_offset, abort_ip + ".popsection\n\t" + "leaq 1b(%%rip), %%rax\n\t" + "movq %%rax, 8(%[area])\n\t" // area->rseq_cs = &descriptor + "2:\n\t" // start_ip + "cmpl %[cpuId], 4(%[area])\n\t" // if (area->cpu_id != cpuId) abort + "jnz 4f\n\t" + "addq %[value], (%[slot])\n\t" // commit: *slot += value (non-atomic) + "3:\n\t" // post_commit_ip + ".pushsection __rseq_failure, \"ax\"\n\t" + ".byte 0x0f, 0xb9, 0x3d\n\t" // ud1: makes the signature a valid instruction + ".long 0x53053053\n\t" // rseq signature (precedes abort_ip) + "4:\n\t" // abort_ip + "jmp %l[abort]\n\t" + ".popsection\n\t" + : + : [area] "r" (area), [slot] "r" (slot), [value] "r" (value), [cpuId] "r" (cpuId) + : "rax", "memory" + : abort); + return true; +abort: + return false; +} + +//! *reinterpret_cast<i64*>(slot) = value (non-atomic movq), committed by an rseq critical +//! section validated against |cpuId|. +/*! + * Returns true on commit, false if the kernel aborted the sequence (caller retries). 
+ */ +Y_FORCE_INLINE bool RseqCommitStore8(void* slot, i64 value, int cpuId) +{ + char* area = static_cast<char*>(__builtin_thread_pointer()) + CpuIdFieldOffset - 4; + __asm__ __volatile__ goto( + ".pushsection __rseq_cs, \"aw\"\n\t" + ".balign 32\n\t" + "1:\n\t" + ".long 0, 0\n\t" + ".quad 2f, (3f - 2f), 4f\n\t" + ".popsection\n\t" + "leaq 1b(%%rip), %%rax\n\t" + "movq %%rax, 8(%[area])\n\t" + "2:\n\t" + "cmpl %[cpuId], 4(%[area])\n\t" + "jnz 4f\n\t" + "movq %[value], (%[slot])\n\t" // commit: 8-byte store (non-atomic) + "3:\n\t" + ".pushsection __rseq_failure, \"ax\"\n\t" + ".byte 0x0f, 0xb9, 0x3d\n\t" + ".long 0x53053053\n\t" + "4:\n\t" + "jmp %l[abort]\n\t" + ".popsection\n\t" + : + : [area] "r" (area), [slot] "r" (slot), [value] "r" (value), [cpuId] "r" (cpuId) + : "rax", "memory" + : abort); + return true; +abort: + return false; +} + +//! *reinterpret_cast<__m128i*>(slot) = value (single non-atomic movdqu), committed by an rseq +//! critical section validated against |cpuId|. +/*! + * Returns true on commit, false if the kernel aborted the sequence (caller retries). A single + * 16-byte instruction means an abort never leaves the slot half-written; a reader on another + * CPU may still observe the store torn mid-flight -- acceptable for a last-writer-wins gauge. + * This helper is x86-only (compiled under YT_RSEQ_PERCPU_FAST), so __m128i in the signature + * costs nothing off x86 and keeps the value in an xmm register for the movdqu. 
+ */ +Y_FORCE_INLINE bool RseqCommitStore16(void* slot, __m128i value, int cpuId) +{ + char* area = static_cast<char*>(__builtin_thread_pointer()) + CpuIdFieldOffset - 4; + __asm__ __volatile__ goto( + ".pushsection __rseq_cs, \"aw\"\n\t" + ".balign 32\n\t" + "1:\n\t" + ".long 0, 0\n\t" + ".quad 2f, (3f - 2f), 4f\n\t" + ".popsection\n\t" + "leaq 1b(%%rip), %%rax\n\t" + "movq %%rax, 8(%[area])\n\t" + "2:\n\t" + "cmpl %[cpuId], 4(%[area])\n\t" + "jnz 4f\n\t" + "movdqu %[value], (%[slot])\n\t" // commit: 16-byte store (non-atomic) + "3:\n\t" + ".pushsection __rseq_failure, \"ax\"\n\t" + ".byte 0x0f, 0xb9, 0x3d\n\t" + ".long 0x53053053\n\t" + "4:\n\t" + "jmp %l[abort]\n\t" + ".popsection\n\t" + : + : [area] "r" (area), [slot] "r" (slot), [value] "x" (value), [cpuId] "r" (cpuId) + : "rax", "memory" + : abort); + return true; +abort: + return false; +} + +//! Runs |commit(slot, cpuId)| for the calling CPU under rseq, retrying on abort. +/*! + * |commit| runs one rseq critical section (see the RseqCommit* helpers above) and returns + * true on commit, false if the kernel aborted it. Returns false when the fast path is + * unavailable -- the rseq cpu_id is not within [0, CpuCount) -- in which case nothing is + * written and the caller must use the fallback. The cpu_id is read unsigned, so the single + * |cpuId >= CpuCount| test also rejects an unregistered thread (whose cpu_id sentinel reads + * as ~0u) and a cpu_id beyond the slot array (reachable only when #GetCpuCount could not + * read an exact bound). + * + * Must be reached only after #GetCpuCount has run (see NDetail::CpuCount); callers satisfy + * this by sizing the slot array with #GetCpuCount. CpuCount defaults to 0, so every update + * falls back until the size is known. 
+ */ +template <class TCommit> +Y_FORCE_INLINE bool RunRseqPerCpu(void* base, size_t stride, TCommit commit) +{ + ui32 cpuId = ReadField<ui32>(CpuIdFieldOffset); + ui32 cpuCount = CpuCount; + if (cpuId >= cpuCount) [[unlikely]] { + // Fresh thread not yet rseq-registered (e.g. a build without tcmalloc): register once + // and re-read. If it stays out of range, fall back. + EnsureCurrentThreadRegistered(); + cpuId = ReadField<ui32>(CpuIdFieldOffset); + if (cpuId >= cpuCount) [[unlikely]] { + return false; + } + } + for (;;) { + void* slot = GetSlot(base, stride, static_cast<int>(cpuId)); + if (commit(slot, cpuId)) [[likely]] { + return true; + } + // Aborted (migration/preemption): re-read the CPU and re-validate before reusing it, + // since after a migration it may name an out-of-range CPU. + cpuId = ReadField<ui32>(CpuIdFieldOffset); + if (cpuId >= cpuCount) [[unlikely]] { + return false; + } + } +} + +#endif // YT_RSEQ_PERCPU_FAST + +//! Relaxed atomic load of |slot| (the read side of #AddPerCpu / #StorePerCpu). +template <class T> +Y_FORCE_INLINE T AtomicLoad(const T* slot) +{ + return __atomic_load_n(slot, __ATOMIC_RELAXED); +} + +template <class T> +Y_FORCE_INLINE void AtomicAddPerCpu(T* base, size_t stride, T value) +{ + auto* slot = GetSlot(base, stride, GetFallbackCpuId()); + __atomic_fetch_add(slot, value, __ATOMIC_RELAXED); +} + +//! Stores |value| into the calling CPU's slot with relaxed atomic stores: one 8-byte store, +//! or two for a 16-byte value (the CPU is resolved once). Each 8-byte store is single-copy +//! atomic, but the two halves of a 16-byte value may be observed split -- a torn value +//! matching the fast path, which the last-writer-wins gauge tolerates. |T| is bit-cast to +//! ui64 halves, so any 8- or 16-byte trivially-copyable type (incl. __m128i) works on any +//! arch. 
+template <class T> + requires (sizeof(T) == 8 || sizeof(T) == 16) +Y_FORCE_INLINE void AtomicStorePerCpu(T* base, size_t stride, T value) +{ + auto* slot = reinterpret_cast<ui64*>(GetSlot(base, stride, GetFallbackCpuId())); + if constexpr (sizeof(T) == 8) { + __atomic_store_n(slot, __builtin_bit_cast(ui64, value), __ATOMIC_RELAXED); + } else { + auto parts = __builtin_bit_cast(std::array<ui64, 2>, value); + __atomic_store_n(slot, parts[0], __ATOMIC_RELAXED); + __atomic_store_n(slot + 1, parts[1], __ATOMIC_RELAXED); + } +} + +// base + stride implementations behind the public pointer-to-member API below. + +template <class T> + requires std::integral<T> && (sizeof(T) == 8) +Y_FORCE_INLINE void AddPerCpuImpl(T* base, size_t stride, T value) +{ +#ifdef YT_RSEQ_PERCPU_FAST + i64 delta = static_cast<i64>(value); + if (RunRseqPerCpu(base, stride, [&] (void* slot, int cpuId) { + return RseqCommitAdd8(slot, delta, cpuId); + })) [[likely]] + { + return; + } +#endif + AtomicAddPerCpu(base, stride, value); +} + +template <class T> + requires (sizeof(T) == 8 || sizeof(T) == 16) && std::is_trivially_copyable_v<T> +Y_FORCE_INLINE void StorePerCpuImpl(T* base, size_t stride, T value) +{ +#ifdef YT_RSEQ_PERCPU_FAST + if constexpr (sizeof(T) == 16) { + auto packed = __builtin_bit_cast(__m128i, value); + if (RunRseqPerCpu(base, stride, [&] (void* slot, int cpuId) { + return RseqCommitStore16(slot, packed, cpuId); + })) [[likely]] + { + return; + } + } else { + auto packed = __builtin_bit_cast(i64, value); + if (RunRseqPerCpu(base, stride, [&] (void* slot, int cpuId) { + return RseqCommitStore8(slot, packed, cpuId); + })) [[likely]] + { + return; + } + } +#endif + AtomicStorePerCpu(base, stride, value); +} + +template <class T> + requires std::integral<T> && (sizeof(T) == 8) +Y_FORCE_INLINE T LoadPerCpuImpl(const T* base, size_t stride, int index) +{ + return AtomicLoad(GetSlot(base, stride, index)); +} + +} // namespace NDetail + 
+//////////////////////////////////////////////////////////////////////////////// + +template <class TShard, class TValue> + requires std::integral<TValue> && (sizeof(TValue) == 8) +Y_FORCE_INLINE void AddPerCpu(TShard* shards, TValue TShard::* field, TValue delta) +{ + static_assert(sizeof(TShard) % 8 == 0, "Shard size must be a multiple of 8"); + NDetail::AddPerCpuImpl(&(shards[0].*field), sizeof(TShard), delta); +} + +template <class TShard, class TValue> + requires (sizeof(TValue) == 8 || sizeof(TValue) == 16) && std::is_trivially_copyable_v<TValue> +Y_FORCE_INLINE void StorePerCpu(TShard* shards, TValue TShard::* field, TValue value) +{ + static_assert(sizeof(TShard) % 8 == 0, "Shard size must be a multiple of 8"); + NDetail::StorePerCpuImpl(&(shards[0].*field), sizeof(TShard), value); +} + +template <class TShard, class TValue> + requires std::integral<TValue> && (sizeof(TValue) == 8) +Y_FORCE_INLINE TValue LoadPerCpu(const TShard* shards, TValue TShard::* field, int index) +{ + static_assert(sizeof(TShard) % 8 == 0, "Shard size must be a multiple of 8"); + return NDetail::LoadPerCpuImpl(&(shards[0].*field), sizeof(TShard), index); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRseq diff --git a/library/cpp/yt/rseq/per_cpu.cpp b/library/cpp/yt/rseq/per_cpu.cpp new file mode 100644 index 00000000000..90f1fa87fb3 --- /dev/null +++ b/library/cpp/yt/rseq/per_cpu.cpp @@ -0,0 +1,112 @@ +#include "per_cpu.h" + +#if defined(__linux__) +#include <sched.h> +#include <unistd.h> + +#include <cstdio> +#endif + +#include <mutex> + +namespace NYT::NRseq { + +//////////////////////////////////////////////////////////////////////////////// + +namespace NDetail { + +int ParsePossibleCpuCount(std::string_view list) +{ + // The list enumerates CPU id ranges (e.g. "0-3,8-11"); the highest id + 1 is nr_cpu_ids, + // the exclusive upper bound for the rseq cpu_id. 
This differs from _SC_NPROCESSORS_CONF + // (a popcount) on sparse topologies: "0-3,8-11" yields 12 here but a count of 8. + int maxId = -1; + for (size_t index = 0; index < list.size();) { + if (list[index] < '0' || list[index] > '9') { + ++index; + continue; + } + int value = 0; + while (index < list.size() && list[index] >= '0' && list[index] <= '9') { + value = value * 10 + (list[index] - '0'); + ++index; + } + if (value > maxId) { + maxId = value; + } + } + return maxId >= 0 ? maxId + 1 : -1; +} + +#if defined(__linux__) + +//! Reads /sys/devices/system/cpu/possible and returns nr_cpu_ids, or -1 if it cannot be read. +static int TryReadPossibleCpuCount() +{ + auto* file = std::fopen("/sys/devices/system/cpu/possible", "re"); + if (!file) { + return -1; + } + char buffer[256] = {}; + size_t size = std::fread(buffer, 1, sizeof(buffer) - 1, file); + std::fclose(file); + return ParsePossibleCpuCount(std::string_view(buffer, size)); +} + +#endif + +// Published by GetCpuCount(); see the declaration in per_cpu-inl.h. Defaults to 0 so the +// fast path's bounds check sends every update to the safe atomic fallback until the size is +// known. +constinit int CpuCount = 0; + +int GetFallbackCpuId() +{ +#if defined(__linux__) + int cpuId = ::sched_getcpu(); + if (cpuId < 0) { + return 0; + } + int cpuCount = GetCpuCount(); + // Defensive: keep the index in range even if a CPU came online beyond the configured + // count. On the fallback path slots are touched atomically, so a shared slot is safe. + return cpuId < cpuCount ? 
cpuId : cpuId % cpuCount; +#else + return 0; +#endif +} + +} // namespace NDetail + +//////////////////////////////////////////////////////////////////////////////// + +int GetCpuCount() +{ + static std::once_flag OnceFlag; + std::call_once(OnceFlag, [] { + int cpuCount = 1; +#if defined(__linux__) + // The fast path indexes the slot array by the raw rseq cpu_id, so size to the highest + // CPU id the kernel can report plus one (nr_cpu_ids), not merely the number of CPUs; + // the possible-CPU bitmap gives this exact bound, and then every cpu_id is in range. + if (int possible = NDetail::TryReadPossibleCpuCount(); possible > 0) { + cpuCount = possible; + } else { + // Bitmap unavailable (e.g. /sys masked in a container): _SC_NPROCESSORS_CONF is a + // count, not a cpu_id bound, so on a sparse topology it may be smaller than some + // cpu_id. The fast path's bounds check then routes those CPUs to the clamped + // atomic fallback -- still memory-safe, though a clamped slot may mix atomic and + // non-atomic writes (at worst a lost counter update on such exotic setups). + int configured = static_cast<int>(::sysconf(_SC_NPROCESSORS_CONF)); + cpuCount = configured > 0 ? configured : 1; + } +#endif + // Publish for the fast-path bounds check before any update can index the array. + NDetail::CpuCount = cpuCount; + }); + return NDetail::CpuCount; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRseq diff --git a/library/cpp/yt/rseq/per_cpu.h b/library/cpp/yt/rseq/per_cpu.h new file mode 100644 index 00000000000..66fe8d02552 --- /dev/null +++ b/library/cpp/yt/rseq/per_cpu.h @@ -0,0 +1,76 @@ +#pragma once + +#include <util/system/types.h> + +#include <concepts> +#include <cstddef> +#include <type_traits> + +namespace NYT::NRseq { + +// The library is Linux-only: rseq is a Linux kernel feature and the build enforces OS_LINUX +// (see ya.make); off the rseq fast path the primitives fall back to plain atomics. 
+ +//////////////////////////////////////////////////////////////////////////////// + +//! Number of shards a per-CPU array must provide -- one per CPU. +/*! + * Equals nr_cpu_ids (highest possible CPU id + 1), from /sys/devices/system/cpu/possible so + * it covers offlined and hot-pluggable CPUs. The fast path indexes by the raw rseq cpu_id, so + * a plain count (e.g. _SC_NPROCESSORS_CONF) would undersize the array on sparse topologies -- + * it is only the fallback when the bitmap is unreadable. Always >= 1; cached. + */ +int GetCpuCount(); + +//! Adds |delta| to the calling CPU's slot of a per-CPU array of shards, lock-free. +/*! + * |shards| is an array of GetCpuCount() |TShard| slots (typically cache-line padded); |field| + * selects the |TValue| to update. The stride is sizeof(TShard), which must be a multiple of 8 + * (checked at compile time) so the field stays 8-byte aligned (for a tear-free RMW). + * + * Fast path (x86-64 Linux): a non-atomic read-modify-write committed by an rseq critical + * section -- no atomic, no lock; safe against preemption/migration (the kernel restarts it) + * and other threads (one thread per CPU). Otherwise (non-x86-64 Linux, or no kernel rseq): an + * atomic fetch_add. A process uses one path consistently, so the two never mix on a slot + * (except on exotic sparse topologies; see per_cpu.cpp). + * + * WARNING (fiber TLS): the fast path reads the thread pointer, so reach #AddPerCpu only via a + * non-inlinable, fiber-switch-free frame (a virtual call or YT_PREVENT_TLS_CACHING; see + * library/cpp/yt/misc/tls.h). + */ +template <class TShard, class TValue> + requires std::integral<TValue> && (sizeof(TValue) == 8) +void AddPerCpu(TShard* shards, TValue TShard::* field, TValue delta); + +//! Stores |value| (8 or 16 bytes) into the calling CPU's slot, lock-free. +/*! + * |shards| / |field| as in #AddPerCpu; |TValue| is an 8- or 16-byte trivially-copyable type. 
+ * + * Fast path (x86-64 Linux): an rseq-committed store (movq for 8 bytes, movdqu for 16); + * otherwise relaxed atomic store(s). An 8-byte store is single-copy atomic, never torn; a + * 16-byte store is not atomic on either path, so a reader on another CPU may see the halves + * split -- fine for a last-writer-wins gauge. + * + * WARNING (fiber TLS): same contract as #AddPerCpu. + */ +template <class TShard, class TValue> + requires (sizeof(TValue) == 8 || sizeof(TValue) == 16) && std::is_trivially_copyable_v<TValue> +void StorePerCpu(TShard* shards, TValue TShard::* field, TValue value); + +//! Relaxed atomic load of slot |index| -- the reader counterpart of #AddPerCpu. +/*! + * |shards| / |field| as in #AddPerCpu; reads shards[index].*field for |index| in + * [0, GetCpuCount()). Not tied to the calling CPU (no rseq fast path); aggregate a counter by + * loading every slot and summing. + */ +template <class TShard, class TValue> + requires std::integral<TValue> && (sizeof(TValue) == 8) +TValue LoadPerCpu(const TShard* shards, TValue TShard::* field, int index); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NRseq + +#define PER_CPU_INL_H_ +#include "per_cpu-inl.h" +#undef PER_CPU_INL_H_ diff --git a/library/cpp/yt/rseq/rseq.cpp b/library/cpp/yt/rseq/rseq.cpp index 9ebcb4f45ef..c2098d16d11 100644 --- a/library/cpp/yt/rseq/rseq.cpp +++ b/library/cpp/yt/rseq/rseq.cpp @@ -77,9 +77,10 @@ bool OwnsRegistration = false; bool RegisterCurrentThread() { // flags = 0. We pass the shared signature and the standard size so that whoever of - // {us, tcmalloc, librseq} runs first registers __rseq_abi and the rest get EBUSY, - // which is success for our read-only use (we never run rseq critical sections, so the - // signature only ever matters for matching this registration call). + // {us, tcmalloc, librseq} runs first registers __rseq_abi and the rest get EBUSY + // (success). 
The signature must be 0x53053053: it is the value emitted before abort_ip + // in the per_cpu rseq critical sections (see per_cpu-inl.h), and on a kernel abort the + // signature must match the registered one or the kernel delivers SIGSEGV. if (::syscall(RseqSyscallNumber, &__rseq_abi, RseqRegistrationSize, 0u, RseqSignature) == 0) { return true; } diff --git a/library/cpp/yt/rseq/unittests/per_cpu_ut.cpp b/library/cpp/yt/rseq/unittests/per_cpu_ut.cpp new file mode 100644 index 00000000000..71e0adef604 --- /dev/null +++ b/library/cpp/yt/rseq/unittests/per_cpu_ut.cpp @@ -0,0 +1,229 @@ +#include <library/cpp/testing/gtest/gtest.h> + +#include <library/cpp/yt/rseq/per_cpu.h> + +#include <library/cpp/yt/memory/public.h> + +#include <util/system/types.h> + +#include <atomic> +#include <iterator> +#include <thread> +#include <vector> + +namespace NYT::NRseq { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +// A per-CPU i64 accumulator built on the rseq primitives, mirroring how a profiling +// counter would use them. 
+class TPerCpuI64 +{ +public: + TPerCpuI64() + : Slots_(GetCpuCount()) + { } + + void Add(i64 value) + { + AddPerCpu(Slots_.data(), &TSlot::Value, value); + } + + i64 GetValue() const + { + i64 total = 0; + for (int index = 0; index < std::ssize(Slots_); ++index) { + total += LoadPerCpu(Slots_.data(), &TSlot::Value, index); + } + return total; + } + +private: + struct alignas(CacheLineSize) TSlot + { + i64 Value = 0; + }; + + static_assert(sizeof(TSlot) == CacheLineSize); + + std::vector<TSlot> Slots_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TPerCpuRseqTest, CpuCountIsSane) +{ + EXPECT_GE(GetCpuCount(), 1); + EXPECT_LE(GetCpuCount(), 1 << 20); +} + +TEST(TPerCpuRseqTest, ParsePossibleCpuCount) +{ + using NDetail::ParsePossibleCpuCount; + EXPECT_EQ(ParsePossibleCpuCount("0"), 1); + EXPECT_EQ(ParsePossibleCpuCount("0-3"), 4); + EXPECT_EQ(ParsePossibleCpuCount("0-63"), 64); + // Sparse mask: the bound is the highest id + 1 (12), not the CPU popcount (8). + EXPECT_EQ(ParsePossibleCpuCount("0-3,8-11"), 12); + EXPECT_EQ(ParsePossibleCpuCount("0-3\n"), 4); + EXPECT_EQ(ParsePossibleCpuCount(""), -1); + EXPECT_EQ(ParsePossibleCpuCount("\n"), -1); +} + +TEST(TPerCpuRseqTest, SingleThreadAccumulates) +{ + TPerCpuI64 counter; + constexpr i64 Iterations = 1'000'000; + for (i64 i = 0; i < Iterations; ++i) { + counter.Add(1); + } + EXPECT_EQ(counter.GetValue(), Iterations); +} + +TEST(TPerCpuRseqTest, SingleThreadHandlesNegativeAndLargeDeltas) +{ + TPerCpuI64 counter; + counter.Add(1'000'000'000'000LL); + counter.Add(-7); + counter.Add(-1'000'000'000'000LL); + EXPECT_EQ(counter.GetValue(), -7); +} + +// The core correctness guarantee: across many threads (which the scheduler migrates +// between CPUs, exercising rseq aborts/restarts), not a single increment is lost. 
+TEST(TPerCpuRseqTest, ConcurrentNoLostUpdates) +{ + TPerCpuI64 counter; + + const int threadCount = std::max<int>(4, std::thread::hardware_concurrency()); + constexpr i64 PerThread = 2'000'000; + + std::atomic<bool> start{false}; + std::vector<std::thread> threads; + for (int t = 0; t < threadCount; ++t) { + threads.emplace_back([&] { + while (!start.load(std::memory_order::acquire)) { + } + for (i64 i = 0; i < PerThread; ++i) { + counter.Add(1); + } + }); + } + start.store(true, std::memory_order::release); + for (auto& thread : threads) { + thread.join(); + } + + EXPECT_EQ(counter.GetValue(), static_cast<i64>(threadCount) * PerThread); +} + +// Independent counters updated concurrently must not interfere with each other. +TEST(TPerCpuRseqTest, IndependentCountersDoNotInterfere) +{ + TPerCpuI64 a; + TPerCpuI64 b; + + const int threadCount = std::max<int>(4, std::thread::hardware_concurrency()); + constexpr i64 PerThread = 1'000'000; + + std::vector<std::thread> threads; + for (int t = 0; t < threadCount; ++t) { + threads.emplace_back([&, t] { + for (i64 i = 0; i < PerThread; ++i) { + a.Add(1); + if (t % 2 == 0) { + b.Add(2); + } + } + }); + } + for (auto& thread : threads) { + thread.join(); + } + + EXPECT_EQ(a.GetValue(), static_cast<i64>(threadCount) * PerThread); + EXPECT_EQ(b.GetValue(), static_cast<i64>((threadCount + 1) / 2) * PerThread * 2); +} + +//////////////////////////////////////////////////////////////////////////////// + +struct TPair +{ + ui64 A; + ui64 B; +}; + +struct alignas(CacheLineSize) TPairSlot +{ + TPair Value{}; +}; + +static_assert(sizeof(TPairSlot) == CacheLineSize); + +TEST(TPerCpuRseqTest, StorePerCpuPublishesValue) +{ + std::vector<TPairSlot> slots(GetCpuCount()); + constexpr ui64 Last = 100'000; + for (ui64 i = 1; i <= Last; ++i) { + StorePerCpu(slots.data(), &TPairSlot::Value, TPair{i, i}); + } + + bool foundLast = false; + for (const auto& slot : slots) { + // No store ever writes mismatched halves, so any populated slot must be 
consistent. + EXPECT_EQ(slot.Value.A, slot.Value.B); + if (slot.Value.A == Last) { + foundLast = true; + } + } + EXPECT_TRUE(foundLast); +} + +//////////////////////////////////////////////////////////////////////////////// + +struct alignas(CacheLineSize) TWordSlot +{ + ui64 Value = 0; +}; + +static_assert(sizeof(TWordSlot) == CacheLineSize); + +TEST(TPerCpuRseqTest, StorePerCpu8PublishesValue) +{ + std::vector<TWordSlot> slots(GetCpuCount()); + constexpr ui64 Last = 100'000; + for (ui64 i = 1; i <= Last; ++i) { + StorePerCpu(slots.data(), &TWordSlot::Value, i); + } + + bool foundLast = false; + for (const auto& slot : slots) { + if (slot.Value == Last) { + foundLast = true; + } + } + EXPECT_TRUE(foundLast); +} + +//////////////////////////////////////////////////////////////////////////////// + +// LoadPerCpu must read exactly the requested slot (verifies the base + index * stride +// addressing), independent of the calling CPU. +TEST(TPerCpuRseqTest, LoadPerCpuReadsRequestedSlot) +{ + std::vector<TWordSlot> slots(GetCpuCount()); + for (int index = 0; index < std::ssize(slots); ++index) { + slots[index].Value = static_cast<ui64>(index) * 100 + 1; + } + for (int index = 0; index < std::ssize(slots); ++index) { + EXPECT_EQ( + LoadPerCpu(slots.data(), &TWordSlot::Value, index), + static_cast<ui64>(index) * 100 + 1); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace +} // namespace NYT::NRseq diff --git a/library/cpp/yt/rseq/unittests/ya.make b/library/cpp/yt/rseq/unittests/ya.make new file mode 100644 index 00000000000..ed82f7781e6 --- /dev/null +++ b/library/cpp/yt/rseq/unittests/ya.make @@ -0,0 +1,14 @@ +GTEST() + +INCLUDE(${ARCADIA_ROOT}/library/cpp/yt/ya_cpp.make.inc) + +SRCS( + per_cpu_ut.cpp +) + +PEERDIR( + library/cpp/yt/memory + library/cpp/yt/rseq +) + +END() diff --git a/library/cpp/yt/rseq/ya.make b/library/cpp/yt/rseq/ya.make index f0f55e17f49..dbe6491bf89 100644 --- 
a/library/cpp/yt/rseq/ya.make +++ b/library/cpp/yt/rseq/ya.make @@ -7,11 +7,17 @@ IF (NOT OS_LINUX) ENDIF() SRCS( + per_cpu.cpp rseq.cpp ) PEERDIR( + library/cpp/yt/assert library/cpp/yt/misc ) END() + +RECURSE_FOR_TESTS( + unittests +) |
