aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/memory_log
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/memory_log
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/memory_log')
-rw-r--r--library/cpp/actors/memory_log/memlog.cpp367
-rw-r--r--library/cpp/actors/memory_log/memlog.h211
-rw-r--r--library/cpp/actors/memory_log/mmap.cpp63
-rw-r--r--library/cpp/actors/memory_log/ya.make19
4 files changed, 660 insertions, 0 deletions
diff --git a/library/cpp/actors/memory_log/memlog.cpp b/library/cpp/actors/memory_log/memlog.cpp
new file mode 100644
index 0000000000..8e6b46727d
--- /dev/null
+++ b/library/cpp/actors/memory_log/memlog.cpp
@@ -0,0 +1,367 @@
+#include "memlog.h"
+
+#include <library/cpp/actors/util/datetime.h>
+
+#include <util/system/info.h>
+#include <util/system/atomic.h>
+#include <util/system/align.h>
+
+#include <contrib/libs/linuxvdso/interface.h>
+
+#if (defined(_i386_) || defined(_x86_64_)) && defined(_linux_)
+#define HAVE_VDSO_GETCPU 1
+#include <contrib/libs/linuxvdso/interface.h>
+static int (*FastGetCpu)(unsigned* cpu, unsigned* node, void* unused);
+#endif
+
+#if defined(_unix_)
+#include <sched.h>
+#elif defined(_win_)
+#include <WinBase.h>
+#else
+#error NO IMPLEMENTATION FOR THE PLATFORM
+#endif
+
+const char TMemoryLog::DEFAULT_LAST_MARK[16] = {
+ 'c',
+ 'b',
+ '7',
+ 'B',
+ '6',
+ '8',
+ 'a',
+ '8',
+ 'A',
+ '5',
+ '6',
+ '1',
+ '6',
+ '4',
+ '5',
+ '\n',
+};
+
+const char TMemoryLog::CLEAR_MARK[16] = {
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ '\n',
+};
+
+unsigned TMemoryLog::GetSelfCpu() noexcept {
+#if defined(_unix_)
+#if HAVE_VDSO_GETCPU
+ unsigned cpu;
+ if (Y_LIKELY(FastGetCpu != nullptr)) {
+ auto result = FastGetCpu(&cpu, nullptr, nullptr);
+ Y_VERIFY(result == 0);
+ return cpu;
+ } else {
+ return 0;
+ }
+
+#elif defined(_x86_64_) || defined(_i386_)
+
+#define CPUID(func, eax, ebx, ecx, edx) \
+ __asm__ __volatile__( \
+ "cpuid" \
+ : "=a"(eax), "=b"(ebx), "=c"(ecx), "=d"(edx) \
+ : "a"(func));
+
+ int a = 0, b = 0, c = 0, d = 0;
+ CPUID(0x1, a, b, c, d);
+ int acpiID = (b >> 24);
+ return acpiID;
+
+#elif defined(__CNUC__)
+ return sched_getcpu();
+#else
+ return 0;
+#endif
+
+#elif defined(_win_)
+ return GetCurrentProcessorNumber();
+#else
+ return 0;
+#endif
+}
+
+TMemoryLog* TMemoryLog::MemLogBuffer = nullptr;
+Y_POD_THREAD(TThread::TId)
+TMemoryLog::LogThreadId;
+char* TMemoryLog::LastMarkIsHere = nullptr;
+
+std::atomic<bool> TMemoryLog::PrintLastMark(true);
+
+TMemoryLog::TMemoryLog(size_t totalSize, size_t grainSize)
+ : GrainSize(grainSize)
+ , FreeGrains(DEFAULT_TOTAL_SIZE / DEFAULT_GRAIN_SIZE * 2)
+ , Buf(totalSize)
+{
+ Y_VERIFY(DEFAULT_TOTAL_SIZE % DEFAULT_GRAIN_SIZE == 0);
+ NumberOfGrains = DEFAULT_TOTAL_SIZE / DEFAULT_GRAIN_SIZE;
+
+ for (size_t i = 0; i < NumberOfGrains; ++i) {
+ new (GetGrain(i)) TGrain;
+ }
+
+ NumberOfCpus = NSystemInfo::NumberOfCpus();
+ Y_VERIFY(NumberOfGrains > NumberOfCpus);
+ ActiveGrains.Reset(new TGrain*[NumberOfCpus]);
+ for (size_t i = 0; i < NumberOfCpus; ++i) {
+ ActiveGrains[i] = GetGrain(i);
+ }
+
+ for (size_t i = NumberOfCpus; i < NumberOfGrains; ++i) {
+ FreeGrains.StubbornPush(GetGrain(i));
+ }
+
+#if HAVE_VDSO_GETCPU
+ auto vdsoFunc = (decltype(FastGetCpu))
+ NVdso::Function("__vdso_getcpu", "LINUX_2.6");
+ AtomicSet(FastGetCpu, vdsoFunc);
+#endif
+}
+
+void* TMemoryLog::GetWriteBuffer(size_t amount) noexcept {
+ // alignment required by NoCacheMemcpy
+ amount = AlignUp<size_t>(amount, MemcpyAlignment);
+
+ for (ui16 tries = MAX_GET_BUFFER_TRIES; tries-- > 0;) {
+ auto myCpu = GetSelfCpu();
+
+ TGrain* grain = AtomicGet(ActiveGrains[myCpu]);
+
+ if (grain != nullptr) {
+ auto mine = AtomicGetAndAdd(grain->WritePointer, amount);
+ if (mine + amount <= GrainSize - sizeof(TGrain)) {
+ return &grain->Data[mine];
+ }
+
+ if (!AtomicCas(&ActiveGrains[myCpu], 0, grain)) {
+ continue;
+ }
+
+ FreeGrains.StubbornPush(grain);
+ }
+
+ grain = (TGrain*)FreeGrains.Pop();
+
+ if (grain == nullptr) {
+ return nullptr;
+ }
+
+ grain->WritePointer = 0;
+
+ if (!AtomicCas(&ActiveGrains[myCpu], grain, 0)) {
+ FreeGrains.StubbornPush(grain);
+ continue;
+ }
+ }
+
+ return nullptr;
+}
+
+void ClearAlignedTail(char* tail) noexcept {
+ auto aligned = AlignUp(tail, TMemoryLog::MemcpyAlignment);
+ if (aligned > tail) {
+ memset(tail, 0, aligned - tail);
+ }
+}
+
+#if defined(_x86_64_) || defined(_i386_)
+#include <xmmintrin.h>
+// the main motivation is not poluting CPU cache
+NO_SANITIZE_THREAD
+void NoCacheMemcpy(char* dst, const char* src, size_t size) noexcept {
+ while (size >= sizeof(__m128) * 2) {
+ __m128 a = _mm_load_ps((float*)(src + 0 * sizeof(__m128)));
+ __m128 b = _mm_load_ps((float*)(src + 1 * sizeof(__m128)));
+ _mm_stream_ps((float*)(dst + 0 * sizeof(__m128)), a);
+ _mm_stream_ps((float*)(dst + 1 * sizeof(__m128)), b);
+
+ size -= sizeof(__m128) * 2;
+ src += sizeof(__m128) * 2;
+ dst += sizeof(__m128) * 2;
+ }
+ memcpy(dst, src, size);
+}
+
+NO_SANITIZE_THREAD
+void NoWCacheMemcpy(char* dst, const char* src, size_t size) noexcept {
+ constexpr ui16 ITEMS_COUNT = 1024;
+ alignas(TMemoryLog::MemcpyAlignment) __m128 buf[ITEMS_COUNT];
+ while (size >= sizeof(buf)) {
+ memcpy(&buf, src, sizeof(buf));
+
+ for (ui16 i = 0; i < ITEMS_COUNT; ++i) {
+ _mm_stream_ps((float*)dst, buf[i]);
+ dst += sizeof(__m128);
+ }
+
+ size -= sizeof(buf);
+ src += sizeof(buf);
+ }
+
+ memcpy(&buf, src, size);
+ // no problem to copy few bytes more
+ size = AlignUp(size, sizeof(__m128));
+ for (ui16 i = 0; i < size / sizeof(__m128); ++i) {
+ _mm_stream_ps((float*)dst, buf[i]);
+ dst += sizeof(__m128);
+ }
+}
+
+#endif
+
+NO_SANITIZE_THREAD
+char* BareMemLogWrite(const char* begin, size_t msgSize, bool isLast) noexcept {
+ bool lastMark =
+ isLast && TMemoryLog::PrintLastMark.load(std::memory_order_acquire);
+ size_t amount = lastMark ? msgSize + TMemoryLog::LAST_MARK_SIZE : msgSize;
+
+ char* buffer = (char*)TMemoryLog::GetWriteBufferStatic(amount);
+ if (buffer == nullptr) {
+ return nullptr;
+ }
+
+#if defined(_x86_64_) || defined(_i386_)
+ if (AlignDown(begin, TMemoryLog::MemcpyAlignment) == begin) {
+ NoCacheMemcpy(buffer, begin, msgSize);
+ } else {
+ NoWCacheMemcpy(buffer, begin, msgSize);
+ }
+#else
+ memcpy(buffer, begin, msgSize);
+#endif
+
+ if (lastMark) {
+ TMemoryLog::ChangeLastMark(buffer + msgSize);
+ }
+
+ ClearAlignedTail(buffer + amount);
+ return buffer;
+}
+
+NO_SANITIZE_THREAD
+bool MemLogWrite(const char* begin, size_t msgSize, bool addLF) noexcept {
+ bool lastMark = TMemoryLog::PrintLastMark.load(std::memory_order_acquire);
+ size_t amount = lastMark ? msgSize + TMemoryLog::LAST_MARK_SIZE : msgSize;
+
+ // Let's construct prolog with timestamp and thread id
+ auto threadId = TMemoryLog::GetTheadId();
+
+ // alignment required by NoCacheMemcpy
+ // check for format for snprintf
+ constexpr size_t prologSize = 48;
+ alignas(TMemoryLog::MemcpyAlignment) char prolog[prologSize + 1];
+ Y_VERIFY(AlignDown(&prolog, TMemoryLog::MemcpyAlignment) == &prolog);
+
+ int snprintfResult = snprintf(prolog, prologSize + 1,
+ "TS %020" PRIu64 " TI %020" PRIu64 " ", GetCycleCountFast(), threadId);
+
+ if (snprintfResult < 0) {
+ return false;
+ }
+ Y_VERIFY(snprintfResult == prologSize);
+
+ amount += prologSize;
+ if (addLF) {
+ ++amount; // add 1 byte for \n at the end of the message
+ }
+
+ char* buffer = (char*)TMemoryLog::GetWriteBufferStatic(amount);
+ if (buffer == nullptr) {
+ return false;
+ }
+
+#if defined(_x86_64_) || defined(_i386_)
+ // warning: copy prolog first to avoid corruption of the message
+ // by prolog tail
+ NoCacheMemcpy(buffer, prolog, prologSize);
+ if (AlignDown(begin + prologSize, TMemoryLog::MemcpyAlignment) == begin + prologSize) {
+ NoCacheMemcpy(buffer + prologSize, begin, msgSize);
+ } else {
+ NoWCacheMemcpy(buffer + prologSize, begin, msgSize);
+ }
+#else
+ memcpy(buffer, prolog, prologSize);
+ memcpy(buffer + prologSize, begin, msgSize);
+#endif
+
+ if (addLF) {
+ buffer[prologSize + msgSize] = '\n';
+ }
+
+ if (lastMark) {
+ TMemoryLog::ChangeLastMark(buffer + prologSize + msgSize + (int)addLF);
+ }
+
+ ClearAlignedTail(buffer + amount);
+ return true;
+}
+
+NO_SANITIZE_THREAD
+void TMemoryLog::ChangeLastMark(char* buffer) noexcept {
+ memcpy(buffer, DEFAULT_LAST_MARK, LAST_MARK_SIZE);
+ auto oldMark = AtomicSwap(&LastMarkIsHere, buffer);
+ if (Y_LIKELY(oldMark != nullptr)) {
+ memcpy(oldMark, CLEAR_MARK, LAST_MARK_SIZE);
+ }
+ if (AtomicGet(LastMarkIsHere) != buffer) {
+ memcpy(buffer, CLEAR_MARK, LAST_MARK_SIZE);
+ AtomicBarrier();
+ }
+}
+
+bool MemLogVPrintF(const char* format, va_list params) noexcept {
+ auto logger = TMemoryLog::GetMemoryLogger();
+ if (logger == nullptr) {
+ return false;
+ }
+
+ auto threadId = TMemoryLog::GetTheadId();
+
+ // alignment required by NoCacheMemcpy
+ alignas(TMemoryLog::MemcpyAlignment) char buf[TMemoryLog::MAX_MESSAGE_SIZE];
+ Y_VERIFY(AlignDown(&buf, TMemoryLog::MemcpyAlignment) == &buf);
+
+ int prologSize = snprintf(buf,
+ TMemoryLog::MAX_MESSAGE_SIZE - 2,
+ "TS %020" PRIu64 " TI %020" PRIu64 " ",
+ GetCycleCountFast(),
+ threadId);
+
+ if (Y_UNLIKELY(prologSize < 0)) {
+ return false;
+ }
+ Y_VERIFY((ui32)prologSize <= TMemoryLog::MAX_MESSAGE_SIZE);
+
+ int add = vsnprintf(
+ &buf[prologSize],
+ TMemoryLog::MAX_MESSAGE_SIZE - prologSize - 2,
+ format, params);
+
+ if (Y_UNLIKELY(add < 0)) {
+ return false;
+ }
+ Y_VERIFY(add >= 0);
+ auto totalSize = prologSize + add;
+
+ buf[totalSize++] = '\n';
+ Y_VERIFY((ui32)totalSize <= TMemoryLog::MAX_MESSAGE_SIZE);
+
+ return BareMemLogWrite(buf, totalSize) != nullptr;
+}
diff --git a/library/cpp/actors/memory_log/memlog.h b/library/cpp/actors/memory_log/memlog.h
new file mode 100644
index 0000000000..2aa27272a6
--- /dev/null
+++ b/library/cpp/actors/memory_log/memlog.h
@@ -0,0 +1,211 @@
+#pragma once
+
+#include <library/cpp/threading/queue/mpmc_unordered_ring.h>
+#include <util/generic/string.h>
+#include <util/string/printf.h>
+#include <util/system/datetime.h>
+#include <util/system/thread.h>
+#include <util/system/types.h>
+#include <util/system/atomic.h>
+#include <util/system/align.h>
+#include <util/system/tls.h>
+
+#include <atomic>
+#include <cstdio>
+
+#ifdef _win_
+#include <util/system/winint.h>
+#endif
+
+#ifndef NO_SANITIZE_THREAD
+#define NO_SANITIZE_THREAD
+#if defined(__has_feature)
+#if __has_feature(thread_sanitizer)
+#undef NO_SANITIZE_THREAD
+#define NO_SANITIZE_THREAD __attribute__((no_sanitize_thread))
+#endif
+#endif
+#endif
+
+class TMemoryLog {
+public:
+ static constexpr size_t DEFAULT_TOTAL_SIZE = 10 * 1024 * 1024;
+ static constexpr size_t DEFAULT_GRAIN_SIZE = 1024 * 64;
+ static constexpr size_t MAX_MESSAGE_SIZE = 1024;
+ static constexpr ui16 MAX_GET_BUFFER_TRIES = 4;
+ static constexpr ui16 MemcpyAlignment = 16;
+
+ // search for cb7B68a8A561645
+ static const char DEFAULT_LAST_MARK[16];
+ static const char CLEAR_MARK[16];
+
+ static constexpr size_t LAST_MARK_SIZE = sizeof(DEFAULT_LAST_MARK);
+
+ inline static TMemoryLog* GetMemoryLogger() noexcept {
+ return AtomicGet(MemLogBuffer);
+ }
+
+ void* GetWriteBuffer(size_t amount) noexcept;
+
+ inline static void* GetWriteBufferStatic(size_t amount) noexcept {
+ auto logger = GetMemoryLogger();
+ if (logger == nullptr) {
+ return nullptr;
+ }
+ return logger->GetWriteBuffer(amount);
+ }
+
+ size_t GetGlobalBufferSize() const noexcept {
+ return Buf.GetSize();
+ }
+
+ inline static void CreateMemoryLogBuffer(
+ size_t totalSize = DEFAULT_TOTAL_SIZE,
+ size_t grainSize = DEFAULT_GRAIN_SIZE)
+ Y_COLD {
+ if (AtomicGet(MemLogBuffer) != nullptr) {
+ return;
+ }
+
+ AtomicSet(MemLogBuffer, new TMemoryLog(totalSize, grainSize));
+ }
+
+ static std::atomic<bool> PrintLastMark;
+
+ // buffer must be at least 16 bytes
+ static void ChangeLastMark(char* buffer) noexcept;
+
+ inline static TThread::TId GetTheadId() noexcept {
+ if (LogThreadId == 0) {
+ LogThreadId = TThread::CurrentThreadId();
+ }
+ return LogThreadId;
+ }
+
+private:
+ TMemoryLog(size_t totalSize, size_t grainSize) Y_COLD;
+
+ struct TGrain {
+ TAtomic WritePointer = 0;
+ char Padding[MemcpyAlignment - sizeof(TAtomic)];
+ char Data[];
+ };
+
+ size_t NumberOfCpus;
+ size_t GrainSize;
+ size_t NumberOfGrains;
+ TArrayPtr<TGrain*> ActiveGrains;
+ NThreading::TMPMCUnorderedRing FreeGrains;
+
+ TGrain* GetGrain(size_t grainIndex) const noexcept {
+ return (TGrain*)((char*)GetGlobalBuffer() + GrainSize * grainIndex);
+ }
+
+ class TMMapArea {
+ public:
+ TMMapArea(size_t amount) Y_COLD {
+ MMap(amount);
+ }
+
+ TMMapArea(const TMMapArea&) = delete;
+ TMMapArea& operator=(const TMMapArea& copy) = delete;
+
+ TMMapArea(TMMapArea&& move) Y_COLD {
+ BufPtr = move.BufPtr;
+ Size = move.Size;
+
+ move.BufPtr = nullptr;
+ move.Size = 0;
+ }
+
+ TMMapArea& operator=(TMMapArea&& move) Y_COLD {
+ BufPtr = move.BufPtr;
+ Size = move.Size;
+
+ move.BufPtr = nullptr;
+ move.Size = 0;
+ return *this;
+ }
+
+ void Reset(size_t amount) Y_COLD {
+ MUnmap();
+ MMap(amount);
+ }
+
+ ~TMMapArea() noexcept Y_COLD {
+ MUnmap();
+ }
+
+ size_t GetSize() const noexcept {
+ return Size;
+ }
+
+ void* GetPtr() const noexcept {
+ return BufPtr;
+ }
+
+ private:
+ void* BufPtr;
+ size_t Size;
+#ifdef _win_
+ HANDLE Mapping;
+#endif
+
+ void MMap(size_t amount);
+ void MUnmap();
+ };
+
+ TMMapArea Buf;
+
+ void* GetGlobalBuffer() const noexcept {
+ return Buf.GetPtr();
+ }
+
+ static unsigned GetSelfCpu() noexcept;
+
+ static TMemoryLog* MemLogBuffer;
+ static Y_POD_THREAD(TThread::TId) LogThreadId;
+ static char* LastMarkIsHere;
+};
+
+// it's no use of sanitizing this function
+NO_SANITIZE_THREAD
+char* BareMemLogWrite(
+ const char* begin, size_t msgSize, bool isLast = true) noexcept;
+
+// it's no use of sanitizing this function
+NO_SANITIZE_THREAD
+bool MemLogWrite(
+ const char* begin, size_t msgSize, bool addLF = false) noexcept;
+
+Y_WRAPPER inline bool MemLogWrite(const char* begin, const char* end) noexcept {
+ if (end <= begin) {
+ return false;
+ }
+
+ size_t msgSize = end - begin;
+ return MemLogWrite(begin, msgSize);
+}
+
+template <typename TObj>
+bool MemLogWriteStruct(const TObj* obj) noexcept {
+ auto begin = (const char*)(const void*)obj;
+ return MemLogWrite(begin, begin + sizeof(TObj));
+}
+
+Y_PRINTF_FORMAT(1, 0)
+bool MemLogVPrintF(const char* format, va_list params) noexcept;
+
+Y_PRINTF_FORMAT(1, 2)
+Y_WRAPPER
+inline bool MemLogPrintF(const char* format, ...) noexcept {
+ va_list params;
+ va_start(params, format);
+ auto result = MemLogVPrintF(format, params);
+ va_end(params);
+ return result;
+}
+
+Y_WRAPPER inline bool MemLogWriteNullTerm(const char* str) noexcept {
+ return MemLogWrite(str, strlen(str));
+}
diff --git a/library/cpp/actors/memory_log/mmap.cpp b/library/cpp/actors/memory_log/mmap.cpp
new file mode 100644
index 0000000000..201998d343
--- /dev/null
+++ b/library/cpp/actors/memory_log/mmap.cpp
@@ -0,0 +1,63 @@
+#include "memlog.h"
+
+#if defined(_unix_)
+#include <sys/mman.h>
+#elif defined(_win_)
+#include <util/system/winint.h>
+#else
+#error NO IMPLEMENTATION FOR THE PLATFORM
+#endif
+
+void TMemoryLog::TMMapArea::MMap(size_t amount) {
+ Y_VERIFY(amount > 0);
+
+#if defined(_unix_)
+ constexpr int mmapProt = PROT_READ | PROT_WRITE;
+#if defined(_linux_)
+ constexpr int mmapFlags = MAP_PRIVATE | MAP_ANON | MAP_POPULATE;
+#else
+ constexpr int mmapFlags = MAP_PRIVATE | MAP_ANON;
+#endif
+
+ BufPtr = ::mmap(nullptr, amount, mmapProt, mmapFlags, -1, 0);
+ if (BufPtr == MAP_FAILED) {
+ throw std::bad_alloc();
+ }
+
+#elif defined(_win_)
+ Mapping = ::CreateFileMapping(
+ (HANDLE)-1, nullptr, PAGE_READWRITE, 0, amount, nullptr);
+ if (Mapping == NULL) {
+ throw std::bad_alloc();
+ }
+ BufPtr = ::MapViewOfFile(Mapping, FILE_MAP_WRITE, 0, 0, amount);
+ if (BufPtr == NULL) {
+ throw std::bad_alloc();
+ }
+#endif
+
+ Size = amount;
+}
+
+void TMemoryLog::TMMapArea::MUnmap() {
+ if (BufPtr == nullptr) {
+ return;
+ }
+
+#if defined(_unix_)
+ int result = ::munmap(BufPtr, Size);
+ Y_VERIFY(result == 0);
+
+#elif defined(_win_)
+ BOOL result = ::UnmapViewOfFile(BufPtr);
+ Y_VERIFY(result != 0);
+
+ result = ::CloseHandle(Mapping);
+ Y_VERIFY(result != 0);
+
+ Mapping = 0;
+#endif
+
+ BufPtr = nullptr;
+ Size = 0;
+}
diff --git a/library/cpp/actors/memory_log/ya.make b/library/cpp/actors/memory_log/ya.make
new file mode 100644
index 0000000000..d89d5db4d7
--- /dev/null
+++ b/library/cpp/actors/memory_log/ya.make
@@ -0,0 +1,19 @@
+LIBRARY()
+
+OWNER(
+ agri
+ g:kikimr
+)
+
+SRCS(
+ memlog.cpp
+ memlog.h
+ mmap.cpp
+)
+
+PEERDIR(
+ library/cpp/threading/queue
+ contrib/libs/linuxvdso
+)
+
+END()