diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/memory_log | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/memory_log')
-rw-r--r-- | library/cpp/actors/memory_log/memlog.cpp | 367 | ||||
-rw-r--r-- | library/cpp/actors/memory_log/memlog.h | 211 | ||||
-rw-r--r-- | library/cpp/actors/memory_log/mmap.cpp | 63 | ||||
-rw-r--r-- | library/cpp/actors/memory_log/ya.make | 19 |
4 files changed, 660 insertions, 0 deletions
diff --git a/library/cpp/actors/memory_log/memlog.cpp b/library/cpp/actors/memory_log/memlog.cpp new file mode 100644 index 0000000000..8e6b46727d --- /dev/null +++ b/library/cpp/actors/memory_log/memlog.cpp @@ -0,0 +1,367 @@ +#include "memlog.h" + +#include <library/cpp/actors/util/datetime.h> + +#include <util/system/info.h> +#include <util/system/atomic.h> +#include <util/system/align.h> + +#include <contrib/libs/linuxvdso/interface.h> + +#if (defined(_i386_) || defined(_x86_64_)) && defined(_linux_) +#define HAVE_VDSO_GETCPU 1 +#include <contrib/libs/linuxvdso/interface.h> +static int (*FastGetCpu)(unsigned* cpu, unsigned* node, void* unused); +#endif + +#if defined(_unix_) +#include <sched.h> +#elif defined(_win_) +#include <WinBase.h> +#else +#error NO IMPLEMENTATION FOR THE PLATFORM +#endif + +const char TMemoryLog::DEFAULT_LAST_MARK[16] = { + 'c', + 'b', + '7', + 'B', + '6', + '8', + 'a', + '8', + 'A', + '5', + '6', + '1', + '6', + '4', + '5', + '\n', +}; + +const char TMemoryLog::CLEAR_MARK[16] = { + ' ', + ' ', + ' ', + ' ', + ' ', + ' ', + ' ', + ' ', + ' ', + ' ', + ' ', + ' ', + ' ', + ' ', + ' ', + '\n', +}; + +unsigned TMemoryLog::GetSelfCpu() noexcept { +#if defined(_unix_) +#if HAVE_VDSO_GETCPU + unsigned cpu; + if (Y_LIKELY(FastGetCpu != nullptr)) { + auto result = FastGetCpu(&cpu, nullptr, nullptr); + Y_VERIFY(result == 0); + return cpu; + } else { + return 0; + } + +#elif defined(_x86_64_) || defined(_i386_) + +#define CPUID(func, eax, ebx, ecx, edx) \ + __asm__ __volatile__( \ + "cpuid" \ + : "=a"(eax), "=b"(ebx), "=c"(ecx), "=d"(edx) \ + : "a"(func)); + + int a = 0, b = 0, c = 0, d = 0; + CPUID(0x1, a, b, c, d); + int acpiID = (b >> 24); + return acpiID; + +#elif defined(__CNUC__) + return sched_getcpu(); +#else + return 0; +#endif + +#elif defined(_win_) + return GetCurrentProcessorNumber(); +#else + return 0; +#endif +} + +TMemoryLog* TMemoryLog::MemLogBuffer = nullptr; +Y_POD_THREAD(TThread::TId) +TMemoryLog::LogThreadId; +char* TMemoryLog::LastMarkIsHere = nullptr; + +std::atomic<bool> TMemoryLog::PrintLastMark(true); + +TMemoryLog::TMemoryLog(size_t totalSize, size_t grainSize) + : GrainSize(grainSize) + , FreeGrains(DEFAULT_TOTAL_SIZE / DEFAULT_GRAIN_SIZE * 2) + , Buf(totalSize) +{ + Y_VERIFY(DEFAULT_TOTAL_SIZE % DEFAULT_GRAIN_SIZE == 0); + NumberOfGrains = DEFAULT_TOTAL_SIZE / DEFAULT_GRAIN_SIZE; + + for (size_t i = 0; i < NumberOfGrains; ++i) { + new (GetGrain(i)) TGrain; + } + + NumberOfCpus = NSystemInfo::NumberOfCpus(); + Y_VERIFY(NumberOfGrains > NumberOfCpus); + ActiveGrains.Reset(new TGrain*[NumberOfCpus]); + for (size_t i = 0; i < NumberOfCpus; ++i) { + ActiveGrains[i] = GetGrain(i); + } + + for (size_t i = NumberOfCpus; i < NumberOfGrains; ++i) { + FreeGrains.StubbornPush(GetGrain(i)); + } + +#if HAVE_VDSO_GETCPU + auto vdsoFunc = (decltype(FastGetCpu)) + NVdso::Function("__vdso_getcpu", "LINUX_2.6"); + AtomicSet(FastGetCpu, vdsoFunc); +#endif +} + +void* TMemoryLog::GetWriteBuffer(size_t amount) noexcept { + // alignment required by NoCacheMemcpy + amount = AlignUp<size_t>(amount, MemcpyAlignment); + + for (ui16 tries = MAX_GET_BUFFER_TRIES; tries-- > 0;) { + auto myCpu = GetSelfCpu(); + + TGrain* grain = AtomicGet(ActiveGrains[myCpu]); + + if (grain != nullptr) { + auto mine = AtomicGetAndAdd(grain->WritePointer, amount); + if (mine + amount <= GrainSize - sizeof(TGrain)) { + return &grain->Data[mine]; + } + + if (!AtomicCas(&ActiveGrains[myCpu], 0, grain)) { + continue; + } + + FreeGrains.StubbornPush(grain); + } + + grain = (TGrain*)FreeGrains.Pop(); + + if (grain == nullptr) { + return nullptr; + } + + grain->WritePointer = 0; + + if (!AtomicCas(&ActiveGrains[myCpu], grain, 0)) { + FreeGrains.StubbornPush(grain); + continue; + } + } + + return nullptr; +} + +void ClearAlignedTail(char* tail) noexcept { + auto aligned = AlignUp(tail, TMemoryLog::MemcpyAlignment); + if (aligned > tail) { + memset(tail, 0, aligned - tail); + } +} + +#if defined(_x86_64_) || defined(_i386_) +#include <xmmintrin.h> +// the main motivation is not poluting CPU cache +NO_SANITIZE_THREAD +void NoCacheMemcpy(char* dst, const char* src, size_t size) noexcept { + while (size >= sizeof(__m128) * 2) { + __m128 a = _mm_load_ps((float*)(src + 0 * sizeof(__m128))); + __m128 b = _mm_load_ps((float*)(src + 1 * sizeof(__m128))); + _mm_stream_ps((float*)(dst + 0 * sizeof(__m128)), a); + _mm_stream_ps((float*)(dst + 1 * sizeof(__m128)), b); + + size -= sizeof(__m128) * 2; + src += sizeof(__m128) * 2; + dst += sizeof(__m128) * 2; + } + memcpy(dst, src, size); +} + +NO_SANITIZE_THREAD +void NoWCacheMemcpy(char* dst, const char* src, size_t size) noexcept { + constexpr ui16 ITEMS_COUNT = 1024; + alignas(TMemoryLog::MemcpyAlignment) __m128 buf[ITEMS_COUNT]; + while (size >= sizeof(buf)) { + memcpy(&buf, src, sizeof(buf)); + + for (ui16 i = 0; i < ITEMS_COUNT; ++i) { + _mm_stream_ps((float*)dst, buf[i]); + dst += sizeof(__m128); + } + + size -= sizeof(buf); + src += sizeof(buf); + } + + memcpy(&buf, src, size); + // no problem to copy few bytes more + size = AlignUp(size, sizeof(__m128)); + for (ui16 i = 0; i < size / sizeof(__m128); ++i) { + _mm_stream_ps((float*)dst, buf[i]); + dst += sizeof(__m128); + } +} + +#endif + +NO_SANITIZE_THREAD +char* BareMemLogWrite(const char* begin, size_t msgSize, bool isLast) noexcept { + bool lastMark = + isLast && TMemoryLog::PrintLastMark.load(std::memory_order_acquire); + size_t amount = lastMark ? msgSize + TMemoryLog::LAST_MARK_SIZE : msgSize; + + char* buffer = (char*)TMemoryLog::GetWriteBufferStatic(amount); + if (buffer == nullptr) { + return nullptr; + } + +#if defined(_x86_64_) || defined(_i386_) + if (AlignDown(begin, TMemoryLog::MemcpyAlignment) == begin) { + NoCacheMemcpy(buffer, begin, msgSize); + } else { + NoWCacheMemcpy(buffer, begin, msgSize); + } +#else + memcpy(buffer, begin, msgSize); +#endif + + if (lastMark) { + TMemoryLog::ChangeLastMark(buffer + msgSize); + } + + ClearAlignedTail(buffer + amount); + return buffer; +} + +NO_SANITIZE_THREAD +bool MemLogWrite(const char* begin, size_t msgSize, bool addLF) noexcept { + bool lastMark = TMemoryLog::PrintLastMark.load(std::memory_order_acquire); + size_t amount = lastMark ? msgSize + TMemoryLog::LAST_MARK_SIZE : msgSize; + + // Let's construct prolog with timestamp and thread id + auto threadId = TMemoryLog::GetTheadId(); + + // alignment required by NoCacheMemcpy + // check for format for snprintf + constexpr size_t prologSize = 48; + alignas(TMemoryLog::MemcpyAlignment) char prolog[prologSize + 1]; + Y_VERIFY(AlignDown(&prolog, TMemoryLog::MemcpyAlignment) == &prolog); + + int snprintfResult = snprintf(prolog, prologSize + 1, + "TS %020" PRIu64 " TI %020" PRIu64 " ", GetCycleCountFast(), threadId); + + if (snprintfResult < 0) { + return false; + } + Y_VERIFY(snprintfResult == prologSize); + + amount += prologSize; + if (addLF) { + ++amount; // add 1 byte for \n at the end of the message + } + + char* buffer = (char*)TMemoryLog::GetWriteBufferStatic(amount); + if (buffer == nullptr) { + return false; + } + +#if defined(_x86_64_) || defined(_i386_) + // warning: copy prolog first to avoid corruption of the message + // by prolog tail + NoCacheMemcpy(buffer, prolog, prologSize); + if (AlignDown(begin + prologSize, TMemoryLog::MemcpyAlignment) == begin + prologSize) { + NoCacheMemcpy(buffer + prologSize, begin, msgSize); + } else { + NoWCacheMemcpy(buffer + prologSize, begin, msgSize); + } +#else + memcpy(buffer, prolog, prologSize); + memcpy(buffer + prologSize, begin, msgSize); +#endif + + if (addLF) { + buffer[prologSize + msgSize] = '\n'; + } + + if (lastMark) { + TMemoryLog::ChangeLastMark(buffer + prologSize + msgSize + (int)addLF); + } + + ClearAlignedTail(buffer + amount); + return true; +} + +NO_SANITIZE_THREAD +void TMemoryLog::ChangeLastMark(char* buffer) noexcept { + memcpy(buffer, DEFAULT_LAST_MARK, LAST_MARK_SIZE); + auto oldMark = AtomicSwap(&LastMarkIsHere, buffer); + if (Y_LIKELY(oldMark != nullptr)) { + memcpy(oldMark, CLEAR_MARK, LAST_MARK_SIZE); + } + if (AtomicGet(LastMarkIsHere) != buffer) { + memcpy(buffer, CLEAR_MARK, LAST_MARK_SIZE); + AtomicBarrier(); + } +} + +bool MemLogVPrintF(const char* format, va_list params) noexcept { + auto logger = TMemoryLog::GetMemoryLogger(); + if (logger == nullptr) { + return false; + } + + auto threadId = TMemoryLog::GetTheadId(); + + // alignment required by NoCacheMemcpy + alignas(TMemoryLog::MemcpyAlignment) char buf[TMemoryLog::MAX_MESSAGE_SIZE]; + Y_VERIFY(AlignDown(&buf, TMemoryLog::MemcpyAlignment) == &buf); + + int prologSize = snprintf(buf, + TMemoryLog::MAX_MESSAGE_SIZE - 2, + "TS %020" PRIu64 " TI %020" PRIu64 " ", + GetCycleCountFast(), + threadId); + + if (Y_UNLIKELY(prologSize < 0)) { + return false; + } + Y_VERIFY((ui32)prologSize <= TMemoryLog::MAX_MESSAGE_SIZE); + + int add = vsnprintf( + &buf[prologSize], + TMemoryLog::MAX_MESSAGE_SIZE - prologSize - 2, + format, params); + + if (Y_UNLIKELY(add < 0)) { + return false; + } + Y_VERIFY(add >= 0); + auto totalSize = prologSize + add; + + buf[totalSize++] = '\n'; + Y_VERIFY((ui32)totalSize <= TMemoryLog::MAX_MESSAGE_SIZE); + + return BareMemLogWrite(buf, totalSize) != nullptr; +} diff --git a/library/cpp/actors/memory_log/memlog.h b/library/cpp/actors/memory_log/memlog.h new file mode 100644 index 0000000000..2aa27272a6 --- /dev/null +++ b/library/cpp/actors/memory_log/memlog.h @@ -0,0 +1,211 @@ +#pragma once + +#include <library/cpp/threading/queue/mpmc_unordered_ring.h> +#include <util/generic/string.h> +#include <util/string/printf.h> +#include <util/system/datetime.h> +#include <util/system/thread.h> +#include <util/system/types.h> +#include <util/system/atomic.h> +#include <util/system/align.h> +#include <util/system/tls.h> + +#include <atomic> +#include <cstdio> + +#ifdef _win_ +#include <util/system/winint.h> +#endif + +#ifndef NO_SANITIZE_THREAD +#define NO_SANITIZE_THREAD +#if defined(__has_feature) +#if __has_feature(thread_sanitizer) +#undef NO_SANITIZE_THREAD +#define NO_SANITIZE_THREAD __attribute__((no_sanitize_thread)) +#endif +#endif +#endif + +class TMemoryLog { +public: + static constexpr size_t DEFAULT_TOTAL_SIZE = 10 * 1024 * 1024; + static constexpr size_t DEFAULT_GRAIN_SIZE = 1024 * 64; + static constexpr size_t MAX_MESSAGE_SIZE = 1024; + static constexpr ui16 MAX_GET_BUFFER_TRIES = 4; + static constexpr ui16 MemcpyAlignment = 16; + + // search for cb7B68a8A561645 + static const char DEFAULT_LAST_MARK[16]; + static const char CLEAR_MARK[16]; + + static constexpr size_t LAST_MARK_SIZE = sizeof(DEFAULT_LAST_MARK); + + inline static TMemoryLog* GetMemoryLogger() noexcept { + return AtomicGet(MemLogBuffer); + } + + void* GetWriteBuffer(size_t amount) noexcept; + + inline static void* GetWriteBufferStatic(size_t amount) noexcept { + auto logger = GetMemoryLogger(); + if (logger == nullptr) { + return nullptr; + } + return logger->GetWriteBuffer(amount); + } + + size_t GetGlobalBufferSize() const noexcept { + return Buf.GetSize(); + } + + inline static void CreateMemoryLogBuffer( + size_t totalSize = DEFAULT_TOTAL_SIZE, + size_t grainSize = DEFAULT_GRAIN_SIZE) + Y_COLD { + if (AtomicGet(MemLogBuffer) != nullptr) { + return; + } + + AtomicSet(MemLogBuffer, new TMemoryLog(totalSize, grainSize)); + } + + static std::atomic<bool> PrintLastMark; + + // buffer must be at least 16 bytes + static void ChangeLastMark(char* buffer) noexcept; + + inline static TThread::TId GetTheadId() noexcept { + if (LogThreadId == 0) { + LogThreadId = TThread::CurrentThreadId(); + } + return LogThreadId; + } + +private: + TMemoryLog(size_t totalSize, size_t grainSize) Y_COLD; + + struct TGrain { + TAtomic WritePointer = 0; + char Padding[MemcpyAlignment - sizeof(TAtomic)]; + char Data[]; + }; + + size_t NumberOfCpus; + size_t GrainSize; + size_t NumberOfGrains; + TArrayPtr<TGrain*> ActiveGrains; + NThreading::TMPMCUnorderedRing FreeGrains; + + TGrain* GetGrain(size_t grainIndex) const noexcept { + return (TGrain*)((char*)GetGlobalBuffer() + GrainSize * grainIndex); + } + + class TMMapArea { + public: + TMMapArea(size_t amount) Y_COLD { + MMap(amount); + } + + TMMapArea(const TMMapArea&) = delete; + TMMapArea& operator=(const TMMapArea& copy) = delete; + + TMMapArea(TMMapArea&& move) Y_COLD { + BufPtr = move.BufPtr; + Size = move.Size; + + move.BufPtr = nullptr; + move.Size = 0; + } + + TMMapArea& operator=(TMMapArea&& move) Y_COLD { + BufPtr = move.BufPtr; + Size = move.Size; + + move.BufPtr = nullptr; + move.Size = 0; + return *this; + } + + void Reset(size_t amount) Y_COLD { + MUnmap(); + MMap(amount); + } + + ~TMMapArea() noexcept Y_COLD { + MUnmap(); + } + + size_t GetSize() const noexcept { + return Size; + } + + void* GetPtr() const noexcept { + return BufPtr; + } + + private: + void* BufPtr; + size_t Size; +#ifdef _win_ + HANDLE Mapping; +#endif + + void MMap(size_t amount); + void MUnmap(); + }; + + TMMapArea Buf; + + void* GetGlobalBuffer() const noexcept { + return Buf.GetPtr(); + } + + static unsigned GetSelfCpu() noexcept; + + static TMemoryLog* MemLogBuffer; + static Y_POD_THREAD(TThread::TId) LogThreadId; + static char* LastMarkIsHere; +}; + +// it's no use of sanitizing this function +NO_SANITIZE_THREAD +char* BareMemLogWrite( + const char* begin, size_t msgSize, bool isLast = true) noexcept; + +// it's no use of sanitizing this function +NO_SANITIZE_THREAD +bool MemLogWrite( + const char* begin, size_t msgSize, bool addLF = false) noexcept; + +Y_WRAPPER inline bool MemLogWrite(const char* begin, const char* end) noexcept { + if (end <= begin) { + return false; + } + + size_t msgSize = end - begin; + return MemLogWrite(begin, msgSize); +} + +template <typename TObj> +bool MemLogWriteStruct(const TObj* obj) noexcept { + auto begin = (const char*)(const void*)obj; + return MemLogWrite(begin, begin + sizeof(TObj)); +} + +Y_PRINTF_FORMAT(1, 0) +bool MemLogVPrintF(const char* format, va_list params) noexcept; + +Y_PRINTF_FORMAT(1, 2) +Y_WRAPPER +inline bool MemLogPrintF(const char* format, ...) noexcept { + va_list params; + va_start(params, format); + auto result = MemLogVPrintF(format, params); + va_end(params); + return result; +} + +Y_WRAPPER inline bool MemLogWriteNullTerm(const char* str) noexcept { + return MemLogWrite(str, strlen(str)); +} diff --git a/library/cpp/actors/memory_log/mmap.cpp b/library/cpp/actors/memory_log/mmap.cpp new file mode 100644 index 0000000000..201998d343 --- /dev/null +++ b/library/cpp/actors/memory_log/mmap.cpp @@ -0,0 +1,63 @@ +#include "memlog.h" + +#if defined(_unix_) +#include <sys/mman.h> +#elif defined(_win_) +#include <util/system/winint.h> +#else +#error NO IMPLEMENTATION FOR THE PLATFORM +#endif + +void TMemoryLog::TMMapArea::MMap(size_t amount) { + Y_VERIFY(amount > 0); + +#if defined(_unix_) + constexpr int mmapProt = PROT_READ | PROT_WRITE; +#if defined(_linux_) + constexpr int mmapFlags = MAP_PRIVATE | MAP_ANON | MAP_POPULATE; +#else + constexpr int mmapFlags = MAP_PRIVATE | MAP_ANON; +#endif + + BufPtr = ::mmap(nullptr, amount, mmapProt, mmapFlags, -1, 0); + if (BufPtr == MAP_FAILED) { + throw std::bad_alloc(); + } + +#elif defined(_win_) + Mapping = ::CreateFileMapping( + (HANDLE)-1, nullptr, PAGE_READWRITE, 0, amount, nullptr); + if (Mapping == NULL) { + throw std::bad_alloc(); + } + BufPtr = ::MapViewOfFile(Mapping, FILE_MAP_WRITE, 0, 0, amount); + if (BufPtr == NULL) { + throw std::bad_alloc(); + } +#endif + + Size = amount; +} + +void TMemoryLog::TMMapArea::MUnmap() { + if (BufPtr == nullptr) { + return; + } + +#if defined(_unix_) + int result = ::munmap(BufPtr, Size); + Y_VERIFY(result == 0); + +#elif defined(_win_) + BOOL result = ::UnmapViewOfFile(BufPtr); + Y_VERIFY(result != 0); + + result = ::CloseHandle(Mapping); + Y_VERIFY(result != 0); + + Mapping = 0; +#endif + + BufPtr = nullptr; + Size = 0; +} diff --git a/library/cpp/actors/memory_log/ya.make b/library/cpp/actors/memory_log/ya.make new file mode 100644 index 0000000000..d89d5db4d7 --- /dev/null +++ b/library/cpp/actors/memory_log/ya.make @@ -0,0 +1,19 @@ +LIBRARY() + +OWNER( + agri + g:kikimr +) + +SRCS( + memlog.cpp + memlog.h + mmap.cpp +) + +PEERDIR( + library/cpp/threading/queue + contrib/libs/linuxvdso +) + +END() |