aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/memory_log/memlog.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'library/cpp/actors/memory_log/memlog.cpp')
-rw-r--r--library/cpp/actors/memory_log/memlog.cpp367
1 files changed, 367 insertions, 0 deletions
diff --git a/library/cpp/actors/memory_log/memlog.cpp b/library/cpp/actors/memory_log/memlog.cpp
new file mode 100644
index 0000000000..8e6b46727d
--- /dev/null
+++ b/library/cpp/actors/memory_log/memlog.cpp
@@ -0,0 +1,367 @@
+#include "memlog.h"
+
+#include <library/cpp/actors/util/datetime.h>
+
+#include <util/system/info.h>
+#include <util/system/atomic.h>
+#include <util/system/align.h>
+
+#include <contrib/libs/linuxvdso/interface.h>
+
+#if (defined(_i386_) || defined(_x86_64_)) && defined(_linux_)
+#define HAVE_VDSO_GETCPU 1
+#include <contrib/libs/linuxvdso/interface.h>
+static int (*FastGetCpu)(unsigned* cpu, unsigned* node, void* unused);
+#endif
+
+#if defined(_unix_)
+#include <sched.h>
+#elif defined(_win_)
+#include <WinBase.h>
+#else
+#error NO IMPLEMENTATION FOR THE PLATFORM
+#endif
+
+const char TMemoryLog::DEFAULT_LAST_MARK[16] = {
+ 'c',
+ 'b',
+ '7',
+ 'B',
+ '6',
+ '8',
+ 'a',
+ '8',
+ 'A',
+ '5',
+ '6',
+ '1',
+ '6',
+ '4',
+ '5',
+ '\n',
+};
+
+const char TMemoryLog::CLEAR_MARK[16] = {
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ ' ',
+ '\n',
+};
+
+unsigned TMemoryLog::GetSelfCpu() noexcept {
+#if defined(_unix_)
+#if HAVE_VDSO_GETCPU
+ unsigned cpu;
+ if (Y_LIKELY(FastGetCpu != nullptr)) {
+ auto result = FastGetCpu(&cpu, nullptr, nullptr);
+ Y_VERIFY(result == 0);
+ return cpu;
+ } else {
+ return 0;
+ }
+
+#elif defined(_x86_64_) || defined(_i386_)
+
+#define CPUID(func, eax, ebx, ecx, edx) \
+ __asm__ __volatile__( \
+ "cpuid" \
+ : "=a"(eax), "=b"(ebx), "=c"(ecx), "=d"(edx) \
+ : "a"(func));
+
+ int a = 0, b = 0, c = 0, d = 0;
+ CPUID(0x1, a, b, c, d);
+ int acpiID = (b >> 24);
+ return acpiID;
+
+#elif defined(__CNUC__)
+ return sched_getcpu();
+#else
+ return 0;
+#endif
+
+#elif defined(_win_)
+ return GetCurrentProcessorNumber();
+#else
+ return 0;
+#endif
+}
+
+TMemoryLog* TMemoryLog::MemLogBuffer = nullptr;
+Y_POD_THREAD(TThread::TId)
+TMemoryLog::LogThreadId;
+char* TMemoryLog::LastMarkIsHere = nullptr;
+
+std::atomic<bool> TMemoryLog::PrintLastMark(true);
+
+TMemoryLog::TMemoryLog(size_t totalSize, size_t grainSize)
+ : GrainSize(grainSize)
+ , FreeGrains(DEFAULT_TOTAL_SIZE / DEFAULT_GRAIN_SIZE * 2)
+ , Buf(totalSize)
+{
+ Y_VERIFY(DEFAULT_TOTAL_SIZE % DEFAULT_GRAIN_SIZE == 0);
+ NumberOfGrains = DEFAULT_TOTAL_SIZE / DEFAULT_GRAIN_SIZE;
+
+ for (size_t i = 0; i < NumberOfGrains; ++i) {
+ new (GetGrain(i)) TGrain;
+ }
+
+ NumberOfCpus = NSystemInfo::NumberOfCpus();
+ Y_VERIFY(NumberOfGrains > NumberOfCpus);
+ ActiveGrains.Reset(new TGrain*[NumberOfCpus]);
+ for (size_t i = 0; i < NumberOfCpus; ++i) {
+ ActiveGrains[i] = GetGrain(i);
+ }
+
+ for (size_t i = NumberOfCpus; i < NumberOfGrains; ++i) {
+ FreeGrains.StubbornPush(GetGrain(i));
+ }
+
+#if HAVE_VDSO_GETCPU
+ auto vdsoFunc = (decltype(FastGetCpu))
+ NVdso::Function("__vdso_getcpu", "LINUX_2.6");
+ AtomicSet(FastGetCpu, vdsoFunc);
+#endif
+}
+
+void* TMemoryLog::GetWriteBuffer(size_t amount) noexcept {
+ // alignment required by NoCacheMemcpy
+ amount = AlignUp<size_t>(amount, MemcpyAlignment);
+
+ for (ui16 tries = MAX_GET_BUFFER_TRIES; tries-- > 0;) {
+ auto myCpu = GetSelfCpu();
+
+ TGrain* grain = AtomicGet(ActiveGrains[myCpu]);
+
+ if (grain != nullptr) {
+ auto mine = AtomicGetAndAdd(grain->WritePointer, amount);
+ if (mine + amount <= GrainSize - sizeof(TGrain)) {
+ return &grain->Data[mine];
+ }
+
+ if (!AtomicCas(&ActiveGrains[myCpu], 0, grain)) {
+ continue;
+ }
+
+ FreeGrains.StubbornPush(grain);
+ }
+
+ grain = (TGrain*)FreeGrains.Pop();
+
+ if (grain == nullptr) {
+ return nullptr;
+ }
+
+ grain->WritePointer = 0;
+
+ if (!AtomicCas(&ActiveGrains[myCpu], grain, 0)) {
+ FreeGrains.StubbornPush(grain);
+ continue;
+ }
+ }
+
+ return nullptr;
+}
+
+void ClearAlignedTail(char* tail) noexcept {
+ auto aligned = AlignUp(tail, TMemoryLog::MemcpyAlignment);
+ if (aligned > tail) {
+ memset(tail, 0, aligned - tail);
+ }
+}
+
+#if defined(_x86_64_) || defined(_i386_)
+#include <xmmintrin.h>
+// the main motivation is not poluting CPU cache
+NO_SANITIZE_THREAD
+void NoCacheMemcpy(char* dst, const char* src, size_t size) noexcept {
+ while (size >= sizeof(__m128) * 2) {
+ __m128 a = _mm_load_ps((float*)(src + 0 * sizeof(__m128)));
+ __m128 b = _mm_load_ps((float*)(src + 1 * sizeof(__m128)));
+ _mm_stream_ps((float*)(dst + 0 * sizeof(__m128)), a);
+ _mm_stream_ps((float*)(dst + 1 * sizeof(__m128)), b);
+
+ size -= sizeof(__m128) * 2;
+ src += sizeof(__m128) * 2;
+ dst += sizeof(__m128) * 2;
+ }
+ memcpy(dst, src, size);
+}
+
+NO_SANITIZE_THREAD
+void NoWCacheMemcpy(char* dst, const char* src, size_t size) noexcept {
+ constexpr ui16 ITEMS_COUNT = 1024;
+ alignas(TMemoryLog::MemcpyAlignment) __m128 buf[ITEMS_COUNT];
+ while (size >= sizeof(buf)) {
+ memcpy(&buf, src, sizeof(buf));
+
+ for (ui16 i = 0; i < ITEMS_COUNT; ++i) {
+ _mm_stream_ps((float*)dst, buf[i]);
+ dst += sizeof(__m128);
+ }
+
+ size -= sizeof(buf);
+ src += sizeof(buf);
+ }
+
+ memcpy(&buf, src, size);
+ // no problem to copy few bytes more
+ size = AlignUp(size, sizeof(__m128));
+ for (ui16 i = 0; i < size / sizeof(__m128); ++i) {
+ _mm_stream_ps((float*)dst, buf[i]);
+ dst += sizeof(__m128);
+ }
+}
+
+#endif
+
+NO_SANITIZE_THREAD
+char* BareMemLogWrite(const char* begin, size_t msgSize, bool isLast) noexcept {
+ bool lastMark =
+ isLast && TMemoryLog::PrintLastMark.load(std::memory_order_acquire);
+ size_t amount = lastMark ? msgSize + TMemoryLog::LAST_MARK_SIZE : msgSize;
+
+ char* buffer = (char*)TMemoryLog::GetWriteBufferStatic(amount);
+ if (buffer == nullptr) {
+ return nullptr;
+ }
+
+#if defined(_x86_64_) || defined(_i386_)
+ if (AlignDown(begin, TMemoryLog::MemcpyAlignment) == begin) {
+ NoCacheMemcpy(buffer, begin, msgSize);
+ } else {
+ NoWCacheMemcpy(buffer, begin, msgSize);
+ }
+#else
+ memcpy(buffer, begin, msgSize);
+#endif
+
+ if (lastMark) {
+ TMemoryLog::ChangeLastMark(buffer + msgSize);
+ }
+
+ ClearAlignedTail(buffer + amount);
+ return buffer;
+}
+
+NO_SANITIZE_THREAD
+bool MemLogWrite(const char* begin, size_t msgSize, bool addLF) noexcept {
+ bool lastMark = TMemoryLog::PrintLastMark.load(std::memory_order_acquire);
+ size_t amount = lastMark ? msgSize + TMemoryLog::LAST_MARK_SIZE : msgSize;
+
+ // Let's construct prolog with timestamp and thread id
+ auto threadId = TMemoryLog::GetTheadId();
+
+ // alignment required by NoCacheMemcpy
+ // check for format for snprintf
+ constexpr size_t prologSize = 48;
+ alignas(TMemoryLog::MemcpyAlignment) char prolog[prologSize + 1];
+ Y_VERIFY(AlignDown(&prolog, TMemoryLog::MemcpyAlignment) == &prolog);
+
+ int snprintfResult = snprintf(prolog, prologSize + 1,
+ "TS %020" PRIu64 " TI %020" PRIu64 " ", GetCycleCountFast(), threadId);
+
+ if (snprintfResult < 0) {
+ return false;
+ }
+ Y_VERIFY(snprintfResult == prologSize);
+
+ amount += prologSize;
+ if (addLF) {
+ ++amount; // add 1 byte for \n at the end of the message
+ }
+
+ char* buffer = (char*)TMemoryLog::GetWriteBufferStatic(amount);
+ if (buffer == nullptr) {
+ return false;
+ }
+
+#if defined(_x86_64_) || defined(_i386_)
+ // warning: copy prolog first to avoid corruption of the message
+ // by prolog tail
+ NoCacheMemcpy(buffer, prolog, prologSize);
+ if (AlignDown(begin + prologSize, TMemoryLog::MemcpyAlignment) == begin + prologSize) {
+ NoCacheMemcpy(buffer + prologSize, begin, msgSize);
+ } else {
+ NoWCacheMemcpy(buffer + prologSize, begin, msgSize);
+ }
+#else
+ memcpy(buffer, prolog, prologSize);
+ memcpy(buffer + prologSize, begin, msgSize);
+#endif
+
+ if (addLF) {
+ buffer[prologSize + msgSize] = '\n';
+ }
+
+ if (lastMark) {
+ TMemoryLog::ChangeLastMark(buffer + prologSize + msgSize + (int)addLF);
+ }
+
+ ClearAlignedTail(buffer + amount);
+ return true;
+}
+
+NO_SANITIZE_THREAD
+void TMemoryLog::ChangeLastMark(char* buffer) noexcept {
+ memcpy(buffer, DEFAULT_LAST_MARK, LAST_MARK_SIZE);
+ auto oldMark = AtomicSwap(&LastMarkIsHere, buffer);
+ if (Y_LIKELY(oldMark != nullptr)) {
+ memcpy(oldMark, CLEAR_MARK, LAST_MARK_SIZE);
+ }
+ if (AtomicGet(LastMarkIsHere) != buffer) {
+ memcpy(buffer, CLEAR_MARK, LAST_MARK_SIZE);
+ AtomicBarrier();
+ }
+}
+
+bool MemLogVPrintF(const char* format, va_list params) noexcept {
+ auto logger = TMemoryLog::GetMemoryLogger();
+ if (logger == nullptr) {
+ return false;
+ }
+
+ auto threadId = TMemoryLog::GetTheadId();
+
+ // alignment required by NoCacheMemcpy
+ alignas(TMemoryLog::MemcpyAlignment) char buf[TMemoryLog::MAX_MESSAGE_SIZE];
+ Y_VERIFY(AlignDown(&buf, TMemoryLog::MemcpyAlignment) == &buf);
+
+ int prologSize = snprintf(buf,
+ TMemoryLog::MAX_MESSAGE_SIZE - 2,
+ "TS %020" PRIu64 " TI %020" PRIu64 " ",
+ GetCycleCountFast(),
+ threadId);
+
+ if (Y_UNLIKELY(prologSize < 0)) {
+ return false;
+ }
+ Y_VERIFY((ui32)prologSize <= TMemoryLog::MAX_MESSAGE_SIZE);
+
+ int add = vsnprintf(
+ &buf[prologSize],
+ TMemoryLog::MAX_MESSAGE_SIZE - prologSize - 2,
+ format, params);
+
+ if (Y_UNLIKELY(add < 0)) {
+ return false;
+ }
+ Y_VERIFY(add >= 0);
+ auto totalSize = prologSize + add;
+
+ buf[totalSize++] = '\n';
+ Y_VERIFY((ui32)totalSize <= TMemoryLog::MAX_MESSAGE_SIZE);
+
+ return BareMemLogWrite(buf, totalSize) != nullptr;
+}