diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-02 01:45:21 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-02 02:42:50 +0300 |
commit | 9c43d58f75cf086b744cf4fe2ae180e8f37e4a0c (patch) | |
tree | 9f88a486917d371d099cd712efd91b4c122d209d /library/cpp | |
parent | 32fb6dda1feb24f9ab69ece5df0cb9ec238ca5e6 (diff) | |
download | ydb-9c43d58f75cf086b744cf4fe2ae180e8f37e4a0c.tar.gz |
Intermediate changes
Diffstat (limited to 'library/cpp')
77 files changed, 9546 insertions, 0 deletions
diff --git a/library/cpp/deprecated/autoarray/README.md b/library/cpp/deprecated/autoarray/README.md new file mode 100644 index 0000000000..1d83147cee --- /dev/null +++ b/library/cpp/deprecated/autoarray/README.md @@ -0,0 +1,3 @@ +Pre-C++11 vector-like container. + +Just use std::vector. If you need to fill your vector with custom-constructed data, use reserve+emplace_back (but make sure that your elements are movable). diff --git a/library/cpp/deprecated/autoarray/autoarray.cpp b/library/cpp/deprecated/autoarray/autoarray.cpp new file mode 100644 index 0000000000..15167f27f6 --- /dev/null +++ b/library/cpp/deprecated/autoarray/autoarray.cpp @@ -0,0 +1 @@ +#include "autoarray.h" diff --git a/library/cpp/deprecated/autoarray/autoarray.h b/library/cpp/deprecated/autoarray/autoarray.h new file mode 100644 index 0000000000..2aa12c5916 --- /dev/null +++ b/library/cpp/deprecated/autoarray/autoarray.h @@ -0,0 +1,264 @@ +#pragma once + +#include <util/system/compat.h> +#include <util/system/yassert.h> +#include <util/system/defaults.h> +#include <util/system/sys_alloc.h> + +#include <util/generic/typetraits.h> +#include <utility> + +#include <new> +#include <util/generic/noncopyable.h> + +struct autoarray_getindex { + autoarray_getindex() = default; +}; + +struct aarr_b0 { + aarr_b0() = default; +}; + +struct aarr_nofill { + aarr_nofill() = default; +}; + +template <typename T> +struct ynd_type_traits { + enum { + empty_destructor = TTypeTraits<T>::IsPod, + }; +}; + +template <class T> +class autoarray : TNonCopyable { +protected: + T* arr; + size_t _size; + +private: + void AllocBuf(size_t siz) { + arr = nullptr; + _size = 0; + if (siz) { + arr = (T*)y_allocate(sizeof(T) * siz); + _size = siz; + } + } + +public: + using value_type = T; + using iterator = T*; + using const_iterator = const T*; + + autoarray() + : arr(nullptr) + , _size(0) + { + } + autoarray(size_t siz) { + AllocBuf(siz); + T* curr = arr; + try { + for (T* end = arr + _size; curr != end; ++curr) + new (curr) T(); + } catch (...) { + for (--curr; curr >= arr; --curr) + curr->~T(); + y_deallocate(arr); + throw; + } + } + template <class A> + explicit autoarray(size_t siz, A& fill) { + AllocBuf(siz); + T* curr = arr; + try { + for (T* end = arr + _size; curr != end; ++curr) + new (curr) T(fill); + } catch (...) { + for (--curr; curr >= arr; --curr) + curr->~T(); + y_deallocate(arr); + throw; + } + } + explicit autoarray(size_t siz, autoarray_getindex) { + AllocBuf(siz); + size_t nCurrent = 0; + try { + for (nCurrent = 0; nCurrent < _size; ++nCurrent) + new (&arr[nCurrent]) T(nCurrent); + } catch (...) { + for (size_t n = 0; n < nCurrent; ++n) + arr[n].~T(); + y_deallocate(arr); + throw; + } + } + explicit autoarray(size_t siz, aarr_b0) { + AllocBuf(siz); + memset(arr, 0, _size * sizeof(T)); + } + explicit autoarray(size_t siz, aarr_nofill) { + AllocBuf(siz); + } + template <class A> + explicit autoarray(const A* fill, size_t siz) { + AllocBuf(siz); + size_t nCurrent = 0; + try { + for (nCurrent = 0; nCurrent < _size; ++nCurrent) + new (&arr[nCurrent]) T(fill[nCurrent]); + } catch (...) { + for (size_t n = 0; n < nCurrent; ++n) + arr[n].~T(); + y_deallocate(arr); + throw; + } + } + template <class A, class B> + explicit autoarray(const A* fill, const B* cfill, size_t siz) { + AllocBuf(siz); + size_t nCurrent = 0; + try { + for (nCurrent = 0; nCurrent < _size; ++nCurrent) + new (&arr[nCurrent]) T(fill[nCurrent], cfill); + } catch (...) { + for (size_t n = 0; n < nCurrent; ++n) + arr[n].~T(); + y_deallocate(arr); + throw; + } + } + template <class A> + explicit autoarray(const A* fill, size_t initsiz, size_t fullsiz) { + AllocBuf(fullsiz); + size_t nCurrent = 0; + try { + for (nCurrent = 0; nCurrent < ((initsiz < _size) ? initsiz : _size); ++nCurrent) + new (&arr[nCurrent]) T(fill[nCurrent]); + for (; nCurrent < _size; ++nCurrent) + new (&arr[nCurrent]) T(); + } catch (...) { + for (size_t n = 0; n < nCurrent; ++n) + arr[n].~T(); + y_deallocate(arr); + throw; + } + } + template <class A> + explicit autoarray(const A* fill, size_t initsiz, size_t fullsiz, const T& dummy) { + AllocBuf(fullsiz); + size_t nCurrent = 0; + try { + for (nCurrent = 0; nCurrent < ((initsiz < _size) ? initsiz : _size); ++nCurrent) + new (&arr[nCurrent]) T(fill[nCurrent]); + for (; nCurrent < _size; ++nCurrent) + new (&arr[nCurrent]) T(dummy); + } catch (...) { + for (size_t n = 0; n < nCurrent; ++n) + arr[n].~T(); + y_deallocate(arr); + throw; + } + } + + template <class... R> + explicit autoarray(size_t siz, R&&... fill) { + AllocBuf(siz); + T* curr = arr; + try { + for (T* end = arr + _size; curr != end; ++curr) + new (curr) T(std::forward<R>(fill)...); + } catch (...) { + for (--curr; curr >= arr; --curr) + curr->~T(); + y_deallocate(arr); + throw; + } + } + ~autoarray() { + if (_size) { + if (!ynd_type_traits<T>::empty_destructor) + for (T *curr = arr, *end = arr + _size; curr != end; ++curr) + curr->~T(); + y_deallocate(arr); + } + } + T& operator[](size_t pos) { + Y_ASSERT(pos < _size); + return arr[pos]; + } + const T& operator[](size_t pos) const { + Y_ASSERT(pos < _size); + return arr[pos]; + } + size_t size() const { + return _size; + } + void swap(autoarray& with) { + T* tmp_arr = arr; + size_t tmp_size = _size; + arr = with.arr; + _size = with._size; + with.arr = tmp_arr; + with._size = tmp_size; + } + void resize(size_t siz) { + autoarray<T> tmp(arr, _size, siz); + swap(tmp); + } + void resize(size_t siz, const T& dummy) { + autoarray<T> tmp(arr, _size, siz, dummy); + swap(tmp); + } + T* rawpointer() { + return arr; + } + const T* operator~() const { + return arr; + } + T* begin() { + return arr; + } + T* end() { + return arr + _size; + } + T& back() { + Y_ASSERT(_size); + return arr[_size - 1]; + } + bool empty() const { + return !_size; + } + bool operator!() const { + return !_size; + } + size_t operator+() const { + return _size; + } + const T* begin() const { + return arr; + } + const T* end() const { + return arr + _size; + } + const T& back() const { + Y_ASSERT(_size); + return arr[_size - 1]; + } + //operator T*() { return arr; } +}; + +template <class T> +inline bool operator==(const autoarray<T>& a, const autoarray<T>& b) { + size_t count = a.size(); + if (count != b.size()) + return false; + for (size_t i = 0; i < count; ++i) { + if (a[i] != b[i]) + return false; + } + return true; +} diff --git a/library/cpp/deprecated/autoarray/ya.make b/library/cpp/deprecated/autoarray/ya.make new file mode 100644 index 0000000000..4b055f8c29 --- /dev/null +++ b/library/cpp/deprecated/autoarray/ya.make @@ -0,0 +1,7 @@ +LIBRARY() + +SRCS( + autoarray.cpp +) + +END() diff --git a/library/cpp/deprecated/fgood/README.md b/library/cpp/deprecated/fgood/README.md new file mode 100644 index 0000000000..4f66289657 --- /dev/null +++ b/library/cpp/deprecated/fgood/README.md @@ -0,0 +1,15 @@ +Some ancient wrappers on top of FILE*, and some string manupulation functions. + +Alternatives are as follows. + +For TFILEPtr. Use TIFStream or TOFStream if you need IO. For some rare use cases a TFileMap might also do. + +For fput/fget/getline. Use streams API. + +For struct ffb and struct prnstr. Just don't use them. Even if you can figure out what they do. + +For sf family of functions and TLineSplitter. Just use Split* from util/string/split.h + +For TSFReader. Use TMapTsvFile. + +For read_or_die family of functions. Use streams API. diff --git a/library/cpp/deprecated/fgood/ffb.cpp b/library/cpp/deprecated/fgood/ffb.cpp new file mode 100644 index 0000000000..aa9da861a6 --- /dev/null +++ b/library/cpp/deprecated/fgood/ffb.cpp @@ -0,0 +1,407 @@ +#include "ffb.h" + +#include <util/string/util.h> // str_spn +#include <util/system/compat.h> +#include <util/generic/yexception.h> + +#include <cstdio> +#include <algorithm> + +#include <ctype.h> + +#ifdef _win_ +#include <io.h> +#else +#include <unistd.h> +#endif + +ffb::ffb(FILE* file) + : TFILEPtr(file) +{ + if (file && !isatty(fileno(file)) && BUFSIZ < 512 * 1024) + setvbuf(file, nullptr, _IOFBF, 512 * 1024); +} + +void ffb::operator=(FILE* f) { + TFILEPtr::operator=(f); + if (f && !isatty(fileno(f)) && BUFSIZ < 512 * 1024) + setvbuf(f, nullptr, _IOFBF, 512 * 1024); +} + +void ffb::open(const char* name, const char* mode) { + TFILEPtr::open(name, mode); + if (!isatty(fileno(*this)) && BUFSIZ < 512 * 1024) + setvbuf(*this, nullptr, _IOFBF, 512 * 1024); +} + +int sf(char** fb, char* buf) { //don't want to call sf(fb, buf, 32) + if (!(*buf && *buf != 10)) { + *fb = nullptr; + return 0; + } + int n = 1; + fb[0] = buf; + while (*buf && *buf != 10 && n < 31) { + if (*buf == '\t') { + *buf++ = 0; + fb[n++] = buf; + continue; + } + buf++; + } + if (*buf == 10 && buf[-1] == 13) + buf[-1] = 0; + *buf = 0; + fb[n] = nullptr; + return n; +} + +int sf(char** fb, char* buf, size_t fb_sz) { + if (!(*buf && *buf != 10)) { + *fb = nullptr; + return 0; + } + fb_sz--; + int n = 1; + fb[0] = buf; + while (*buf && *buf != 10 && n < (int)fb_sz) { + if (*buf == '\t') { + *buf++ = 0; + fb[n++] = buf; + continue; + } + buf++; + } + if (*buf == 10 && buf[-1] == 13) + buf[-1] = 0; + *buf = 0; + fb[n] = nullptr; + return n; +} + +inline int sf_blank(char** fb, char* buf, size_t fb_sz) { + while (isspace((ui8)*buf)) + buf++; + if (!*buf) { + *fb = nullptr; + return 0; + } + fb_sz--; + int n = 1; + fb[0] = buf; + while (*buf && *buf != 10 && n < (int)fb_sz) { + if (isspace((ui8)*buf)) { + *buf++ = 0; + while (isspace((ui8)*buf)) + buf++; + if (*buf) + fb[n++] = buf; + continue; + } + buf++; + } + if (*buf == 10 && buf[-1] == 13) + buf[-1] = 0; + *buf = 0; + fb[n] = nullptr; + return n; +} + +int sf(char fs, char** fb, char* buf, size_t fb_sz) { + if (fs == ' ') + return sf_blank(fb, buf, fb_sz); + while (*buf == fs) + buf++; + if (!(*buf && *buf != 10)) { + *fb = nullptr; + return 0; + } + fb_sz--; + int n = 1; + fb[0] = buf; + while (*buf && *buf != 10 && n < (int)fb_sz) { + if (*buf == fs) { + *buf++ = 0; + while (*buf == fs) + buf++; + fb[n++] = buf; + continue; + } + buf++; + } + if (*buf == 10 && buf[-1] == 13) + buf[-1] = 0; + *buf = 0; + fb[n] = nullptr; + return n; +} + +int sf(const char* fs, char** fb, char* buf, size_t fb_sz) { + if (!(*buf && *buf != 10)) { + *fb = nullptr; + return 0; + } + int fs_len = strlen(fs); + fb_sz--; + int n = 1; + fb[0] = buf; + while (*buf && *buf != 10 && n < (int)fb_sz) { + if (*buf == *fs && !strncmp(buf + 1, fs + 1, fs_len - 1)) { + *buf = 0; + buf += fs_len; + fb[n++] = buf; + continue; + } + buf++; + } + if (*buf == 10 && buf[-1] == 13) + buf[-1] = 0; + *buf = 0; + fb[n] = nullptr; + return n; +} + +inline bool is_end(const char* p) { + return !p || !p[0]; +} + +int sf(const char* seps, char* buf, char** fb, size_t fb_sz) { + if (fb_sz < 1 || is_end(buf)) { + *fb = nullptr; + return 0; + } + str_spn sseps(seps); + fb[0] = nullptr; + int n = 0; + // skip leading delimeters + buf = sseps.cbrk(buf); + if (is_end(buf)) + return 0; + // store fields + while (n < (int)fb_sz) { + fb[n++] = buf; + // find delimeters + buf = sseps.brk(buf + 1); + if (is_end(buf)) + break; + *buf = 0; + // skip delimiters + buf = sseps.cbrk(buf + 1); + if (is_end(buf)) + break; + } + fb[n] = nullptr; + return n; +} + +void TLineSplitter::operator()(char* p, TVector<char*>& fields) const { + if (!p || !*p) + return; + char* q = p; + while (1) { + p = Sep.brk(p); + if (q && (p - q || !SkipEmpty())) + fields.push_back(q); + q = nullptr; + if (!*p) + break; + if (SepStrLen == 1 || (SepStrLen > 1 && !strncmp(p + 1, SepStr + 1, SepStrLen - 1))) { + *p = 0; + p += SepStrLen; + q = p; + } else + p++; + } +} + +void TLineSplitter::operator()(const char* p, TVector<std::pair<const char*, size_t>>& fields) const { + if (!p || !*p) + return; + const char* q = p; + while (1) { + p = Sep.brk(p); + if (q && (p - q || !SkipEmpty())) + fields.push_back(std::make_pair(q, p - q)); + q = nullptr; + if (!*p) + break; + if (SepStrLen == 1 || (SepStrLen > 1 && !strncmp(p + 1, SepStr + 1, SepStrLen - 1))) { + p += SepStrLen; + q = p; + } else + p++; + } +} + +TSFReader::TSFReader(const char* fname, char sep, i32 nfrq) // if sep == ' ' isspace will be imitated (for compat) + : Split(str_spn(sep == ' ' ? "\t\n\v\f\r " : TString(1, sep).data()), sep == ' ') + , OpenPipe(false) +{ + Open(fname, nfrq); +} + +TSFReader::TSFReader(const char* fname, const char* sep, i32 nfrq) + : Split(sep, false) + , OpenPipe(false) +{ + Open(fname, nfrq); +} + +TSFReader::TSFReader(const char* fname, const TLineSplitter& spl, i32 nfrq) + : Split(spl) + , OpenPipe(false) +{ + Open(fname, nfrq); +} + +void TSFReader::Open(const char* fname, i32 nfrq, size_t vbuf_size) { + FieldsRequired = nfrq; + NF = NR = 0; + + if (IsOpen()) + File.close(); + + if (!fname) + return; + + if (!strcmp(fname, "/dev/stdin")) { + File.assign(stdin, "/dev/stdin"); + } else { + if (OpenPipe) + File.popen(fname, "r"); + else + File.open(fname, "r"); + } + OpenPipe = false; + if (!isatty(fileno(File))) + setvbuf(File, nullptr, _IOFBF, vbuf_size); +} + +void TSFReader::Popen(const char* pname, i32 nfrq, size_t vbuf_size) { + OpenPipe = true; + Open(pname, nfrq, vbuf_size); +} + +bool TSFReader::NextLine(segmented_string_pool* pool) { + size_t line_len = 0; + +#ifdef __FreeBSD__ + char* ptr = fgetln(File, &line_len); + if (!ptr) + return false; + if (!line_len || ptr[line_len - 1] != '\n') { // last line w/o newline + Buf.AssignNoAlias(ptr, line_len); + ptr = Buf.begin(); + } else { + // can safely replace newline with \0 + ptr[line_len - 1] = 0; + --line_len; + } +#else + if (!getline(File, Buf)) + return false; + char* ptr = Buf.begin(); + line_len = Buf.size(); +#endif + if (line_len && ptr[line_len - 1] == '\r') + ptr[line_len - 1] = 0; + + if (pool) { + char* nptr = pool->append(ptr); + Y_ASSERT(!strcmp(ptr, nptr)); + ptr = nptr; + } + + ++NR; + Fields.clear(); + Split(ptr, Fields); + NF = Fields.size(); + + if (FieldsRequired != -1 && FieldsRequired != (int)NF) + ythrow yexception() << File.name() << " line " << NR << ": " << NF << " fields, expected " << FieldsRequired; + + return true; +} + +int prnstr::f(const char* c, ...) { + va_list params; + int n = asize - pos, k; + va_start(params, c); + while ((k = vsnprintf(buf + pos, n, c, params)) >= n) { + n += asize, asize *= 2; + while (k + pos >= n) + n += asize, asize *= 2; + char* t = new char[asize]; + memcpy(t, buf, pos); + delete[] buf; + buf = t; + va_end(params); + va_start(params, c); + } + pos += k; + va_end(params); + return k; +} +int prnstr::s(const char* c, size_t k) { + if (!c) + return 0; + size_t n = asize - pos; + if (k >= n) { + n += asize, asize *= 2; + while (k + pos >= n) + n += asize, asize *= 2; + char* t = new char[asize]; + memcpy(t, buf, pos); + delete[] buf; + buf = t; + } + memcpy(buf + pos, c, k); + pos += k; + buf[pos] = 0; + return k; +} +void prnstr::clear() { + pos = 0; + if (asize > 32768) { + asize = 32768; + delete[] buf; + buf = new char[asize]; + } +} + +void prnstr::swap(prnstr& w) { + std::swap(buf, w.buf); + std::swap(pos, w.pos); + std::swap(asize, w.asize); +} + +FILE* read_or_die(const char* fname) { + FILE* f = fopen(fname, "rb"); + if (!f) + err(1, "%s", fname); + return f; +} +FILE* write_or_die(const char* fname) { + FILE* f = fopen(fname, "wb"); + if (!f) + err(1, "%s", fname); + return f; +} +FILE* fopen_or_die(const char* fname, const char* mode) { + FILE* f = fopen(fname, mode); + if (!f) + err(1, "%s (mode '%s')", fname, mode); + return f; +} + +FILE* fopen_chk(const char* fname, const char* mode) { + FILE* f = fopen(fname, mode); + if (!f) + ythrow yexception() << fname << " (mode '" << mode << "'): " << LastSystemErrorText(); + return f; +} + +void fclose_chk(FILE* f, const char* fname) { + if (fclose(f)) + ythrow yexception() << "file " << fname << ": " << LastSystemErrorText(); +} diff --git a/library/cpp/deprecated/fgood/ffb.h b/library/cpp/deprecated/fgood/ffb.h new file mode 100644 index 0000000000..ca229eb65a --- /dev/null +++ b/library/cpp/deprecated/fgood/ffb.h @@ -0,0 +1,264 @@ +#pragma once + +#include "fgood.h" + +#include <util/string/util.h> // str_spn +#include <util/string/split.h> // str_spn +#include <util/memory/segmented_string_pool.h> +#include <util/generic/string.h> +#include <util/generic/vector.h> +#include <util/generic/noncopyable.h> + +#include <utility> + +#include <cstdarg> +#include <cstring> + +struct ffb: public TFILEPtr { + ffb() { + } + ffb(FILE* file); + ffb(const char* name, const char* mode) { + open(name, mode); + } + void operator=(FILE* f); // take ownership + void open(const char* name, const char* mode); + int f(const char* c, ...) { + va_list args; + va_start(args, c); + return vfprintf(*this, c, args); + } + void s(const char* c) { + fsput(c, strlen(c)); + } + void b(const void* cc, int n) { + fsput((const char*)cc, n); + } + void B(const void* cc, int N) { + fsput((const char*)cc, N); + } + void c(char c) { + fputc(c); + } + void cbe(wchar16 c) { // big endian utf-16 + fputc(char(c >> 8)); //Hi8 + fputc(char(c & 255)); //Lo8 + } + void sbe(const wchar16* c) { + for (; *c; c++) + cbe(*c); + } + void fclose() { + close(); + } +}; + +// split fields of tab-delimited line of text +// here and below fb actual size must be fb_sz + 1 to allow fb[fb_sz] be zero +int sf(char** fb, char* buf, size_t fb_sz); +int sf(char** fb, char* buf /* fb_sz == 32 */); + +// split fields of char-delimited line of text +// Achtung: delim = ' ' imitates awk: initial separators are skipped, +// repeated seps treated as one, all chars less than ' ' treated as separators. +int sf(char fs, char** fb, char* buf, size_t fb_sz = 32); + +// split fields of string-delimited line of text (fs is NOT a regexp) +// (usually fs is "@@") +int sf(const char* fs, char** fb, char* buf, size_t fb_sz = 32); + +// split fields of char-delimited line of text, set of char-separators is given +// Achtung: repeated seps treated as one, initial seps are skipped +// newlines are NOT ignored. +int sf(const char* seps, char* buf, char** fb, size_t fb_sz = 32); + +inline char* chomp(char* buf) { + char* c = buf + strlen(buf); + if (c > buf && c[-1] == '\n') { + *--c = 0; +#ifdef _win32_ + if (c > buf && c[-1] == '\r') + *--c = 0; +#endif + } + return buf; +} + +inline char* chomp_cr(char* buf) { + char* c = buf + strlen(buf); + if (c > buf && c[-1] == '\n') + *--c = 0; + if (c > buf && c[-1] == '\r') + *--c = 0; + return buf; +} + +class TLineSplitter { +protected: + enum { // Default: Split string by SepStr + SplitByAnySep = 1, // Split string by Sep + NoEmptyFields = 2 // Skip all empty fields between separators + }; + +private: + ui32 Flags; + const str_spn Sep; // collection of separators + const char* SepStr; // pointer exact string to separate by + size_t SepStrLen; // length of separator string + +public: + TLineSplitter(const char* sep, bool noEmpty) + : Flags(noEmpty ? NoEmptyFields : 0) + , Sep(TString(sep, 1).data()) + , SepStr(sep) + , SepStrLen(strlen(sep)) + { + } + TLineSplitter(const str_spn& sep, bool noEmpty = false) + : Flags(SplitByAnySep | (noEmpty ? NoEmptyFields : 0)) + , Sep(sep) + , SepStr(nullptr) + , SepStrLen(1) + { + } + bool AnySep() const { + return Flags & SplitByAnySep; + } + bool SkipEmpty() const { + return Flags & NoEmptyFields; + } + /// Separates string onto tokens + /// Expecting a zero-terminated string + /// By default returns empty fields between sequential separators + void operator()(char* p, TVector<char*>& fields) const; + /// Same, but for const string - fills vector of pairs (pointer, length) + void operator()(const char* p, TVector<std::pair<const char*, size_t>>& fields) const; +}; + +/** + * Use library/cpp/map_text_file/map_tsv_file.h instead. + */ +class TSFReader { + TString Buf; // buffer used for non-'\n'-terminated string and for non-freebsd work + TLineSplitter Split; + TVector<char*> Fields; + size_t NF; // Fields.size() + size_t NR; + + TFILEPtr File; + + bool OpenPipe; // internal flag that turns open() to popen() + + i32 FieldsRequired; // if != -1, != nf, terminate program + +public: + // char separator + // Achtung: delim = ' ' imitates awk: initial separators are skipped, + // all chars less than ' ' treated as separators. + TSFReader(const char* fname = nullptr, char sep = '\t', i32 nf_reqired = -1); + // exact string separator + TSFReader(const char* fname, const char* sep, i32 nf_reqired = -1); + // fully customizable + TSFReader(const char* fname, const TLineSplitter& spl, i32 nf_reqired = -1); + + void Open(const char* fname, i32 nf_reqired = -1, size_t vbufsize = 1u << 21); // use "/dev/stdin" for stdin + void Popen(const char* pname, i32 nf_reqired = -1, size_t vbufsize = 1u << 21); + + bool NextLine(segmented_string_pool* pool = nullptr); + + bool IsOpen() const { + return (FILE*)File != nullptr; + } + bool IsEof() const { + return feof(File); + } + void Close() { + File.close(); + } + void Rewind() { + File.seek(0, SEEK_SET); + } + void Seek(i64 offset, int mode = SEEK_SET) { + File.seek(offset, mode); + } + i64 Tell() const { + return ftell(File); + } + char*& operator[](size_t ind) { + //if (ind >= NF) + // throw yexception("Can't return reference to unexisting field %" PRISZT, ind); + return Fields[ind]; + } + const char* operator[](size_t ind) const { + if (ind >= NF) + return nullptr; + return Fields[ind]; + } + operator int() const { // note: empty input line makes 0 fields + return (int)NF; + } + const char* Name() const { + return File.name().data(); + } + size_t Line() const { + return NR; + } + const TVector<char*>& GetFields() const { + return Fields; + } +}; + +struct prnstr { + char* buf; + int pos; + int asize; + prnstr() + : pos(0) + { + asize = 32; + buf = new char[asize]; + } + explicit prnstr(int asz) + : pos(0) + { + asize = asz; + buf = new char[asize]; + } + int f(const char* c, ...); + int s(const char* c1, const char* c2); + int s(const char* c1, const char* c2, const char* c3); + int s(const char* c, size_t len); + //int s(const char *c); + int s(const char* c) { + return c ? s(c, strlen(c)) : 0; + } + int s(const TString& c); + int s_htmesc(const char* c, bool enc_utf = false); + int s_htmesc_w(const char* c); + int c(char c); + int cu(wchar32 c); //for utf-8 + void restart() { + *buf = 0; + pos = 0; + } + const char* operator~() const { + return buf; + } + int operator+() const { + return pos; + } + ~prnstr() { + delete[] buf; + } + void clear(); + void swap(prnstr& w); +}; + +// functions that terminate program upon failure +FILE* read_or_die(const char* fname); +FILE* write_or_die(const char* fname); +FILE* fopen_or_die(const char* fname, const char* mode); + +// functions that throw upon failure +FILE* fopen_chk(const char* fname, const char* mode); +void fclose_chk(FILE* f, const char* fname_dbg); diff --git a/library/cpp/deprecated/fgood/fgood.cpp b/library/cpp/deprecated/fgood/fgood.cpp new file mode 100644 index 0000000000..5d4725bfae --- /dev/null +++ b/library/cpp/deprecated/fgood/fgood.cpp @@ -0,0 +1,70 @@ +#include "fgood.h" + +#include <util/generic/cast.h> +#include <util/string/cast.h> +#include <util/system/fstat.h> + +#ifdef _win32_ +#include <io.h> +#endif + +i64 TFILEPtr::length() const { +#ifdef _win32_ + FHANDLE fd = (FHANDLE)_get_osfhandle(fileno(m_file)); +#else + FHANDLE fd = fileno(m_file); +#endif + i64 rv = GetFileLength(fd); + if (rv < 0) + ythrow yexception() << "TFILEPtr::length() " << Name.data() << ": " << LastSystemErrorText(); + return rv; +} + +FILE* OpenFILEOrFail(const TString& name, const char* mode) { + FILE* res = ::fopen(name.data(), mode); + if (!res) { + ythrow yexception() << "can't open \'" << name << "\' with mode \'" << mode << "\': " << LastSystemErrorText(); + } + return res; +} + +void TFILECloser::Destroy(FILE* file) { + ::fclose(file); +} + +#ifdef _freebsd_ // fgetln +#define getline getline_alt_4test +#endif // _freebsd_ + +bool getline(TFILEPtr& f, TString& s) { + char buf[4096]; + char* buf_ptr; + if (s.capacity() > sizeof(buf)) { + s.resize(s.capacity()); + if ((buf_ptr = fgets(s.begin(), IntegerCast<int>(s.capacity()), f)) == nullptr) + return false; + } else { + if ((buf_ptr = fgets(buf, sizeof(buf), f)) == nullptr) + return false; + } + size_t buf_len = strlen(buf_ptr); + bool line_complete = buf_len && buf_ptr[buf_len - 1] == '\n'; + if (line_complete) + buf_len--; + if (buf_ptr == s.begin()) + s.resize(buf_len); + else + s.AssignNoAlias(buf, buf_len); + if (line_complete) + return true; + while (fgets(buf, sizeof(buf), f)) { + size_t buf_len2 = strlen(buf); + if (buf_len2 && buf[buf_len2 - 1] == '\n') { + buf[buf_len2 - 1] = 0; + s.append(buf, buf_len2 - 1); + return true; + } + s.append(buf, buf_len2); + } + return true; +} diff --git a/library/cpp/deprecated/fgood/fgood.h b/library/cpp/deprecated/fgood/fgood.h new file mode 100644 index 0000000000..0aaf910c0f --- /dev/null +++ b/library/cpp/deprecated/fgood/fgood.h @@ -0,0 +1,328 @@ +#pragma once + +#include <util/system/yassert.h> +#include <util/system/defaults.h> +#include <util/generic/string.h> +#include <util/generic/yexception.h> +#include <util/generic/ptr.h> + +#include "fput.h" + +#include <cstdio> + +#include <fcntl.h> + +#ifdef _unix_ +extern "C" int __ungetc(int, FILE*); +#endif + +#if (!defined(__FreeBSD__) && !defined(__linux__) && !defined(_darwin_) && !defined(_cygwin_)) || defined(_bionic_) +#define feof_unlocked(_stream) feof(_stream) +#define ferror_unlocked(_stream) ferror(_stream) +#endif + +#ifndef _unix_ +#if defined(_MSC_VER) && (_MSC_VER < 1900) +#define getc_unlocked(_stream) (--(_stream)->_cnt >= 0 ? 0xff & *(_stream)->_ptr++ : _filbuf(_stream)) +#define putc_unlocked(_c, _stream) (--(_stream)->_cnt >= 0 ? 0xff & (*(_stream)->_ptr++ = (char)(_c)) : _flsbuf((_c), (_stream))) +#else +#define getc_unlocked(_stream) getc(_stream) +#define putc_unlocked(_c, _stream) putc(_c, _stream) +#endif +#endif + +inline bool fgood(FILE* f) { + return !feof_unlocked(f) && !ferror_unlocked(f); +} + +#ifdef _win32_ +// These functions will work only with static MSVC runtime linkage. For dynamic linkage, +// fseeki64.c and ftelli64.c from CRT sources should be included in project +extern "C" int __cdecl _fseeki64(FILE*, __int64, int); +extern "C" __int64 __cdecl _ftelli64(FILE*); + +inline i64 ftello(FILE* stream) { + return _ftelli64(stream); +} + +inline int fseeko(FILE* stream, i64 offset, int origin) { + return _fseeki64(stream, offset, origin); +} +#endif + +class TFILEPtr { +private: + enum { SHOULD_CLOSE = 1, + IS_PIPE = 2 }; + FILE* m_file; + int m_Flags; + TString Name; + +public: + TFILEPtr() noexcept { + m_file = nullptr; + m_Flags = 0; + } + TFILEPtr(const TString& name, const char* mode) { + m_file = nullptr; + m_Flags = 0; + open(name, mode); + } + TFILEPtr(const TFILEPtr& src) noexcept { + m_file = src.m_file; + m_Flags = 0; + } + TFILEPtr& operator=(const TFILEPtr& src) { + if (src.m_file != m_file) { + close(); + m_file = src.m_file; + m_Flags = 0; + } + return *this; + } + explicit TFILEPtr(FILE* f) noexcept { // take ownership + m_file = f; + m_Flags = SHOULD_CLOSE; + } + TFILEPtr& operator=(FILE* f) { // take ownership + if (f != m_file) { + close(); + m_file = f; + m_Flags = SHOULD_CLOSE; + } + return *this; + } + const TString& name() const { + return Name; + } + operator FILE*() const noexcept { + return m_file; + } + FILE* operator->() const noexcept { + return m_file; + } + bool operator!() const noexcept { + return m_file == nullptr; + } + bool operator!=(FILE* f) const noexcept { + return m_file != f; + } + bool operator==(FILE* f) const noexcept { + return m_file == f; + } + ~TFILEPtr() { + close(); + } + void Y_PRINTF_FORMAT(2, 3) check(const char* message, ...) const { + if (Y_UNLIKELY(!fgood(m_file))) { + va_list args; + va_start(args, message); + char buf[512]; + vsnprintf(buf, 512, message, args); + // XXX: errno is undefined here + ythrow yexception() << buf << ": " << LastSystemErrorText() << ", " << Name.data() << " at offset " << (i64)ftell(); + } + } + TFILEPtr& assign(FILE* f, const char* name = nullptr) { // take ownership and have a name + *this = f; + if (name) + Name = name; + return *this; + } + void open(const TString& name, const char* mode) { + Y_ASSERT(!name.empty()); + Y_ASSERT(m_file == nullptr); + m_file = ::fopen(name.data(), mode); + if (!m_file) + ythrow yexception() << "can't open \'" << name << "\' with mode \'" << mode << "\': " << LastSystemErrorText(); + m_Flags = SHOULD_CLOSE; + Name = name; + } + void popen(const TString& command, const char* mode) { + Y_ASSERT(!command.empty()); + Y_ASSERT(m_file == nullptr); + m_file = ::popen(command.data(), mode); + if (!m_file) + ythrow yexception() << "can't execute \'" << command << "\' with mode \'" << mode << "\': " << LastSystemErrorText(); + m_Flags = IS_PIPE | SHOULD_CLOSE; + Name = command; + } + void close() { + if (m_file != nullptr && (m_Flags & SHOULD_CLOSE)) { + if ((m_Flags & IS_PIPE) ? ::pclose(m_file) : ::fclose(m_file)) { + m_file = nullptr; + m_Flags = 0; + if (!UncaughtException()) + ythrow yexception() << "can't close file " << Name.data() << ": " << LastSystemErrorText(); + } + } + m_file = nullptr; + m_Flags = 0; + Name.clear(); + } + size_t write(const void* buffer, size_t size, size_t count) const { + Y_ASSERT(m_file != nullptr); + size_t r = ::fwrite(buffer, size, count, m_file); + check("can't write %lu bytes", (unsigned long)size * count); + return r; + } + size_t read(void* buffer, size_t size, size_t count) const { + Y_ASSERT(m_file != nullptr); + size_t r = ::fread(buffer, size, count, m_file); + if (ferror_unlocked(m_file)) + ythrow yexception() << "can't read " << (unsigned long)size * count << " bytes: " << LastSystemErrorText() << ", " << Name.data() << " at offset " << (i64)ftell(); + return r; + } + char* fgets(char* buffer, int size) const { + Y_ASSERT(m_file != nullptr); + char* r = ::fgets(buffer, size, m_file); + if (ferror_unlocked(m_file)) + ythrow yexception() << "can't read string of maximum size " << size << ": " << LastSystemErrorText() << ", " << Name.data() << " at offset " << (i64)ftell(); + return r; + } + void Y_PRINTF_FORMAT(2, 3) fprintf(const char* format, ...) { + Y_ASSERT(m_file != nullptr); + va_list args; + va_start(args, format); + vfprintf(m_file, format, args); + check("can't write"); + } + void seek(i64 offset, int origin) const { + Y_ASSERT(m_file != nullptr); +#if defined(_unix_) || defined(_win32_) + if (fseeko(m_file, offset, origin) != 0) +#else + Y_ASSERT(offset == (i64)(i32)offset); + if (::fseek(m_file, (long)offset, origin) != 0) +#endif + ythrow yexception() << "can't seek " << Name.data() << " by " << offset << ": " << LastSystemErrorText(); + } + i64 length() const; // uses various system headers -> in fileptr.cpp + + void setDirect() const { +#if !defined(_win_) && !defined(_darwin_) + if (!m_file) + ythrow yexception() << "file not open"; + if (fcntl(fileno(m_file), F_SETFL, O_DIRECT) == -1) + ythrow yexception() << "Cannot set O_DIRECT flag"; +#endif + } + + // for convenience + + i64 ftell() const noexcept { +#if defined(_unix_) || defined(_win32_) + return ftello(m_file); +#else + return ftell(m_file); +#endif + } + bool eof() const noexcept { + Y_ASSERT(m_file != nullptr); + return feof_unlocked(m_file) != 0; + } + int fputc(int c) { + Y_ASSERT(m_file != nullptr); + return putc_unlocked(c, m_file); + } + size_t fputs(const char* buffer) const { + return write(buffer, strlen(buffer), 1); + } + int fgetc() { + Y_ASSERT(m_file != nullptr); + return getc_unlocked(m_file); + } + int ungetc(int c) { + Y_ASSERT(m_file != nullptr); + return ::ungetc(c, m_file); + } + template <class T> + size_t fput(const T& a) { + Y_ASSERT(m_file != nullptr); + return ::fput(m_file, a); + } + template <class T> + size_t fget(T& a) { + Y_ASSERT(m_file != nullptr); + return ::fget(m_file, a); + } + size_t fsput(const char* s, size_t l) { + Y_ASSERT(m_file != nullptr); + return ::fsput(m_file, s, l); + } + size_t fsget(char* s, size_t l) { + Y_ASSERT(m_file != nullptr); + return ::fsget(m_file, s, l); + } + + void fflush() { + ::fflush(m_file); + } + + /* This block contains some TFile/TStream - compatible names */ + size_t Read(void* bufferIn, size_t numBytes) { + size_t r = fsget((char*)bufferIn, numBytes); + if (Y_UNLIKELY(ferror_unlocked(m_file))) + ythrow yexception() << "can't read " << numBytes << " bytes: " << LastSystemErrorText() << ", " << Name << " at offset " << (i64)ftell(); + return r; + } + void Write(const void* buffer, size_t numBytes) { + write(buffer, 1, numBytes); + } + i64 Seek(i64 offset, int origin /*SeekDir*/) { + seek(offset, origin); + return ftell(); + } + i64 GetPosition() const noexcept { + return ftell(); + } + i64 GetLength() const noexcept { + return length(); + } + bool ReadLine(TString& st); + + /* Similar to TAutoPtr::Release - return pointer and forget about it. */ + FILE* Release() noexcept { + FILE* result = m_file; + m_file = nullptr; + m_Flags = 0; + Name.clear(); + return result; + } +}; + +inline void fclose(TFILEPtr& F) { + F.close(); +} + +inline void fseek(const TFILEPtr& F, i64 offset, int whence) { + F.seek(offset, whence); +} + +#ifdef _freebsd_ // fgetln +inline bool getline(TFILEPtr& f, TString& s) { + size_t len; + char* buf = fgetln(f, &len); + if (!buf) + return false; + if (len && buf[len - 1] == '\n') + len--; + s.AssignNoAlias(buf, len); + return true; +} +#else +bool getline(TFILEPtr& f, TString& s); +#endif //_freebsd_ + +inline bool TFILEPtr::ReadLine(TString& st) { + return getline(*this, st); +} + +FILE* OpenFILEOrFail(const TString& name, const char* mode); + +//Should be used with THolder +struct TFILECloser { + static void Destroy(FILE* file); +}; + +using TFILEHolder = THolder<FILE, TFILECloser>; diff --git a/library/cpp/deprecated/fgood/fput.h b/library/cpp/deprecated/fgood/fput.h new file mode 100644 index 0000000000..690b06332d --- /dev/null +++ b/library/cpp/deprecated/fgood/fput.h @@ -0,0 +1,79 @@ +#pragma once + +#include <util/system/defaults.h> +#include <util/system/valgrind.h> + +#include <cstdio> + +#ifdef __FreeBSD__ +#include <cstring> + +template <class T> +Y_FORCE_INLINE size_t fput(FILE* F, const T& a) { + if (Y_LIKELY(F->_w >= int(sizeof(a)))) { + memcpy(F->_p, &a, sizeof(a)); + F->_p += sizeof(a); + F->_w -= sizeof(a); + return 1; + } else { + return fwrite(&a, sizeof(a), 1, F); + } +} + +template <class T> +Y_FORCE_INLINE size_t fget(FILE* F, T& a) { + if (Y_LIKELY(F->_r >= int(sizeof(a)))) { + memcpy(&a, F->_p, sizeof(a)); + F->_p += sizeof(a); + F->_r -= sizeof(a); + return 1; + } else { + return fread(&a, sizeof(a), 1, F); + } +} + +inline size_t fsput(FILE* F, const char* s, size_t l) { + VALGRIND_CHECK_READABLE(s, l); + + if ((size_t)F->_w >= l) { + memcpy(F->_p, s, l); + F->_p += l; + F->_w -= l; + return l; + } else { + return fwrite(s, 1, l, F); + } +} + +inline size_t fsget(FILE* F, char* s, size_t l) { + if ((size_t)F->_r >= l) { + memcpy(s, F->_p, l); + F->_p += l; + F->_r -= l; + return l; + } else { + return fread(s, 1, l, F); + } +} +#else +template <class T> +Y_FORCE_INLINE size_t fput(FILE* F, const T& a) { + return fwrite(&a, sizeof(a), 1, F); +} + +template <class T> +Y_FORCE_INLINE size_t fget(FILE* F, T& a) { + return fread(&a, sizeof(a), 1, F); +} + +inline size_t fsput(FILE* F, const char* s, size_t l) { +#ifdef WITH_VALGRIND + VALGRIND_CHECK_READABLE(s, l); +#endif + return fwrite(s, 1, l, F); +} + +inline size_t fsget(FILE* F, char* s, size_t l) { + return fread(s, 1, l, F); +} +#endif diff --git a/library/cpp/deprecated/fgood/ya.make b/library/cpp/deprecated/fgood/ya.make new file mode 100644 index 0000000000..2394f9ad7a --- /dev/null +++ b/library/cpp/deprecated/fgood/ya.make @@ -0,0 +1,8 @@ +LIBRARY() + +SRCS( + ffb.cpp + fgood.cpp +) + +END() diff --git a/library/cpp/deprecated/mapped_file/mapped_file.cpp b/library/cpp/deprecated/mapped_file/mapped_file.cpp new file mode 100644 index 0000000000..b0e4511299 --- /dev/null +++ b/library/cpp/deprecated/mapped_file/mapped_file.cpp @@ -0,0 +1,64 @@ +#include "mapped_file.h" + +#include <util/generic/yexception.h> +#include <util/system/defaults.h> +#include <util/system/hi_lo.h> +#include <util/system/filemap.h> + +TMappedFile::TMappedFile(TFileMap* map, const char* dbgName) { + Map_ = map; + i64 len = Map_->Length(); + if (Hi32(len) != 0 && sizeof(size_t) <= sizeof(ui32)) + ythrow yexception() << "File '" << dbgName << "' mapping error: " << len << " too large"; + + Map_->Map(0, static_cast<size_t>(len)); +} + +TMappedFile::TMappedFile(const TFile& file, TFileMap::EOpenMode om, const char* dbgName) + : Map_(nullptr) +{ + init(file, om, dbgName); +} + +void TMappedFile::precharge(size_t off, size_t size) const { + if (!Map_) + return; + + Map_->Precharge(off, size); +} + +void TMappedFile::init(const TString& name) { + THolder<TFileMap> map(new TFileMap(name)); + TMappedFile newFile(map.Get(), name.data()); + Y_UNUSED(map.Release()); + newFile.swap(*this); + newFile.term(); +} + +void TMappedFile::init(const TString& name, size_t length, TFileMap::EOpenMode om) { + THolder<TFileMap> map(new TFileMap(name, length, om)); + TMappedFile newFile(map.Get(), name.data()); + Y_UNUSED(map.Release()); + newFile.swap(*this); + newFile.term(); +} + +void TMappedFile::init(const TFile& file, TFileMap::EOpenMode om, const char* dbgName) { + THolder<TFileMap> map(new TFileMap(file, om)); + TMappedFile newFile(map.Get(), dbgName); + Y_UNUSED(map.Release()); + newFile.swap(*this); + newFile.term(); +} + +void TMappedFile::init(const TString& name, TFileMap::EOpenMode om) { + THolder<TFileMap> map(new TFileMap(name, om)); + TMappedFile newFile(map.Get(), name.data()); + Y_UNUSED(map.Release()); + newFile.swap(*this); + newFile.term(); +} + +void TMappedFile::flush() { + Map_->Flush(); +} diff --git a/library/cpp/deprecated/mapped_file/ya.make b/library/cpp/deprecated/mapped_file/ya.make new file mode 100644 index 0000000000..309341f1da --- /dev/null +++ b/library/cpp/deprecated/mapped_file/ya.make @@ -0,0 +1,7 @@ +LIBRARY() + +SRCS( + mapped_file.cpp +) + +END() diff --git a/library/cpp/eventlog/common.h b/library/cpp/eventlog/common.h new file mode 100644 index 0000000000..75c512c13e --- /dev/null +++ b/library/cpp/eventlog/common.h @@ -0,0 +1,10 @@ +#pragma once + +template <class T> +class TPacketInputStream { +public: + virtual bool Avail() const = 0; + virtual T operator*() const = 0; + virtual bool Next() = 0; + virtual ~TPacketInputStream() = default; +}; diff --git a/library/cpp/eventlog/evdecoder.cpp b/library/cpp/eventlog/evdecoder.cpp new file mode 100644 index 0000000000..e4413a1b0e --- /dev/null +++ b/library/cpp/eventlog/evdecoder.cpp @@ -0,0 +1,112 @@ +#include <util/memory/tempbuf.h> +#include <util/string/cast.h> +#include <util/stream/output.h> + +#include "evdecoder.h" +#include "logparser.h" + +static const char* const UNKNOWN_EVENT_CLASS = "Unknown event class"; + +static inline void LogError(ui64 frameAddr, const char* msg, bool strict) { + if (!strict) { + Cerr << "EventDecoder warning @" << frameAddr << ": " << msg << Endl; + } else { + ythrow yexception() << "EventDecoder error @" << frameAddr << ": " << msg; + } +} + +static inline bool SkipData(IInputStream& s, size_t amount) { + return (amount == s.Skip(amount)); +} + +// There are 2 log fomats: the one, that allows event skip without event decode (it has stored event length) +// and another, that requires each event decode just to seek over stream. needRead == true means the latter format. +static inline THolder<TEvent> DoDecodeEvent(IInputStream& s, const TEventFilter* const filter, const bool needRead, IEventFactory* fac) { + TEventTimestamp ts; + TEventClass c; + THolder<TEvent> e; + + ::Load(&s, ts); + ::Load(&s, c); + + bool needReturn = false; + + if (!filter || filter->EventAllowed(c)) { + needReturn = true; + } + + if (needRead || needReturn) { + e.Reset(fac->CreateLogEvent(c)); + + if (!!e) { + e->Timestamp = ts; + e->Load(s); + } else if (needReturn) { + e.Reset(new TUnknownEvent(ts, c)); + } + + if (!needReturn) { + e.Reset(nullptr); + } + } + + return e; +} + +THolder<TEvent> DecodeFramed(IInputStream& inp, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict) { + ui32 len; + ::Load(&inp, len); + + if (len < sizeof(ui32)) { + ythrow TEventDecoderError() << "invalid event length"; + } + + TLengthLimitedInput s(&inp, len - sizeof(ui32)); + + try { + THolder<TEvent> e = DoDecodeEvent(s, filter, false, fac); + if (!!e) { + if (!s.Left()) { + return e; + } else if (e->Class == 0) { + if (!SkipData(s, s.Left())) { + ythrow TEventDecoderError() << "cannot skip bad event"; + } + + return e; + } + + LogError(frameAddr, "Event is not fully read", strict); + } + } catch (const TLoadEOF&) { + if (s.Left()) { + throw; + } + + LogError(frameAddr, "Unexpected event end", strict); + } + + if (!SkipData(s, s.Left())) { + ythrow TEventDecoderError() << "cannot skip bad event"; + } + + return nullptr; +} + +THolder<TEvent> DecodeEvent(IInputStream& s, bool framed, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict) { + try { + if (framed) { + return DecodeFramed(s, frameAddr, filter, fac, strict); + } else { + THolder<TEvent> e = DoDecodeEvent(s, filter, true, fac); + // e(0) means event, skipped by filter. Not an error. + if (!!e && !e->Class) { + ythrow TEventDecoderError() << UNKNOWN_EVENT_CLASS; + } + + return e; + } + } catch (const TLoadEOF&) { + ythrow TEventDecoderError() << "unexpected frame end"; + } +} diff --git a/library/cpp/eventlog/evdecoder.h b/library/cpp/eventlog/evdecoder.h new file mode 100644 index 0000000000..eedfc82174 --- /dev/null +++ b/library/cpp/eventlog/evdecoder.h @@ -0,0 +1,16 @@ +#pragma once + +#include <util/generic/yexception.h> +#include <util/generic/ptr.h> + +#include "eventlog.h" + +class TEvent; +class IInputStream; +class TEventFilter; + +struct TEventDecoderError: public yexception { +}; + +THolder<TEvent> DecodeEvent(IInputStream& s, bool framed, ui64 frameAddr, const TEventFilter* const filter, IEventFactory* fac, bool strict = false); +bool AcceptableContent(TEventLogFormat); diff --git a/library/cpp/eventlog/event_field_output.cpp b/library/cpp/eventlog/event_field_output.cpp new file mode 100644 index 0000000000..f9d98dac9d --- /dev/null +++ b/library/cpp/eventlog/event_field_output.cpp @@ -0,0 +1,68 @@ +#include "event_field_output.h" + +#include <util/string/split.h> + +namespace { + TString MakeSeparators(EFieldOutputFlags flags) { + TString res; + res.reserve(3); + + if (flags & EFieldOutputFlag::EscapeTab) { + res.append('\t'); + } + if (flags & EFieldOutputFlag::EscapeNewLine) { + res.append('\n'); + res.append('\r'); + } + if (flags & EFieldOutputFlag::EscapeBackSlash) { + res.append('\\'); + } + + return res; + } +} + +TEventFieldOutput::TEventFieldOutput(IOutputStream& output, EFieldOutputFlags flags) + : Output(output) + , Flags(flags) + , Separators(MakeSeparators(flags)) +{ +} + +IOutputStream& TEventFieldOutput::GetOutputStream() { + return Output; +} + +EFieldOutputFlags TEventFieldOutput::GetFlags() const { + return Flags; +} + +void TEventFieldOutput::DoWrite(const void* buf, size_t len) { + if (!Flags) { + Output.Write(buf, len); + return; + } + + TStringBuf chunk{static_cast<const char*>(buf), len}; + + for (const auto part : StringSplitter(chunk).SplitBySet(Separators.data())) { + TStringBuf token = part.Token(); + TStringBuf delim = part.Delim(); + + if (!token.empty()) { + Output.Write(token); + } + if ("\n" == delim) { + Output.Write(TStringBuf("\\n")); + } else if ("\r" == delim) { + Output.Write(TStringBuf("\\r")); + } else if ("\t" == delim) { + Output.Write(TStringBuf("\\t")); + } else if ("\\" == delim) { + Output.Write(TStringBuf("\\\\")); + } else { + Y_ASSERT(delim.empty()); + } + } +} + diff --git a/library/cpp/eventlog/event_field_output.h b/library/cpp/eventlog/event_field_output.h new file mode 100644 index 0000000000..ed9db0ae16 --- /dev/null +++ b/library/cpp/eventlog/event_field_output.h @@ -0,0 +1,29 @@ +#pragma once + +#include <util/stream/output.h> +#include <util/generic/flags.h> + +enum class EFieldOutputFlag { + EscapeTab = 0x1, // escape \t in field value + EscapeNewLine = 0x2, // escape \n in field value + EscapeBackSlash = 0x4 // escape \ in field value +}; + +Y_DECLARE_FLAGS(EFieldOutputFlags, EFieldOutputFlag); +Y_DECLARE_OPERATORS_FOR_FLAGS(EFieldOutputFlags); + +class TEventFieldOutput: public IOutputStream { +public: + TEventFieldOutput(IOutputStream& output, EFieldOutputFlags flags); + + IOutputStream& GetOutputStream(); + EFieldOutputFlags GetFlags() const; + +protected: + void DoWrite(const void* buf, size_t len) override; + +private: + IOutputStream& Output; + EFieldOutputFlags Flags; + TString Separators; +}; diff --git a/library/cpp/eventlog/event_field_printer.cpp b/library/cpp/eventlog/event_field_printer.cpp new file mode 100644 index 0000000000..29c6b4b661 --- /dev/null +++ b/library/cpp/eventlog/event_field_printer.cpp @@ -0,0 +1,27 @@ +#include "event_field_printer.h" + +#include <library/cpp/protobuf/json/proto2json.h> + +namespace { + + const NProtobufJson::TProto2JsonConfig PROTO_2_JSON_CONFIG = NProtobufJson::TProto2JsonConfig() + .SetMissingRepeatedKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault) + .AddStringTransform(MakeIntrusive<NProtobufJson::TBase64EncodeBytesTransform>()); + +} // namespace + +TEventProtobufMessageFieldPrinter::TEventProtobufMessageFieldPrinter(EProtobufMessageFieldPrintMode mode) + : Mode(mode) +{} + +template <> +void TEventProtobufMessageFieldPrinter::PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(const google::protobuf::Message& field, TEventFieldOutput& output) { + switch (Mode) { + case EProtobufMessageFieldPrintMode::DEFAULT: + case EProtobufMessageFieldPrintMode::JSON: { + // Do not use field.PrintJSON() here: IGNIETFERRO-2002 + NProtobufJson::Proto2Json(field, output, PROTO_2_JSON_CONFIG); + break; + } + } +} diff --git a/library/cpp/eventlog/event_field_printer.h b/library/cpp/eventlog/event_field_printer.h new file mode 100644 index 0000000000..835e8f4a85 --- /dev/null +++ b/library/cpp/eventlog/event_field_printer.h @@ -0,0 +1,38 @@ +#pragma once + +#include "event_field_output.h" + +#include <google/protobuf/message.h> + +// NB: For historical reasons print code for all primitive types/repeated fields/etc generated by https://a.yandex-team.ru/arc/trunk/arcadia/tools/event2cpp + +enum class EProtobufMessageFieldPrintMode { + // Use <TEventProtobufMessageFieldType>::Print method for fields that has it + // Print json for other fields + DEFAULT = 0, + + JSON = 1, +}; + +class TEventProtobufMessageFieldPrinter { +public: + explicit TEventProtobufMessageFieldPrinter(EProtobufMessageFieldPrintMode mode); + + template <typename TEventProtobufMessageFieldType, bool HasPrintFunction> + void PrintProtobufMessageFieldToOutput(const TEventProtobufMessageFieldType& field, TEventFieldOutput& output) { + if constexpr (HasPrintFunction) { + if (Mode == EProtobufMessageFieldPrintMode::DEFAULT) { + field.Print(output.GetOutputStream(), output.GetFlags()); + return; + } + } + + PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(field, output); + } + + template <> + void PrintProtobufMessageFieldToOutput<google::protobuf::Message, false>(const google::protobuf::Message& field, TEventFieldOutput& output); + +private: + EProtobufMessageFieldPrintMode Mode; +}; diff --git a/library/cpp/eventlog/eventlog.cpp b/library/cpp/eventlog/eventlog.cpp new file mode 100644 index 0000000000..458a632b4a --- /dev/null +++ b/library/cpp/eventlog/eventlog.cpp @@ -0,0 +1,554 @@ +#include <util/datetime/base.h> +#include <util/stream/zlib.h> +#include <util/stream/length.h> +#include <util/generic/buffer.h> +#include <util/generic/yexception.h> +#include <util/digest/murmur.h> +#include <util/generic/singleton.h> +#include <util/generic/function.h> +#include <util/stream/output.h> +#include <util/stream/format.h> +#include <util/stream/null.h> + +#include <google/protobuf/messagext.h> + +#include "eventlog.h" +#include "events_extension.h" +#include "evdecoder.h" +#include "logparser.h" +#include <library/cpp/eventlog/proto/internal.pb.h> + +#include <library/cpp/json/json_writer.h> +#include <library/cpp/protobuf/json/proto2json.h> + + +TAtomic eventlogFrameCounter = 0; + +namespace { + + const NProtobufJson::TProto2JsonConfig PROTO_2_JSON_CONFIG = NProtobufJson::TProto2JsonConfig() + .SetMissingRepeatedKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault) + .AddStringTransform(MakeIntrusive<NProtobufJson::TBase64EncodeBytesTransform>()); + + ui32 GenerateFrameId() { + return ui32(AtomicAdd(eventlogFrameCounter, 1)); + } + + inline const NProtoBuf::Message* UnknownEventMessage() { + return Singleton<NEventLogInternal::TUnknownEvent>(); + } + +} // namespace + +void TEvent::Print(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const { + if (options.OutputFormat == TOutputFormat::TabSeparatedRaw) { + PrintHeader(out, options, eventState); + DoPrint(out, {}); + } else if (options.OutputFormat == TOutputFormat::TabSeparated) { + PrintHeader(out, options, eventState); + DoPrint( + out, + EFieldOutputFlags{} | EFieldOutputFlag::EscapeNewLine | EFieldOutputFlag::EscapeBackSlash); + } else if (options.OutputFormat == TOutputFormat::Json) { + NJson::TJsonWriterConfig jsonWriterConfig; + jsonWriterConfig.FormatOutput = 0; + NJson::TJsonWriter jsonWriter(&out, jsonWriterConfig); + + jsonWriter.OpenMap(); + PrintJsonHeader(jsonWriter); + DoPrintJson(jsonWriter); + jsonWriter.CloseMap(); + } +} + +void TEvent::PrintHeader(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const { + if (options.HumanReadable) { + out << TInstant::MicroSeconds(Timestamp).ToString() << "\t"; + if (Timestamp >= eventState.FrameStartTime) + out << "+" << HumanReadable(TDuration::MicroSeconds(Timestamp - eventState.FrameStartTime)); + else // a bug somewhere? anyway, let's handle it in a nice fashion + out << "-" << HumanReadable(TDuration::MicroSeconds(eventState.FrameStartTime - Timestamp)); + + if (Timestamp >= eventState.PrevEventTime) + out << " (+" << HumanReadable(TDuration::MicroSeconds(Timestamp - eventState.PrevEventTime)) << ")"; + // else: these events are async and out-of-order, relative time diff makes no sense, skip it + + out << "\tF# " << FrameId << '\t'; + } else { + out << static_cast<TEventTimestamp>(Timestamp); + out << '\t' << FrameId << '\t'; + } +} + +void TEvent::PrintJsonHeader(NJson::TJsonWriter& jsonWriter) const { + jsonWriter.Write("Timestamp", Timestamp); + jsonWriter.Write("FrameId", FrameId); +} + +class TProtobufEvent: public TEvent { +public: + TProtobufEvent(TEventTimestamp t, size_t eventId, const NProtoBuf::Message& msg) + : TEvent(eventId, t) + , Message_(&msg) + , EventFactory_(NProtoBuf::TEventFactory::Instance()) + { + } + + TProtobufEvent() + : TEvent(0, 0) + , EventFactory_(NProtoBuf::TEventFactory::Instance()) + { + } + + explicit TProtobufEvent(ui32 id, NProtoBuf::TEventFactory* eventFactory = NProtoBuf::TEventFactory::Instance()) + : TEvent(id, 0) + , EventFactory_(eventFactory) + { + InnerMsg_.Reset(EventFactory_->CreateEvent(Class)); + Message_ = InnerMsg_.Get(); + } + + ui32 Id() const { + return Class; + } + + void Load(IInputStream& in) override { + if (!!InnerMsg_) { + InnerMsg_->ParseFromArcadiaStream(&in); + } else { + TransferData(&in, &Cnull); + } + } + + void Save(IOutputStream& out) const override { + Message_->SerializeToArcadiaStream(&out); + } + + void SaveToBuffer(TBufferOutput& buf) const override { + size_t messageSize = Message_->ByteSize(); + size_t before = buf.Buffer().Size(); + buf.Buffer().Advance(messageSize); + Y_PROTOBUF_SUPPRESS_NODISCARD Message_->SerializeToArray(buf.Buffer().Data() + before, messageSize); + } + + TStringBuf GetName() const override { + return EventFactory_->NameById(Id()); + } + +private: + void DoPrint(IOutputStream& out, EFieldOutputFlags flags) const override { + EventFactory_->PrintEvent(Id(), Message_, out, flags); + } + void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override { + jsonWriter.OpenMap("EventBody"); + jsonWriter.Write("Type", GetName()); + + jsonWriter.Write("Fields"); + NProtobufJson::Proto2Json(*GetProto(), jsonWriter, PROTO_2_JSON_CONFIG); + + jsonWriter.CloseMap(); + } + + const NProtoBuf::Message* GetProto() const override { + if (Message_) { + return Message_; + } + + return UnknownEventMessage(); + } + +private: + const NProtoBuf::Message* Message_ = nullptr; + NProtoBuf::TEventFactory* EventFactory_; + THolder<NProtoBuf::Message> InnerMsg_; + + friend class TEventLogFrame; +}; + +void TEventLogFrame::LogProtobufEvent(size_t eventId, const NProtoBuf::Message& ev) { + TProtobufEvent event(Now().MicroSeconds(), eventId, ev); + + LogEventImpl(event); +} + +void TEventLogFrame::LogProtobufEvent(TEventTimestamp timestamp, size_t eventId, const NProtoBuf::Message& ev) { + TProtobufEvent event(timestamp, eventId, ev); + + LogEventImpl(event); +} + +template <> +void TEventLogFrame::DebugDump(const TProtobufEvent& ev) { + static TMutex lock; + + with_lock (lock) { + Cerr << ev.Timestamp << "\t" << ev.GetName() << "\t"; + ev.GetProto()->PrintJSON(Cerr); + Cerr << Endl; + } +} + +#pragma pack(push, 1) +struct TFrameHeaderData { + char SyncField[COMPRESSED_LOG_FRAME_SYNC_DATA.size()]; + TCompressedFrameBaseHeader Header; + TCompressedFrameHeader2 HeaderEx; +}; +#pragma pack(pop) + +TEventLogFrame::TEventLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) + : EvLog_(parentLog.HasNullBackend() ? nullptr : &parentLog) + , NeedAlwaysSafeAdd_(needAlwaysSafeAdd) + , ForceDump_(false) + , WriteFrameCallback_(std::move(writeFrameCallback)) +{ + DoInit(); +} + +TEventLogFrame::TEventLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) + : EvLog_(parentLog) + , NeedAlwaysSafeAdd_(needAlwaysSafeAdd) + , ForceDump_(false) + , WriteFrameCallback_(std::move(writeFrameCallback)) +{ + if (EvLog_ && EvLog_->HasNullBackend()) { + EvLog_ = nullptr; + } + + DoInit(); +} + +TEventLogFrame::TEventLogFrame(bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) + : EvLog_(nullptr) + , NeedAlwaysSafeAdd_(needAlwaysSafeAdd) + , ForceDump_(false) + , WriteFrameCallback_(std::move(writeFrameCallback)) +{ + DoInit(); +} + +void TEventLogFrame::Flush() { + if (EvLog_ == nullptr) + return; + + TBuffer& buf = Buf_.Buffer(); + + if (buf.Empty()) { + return; + } + + EvLog_->WriteFrame(buf, StartTimestamp_, EndTimestamp_, WriteFrameCallback_, std::move(MetaFlags_)); + + DoInit(); + + return; +} + +void TEventLogFrame::SafeFlush() { + TGuard<TMutex> g(Mtx_); + Flush(); +} + +void TEventLogFrame::AddEvent(TEventTimestamp timestamp) { + if (timestamp < StartTimestamp_) { + StartTimestamp_ = timestamp; + } + + if (timestamp > EndTimestamp_) { + EndTimestamp_ = timestamp; + } +} + +void TEventLogFrame::DoInit() { + Buf_.Buffer().Clear(); + + StartTimestamp_ = (TEventTimestamp)-1; + EndTimestamp_ = 0; +} + +void TEventLogFrame::VisitEvents(ILogFrameEventVisitor& visitor, IEventFactory* eventFactory) { + const auto doVisit = [this, &visitor, eventFactory]() { + TBuffer& buf = Buf_.Buffer(); + + TBufferInput bufferInput(buf); + TLengthLimitedInput limitedInput(&bufferInput, buf.size()); + + TEventFilter EventFilter(false); + + while (limitedInput.Left()) { + THolder<TEvent> event = DecodeEvent(limitedInput, true, 0, &EventFilter, eventFactory); + + visitor.Visit(*event); + } + }; + if (NeedAlwaysSafeAdd_) { + TGuard<TMutex> g(Mtx_); + doVisit(); + } else { + doVisit(); + } +} + +TSelfFlushLogFrame::TSelfFlushLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) + : TEventLogFrame(parentLog, needAlwaysSafeAdd, std::move(writeFrameCallback)) +{ +} + +TSelfFlushLogFrame::TSelfFlushLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) + : TEventLogFrame(parentLog, needAlwaysSafeAdd, std::move(writeFrameCallback)) +{ +} + +TSelfFlushLogFrame::TSelfFlushLogFrame(bool needAlwaysSafeAdd, TWriteFrameCallbackPtr writeFrameCallback) + : TEventLogFrame(needAlwaysSafeAdd, std::move(writeFrameCallback)) +{ +} + +TSelfFlushLogFrame::~TSelfFlushLogFrame() { + try { + Flush(); + } catch (...) { + } +} + +IEventLog::~IEventLog() { +} + +static THolder<TLogBackend> ConstructBackend(const TString& fileName, const TEventLogBackendOptions& backendOpts) { + try { + THolder<TLogBackend> backend; + if (backendOpts.UseSyncPageCacheBackend) { + backend = MakeHolder<TSyncPageCacheFileLogBackend>(fileName, backendOpts.SyncPageCacheBackendBufferSize, backendOpts.SyncPageCacheBackendMaxPendingSize); + } else { + backend = MakeHolder<TFileLogBackend>(fileName); + } + return MakeHolder<TReopenLogBackend>(std::move(backend)); + } catch (...) { + Cdbg << "Warning: Cannot open event log '" << fileName << "': " << CurrentExceptionMessage() << "." << Endl; + } + + return MakeHolder<TNullLogBackend>(); +} + +TEventLog::TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts, TMaybe<TEventLogFormat> logFormat) + : Log_(ConstructBackend(fileName, backendOpts)) + , ContentFormat_(contentFormat) + , LogFormat_(logFormat.Defined() ? *logFormat : COMPRESSED_LOG_FORMAT_V4) + , HasNullBackend_(Log_.IsNullLog()) + , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc")) + , ZstdCodec_(NBlockCodecs::Codec("zstd_1")) +{ + Y_ENSURE(LogFormat_ == COMPRESSED_LOG_FORMAT_V4 || LogFormat_ == COMPRESSED_LOG_FORMAT_V5); + + if (contentFormat & 0xff000000) { + ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")"; + } +} + +TEventLog::TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts) + : TEventLog(fileName, contentFormat, backendOpts, COMPRESSED_LOG_FORMAT_V4) +{ +} + +TEventLog::TEventLog(const TLog& log, TEventLogFormat contentFormat, TEventLogFormat logFormat) + : Log_(log) + , ContentFormat_(contentFormat) + , LogFormat_(logFormat) + , HasNullBackend_(Log_.IsNullLog()) + , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc")) + , ZstdCodec_(NBlockCodecs::Codec("zstd_1")) +{ + if (contentFormat & 0xff000000) { + ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")"; + } +} + +TEventLog::TEventLog(TEventLogFormat contentFormat, TEventLogFormat logFormat) + : Log_(MakeHolder<TNullLogBackend>()) + , ContentFormat_(contentFormat) + , LogFormat_(logFormat) + , HasNullBackend_(true) + , Lz4hcCodec_(NBlockCodecs::Codec("lz4hc")) + , ZstdCodec_(NBlockCodecs::Codec("zstd_1")) +{ + if (contentFormat & 0xff000000) { + ythrow yexception() << "wrong compressed event log content format code (" << contentFormat << ")"; + } +} + +TEventLog::~TEventLog() { +} + +void TEventLog::ReopenLog() { + Log_.ReopenLog(); +} + +void TEventLog::CloseLog() { + Log_.CloseLog(); +} + +void TEventLog::Flush() { +} + +namespace { + class TOnExceptionAction { + public: + TOnExceptionAction(std::function<void()>&& f) + : F_(std::move(f)) + { + } + + ~TOnExceptionAction() { + if (F_ && UncaughtException()) { + try { + F_(); + } catch (...) { + } + } + } + + private: + std::function<void()> F_; + }; +} + +void TEventLog::WriteFrame(TBuffer& buffer, + TEventTimestamp startTimestamp, + TEventTimestamp endTimestamp, + TWriteFrameCallbackPtr writeFrameCallback, + TLogRecord::TMetaFlags metaFlags) { + Y_ENSURE(LogFormat_ == COMPRESSED_LOG_FORMAT_V4 || LogFormat_ == COMPRESSED_LOG_FORMAT_V5); + + TBuffer& b1 = buffer; + + size_t maxCompressedLength = (LogFormat_ == COMPRESSED_LOG_FORMAT_V4) ? b1.Size() + 256 : ZstdCodec_->MaxCompressedLength(b1); + + // Reserve enough memory to minimize reallocs + TBufferOutput outbuf(sizeof(TFrameHeaderData) + maxCompressedLength); + TBuffer& b2 = outbuf.Buffer(); + b2.Proceed(sizeof(TFrameHeaderData)); + + { + TFrameHeaderData& hdr = *reinterpret_cast<TFrameHeaderData*>(b2.data()); + + memcpy(hdr.SyncField, COMPRESSED_LOG_FRAME_SYNC_DATA.data(), COMPRESSED_LOG_FRAME_SYNC_DATA.size()); + hdr.Header.Format = (LogFormat_ << 24) | (ContentFormat_ & 0xffffff); + hdr.Header.FrameId = GenerateFrameId(); + hdr.HeaderEx.UncompressedDatalen = (ui32)b1.Size(); + hdr.HeaderEx.StartTimestamp = startTimestamp; + hdr.HeaderEx.EndTimestamp = endTimestamp; + hdr.HeaderEx.PayloadChecksum = 0; + hdr.HeaderEx.CompressorVersion = 0; + } + + if (LogFormat_ == COMPRESSED_LOG_FORMAT_V4) { + TBuffer encoded(b1.Size() + sizeof(TFrameHeaderData) + 256); + Lz4hcCodec_->Encode(b1, encoded); + + TZLibCompress compr(&outbuf, ZLib::ZLib, 6, 2048); + compr.Write(encoded.data(), encoded.size()); + compr.Finish(); + } else { + b2.Advance(ZstdCodec_->Compress(b1, b2.Pos())); + } + + { + const size_t k = sizeof(TCompressedFrameBaseHeader) + COMPRESSED_LOG_FRAME_SYNC_DATA.size(); + TFrameHeaderData& hdr = *reinterpret_cast<TFrameHeaderData*>(b2.data()); + hdr.Header.Length = static_cast<ui32>(b2.size() - k); + hdr.HeaderEx.PayloadChecksum = MurmurHash<ui32>(b2.data() + sizeof(TFrameHeaderData), b2.size() - sizeof(TFrameHeaderData)); + + const size_t n = sizeof(TFrameHeaderData) - (COMPRESSED_LOG_FRAME_SYNC_DATA.size() + sizeof(hdr.HeaderEx.HeaderChecksum)); + hdr.HeaderEx.HeaderChecksum = MurmurHash<ui32>(b2.data() + COMPRESSED_LOG_FRAME_SYNC_DATA.size(), n); + } + + const TBuffer& frameData = outbuf.Buffer(); + + TOnExceptionAction actionCallback([this] { + if (ErrorCallback_) { + ErrorCallback_->OnWriteError(); + } + }); + + if (writeFrameCallback) { + writeFrameCallback->OnAfterCompress(frameData, startTimestamp, endTimestamp); + } + + Log_.Write(frameData.Data(), frameData.Size(), std::move(metaFlags)); + if (SuccessCallback_) { + SuccessCallback_->OnWriteSuccess(frameData); + } +} + +TEvent* TProtobufEventFactory::CreateLogEvent(TEventClass c) { + return new TProtobufEvent(c, EventFactory_); +} + +TEventClass TProtobufEventFactory::ClassByName(TStringBuf name) const { + return EventFactory_->IdByName(name); +} + +TEventClass TProtobufEventFactory::EventClassBegin() const { + const auto& items = EventFactory_->FactoryItems(); + + if (items.empty()) { + return static_cast<TEventClass>(0); + } + + return static_cast<TEventClass>(items.begin()->first); +} + +TEventClass TProtobufEventFactory::EventClassEnd() const { + const auto& items = EventFactory_->FactoryItems(); + + if (items.empty()) { + return static_cast<TEventClass>(0); + } + + return static_cast<TEventClass>(items.rbegin()->first + 1); +} + +namespace NEvClass { + IEventFactory* Factory() { + return Singleton<TProtobufEventFactory>(); + } + + IEventProcessor* Processor() { + return Singleton<TProtobufEventProcessor>(); + } +} + +const NProtoBuf::Message* TUnknownEvent::GetProto() const { + return UnknownEventMessage(); +} + +TStringBuf TUnknownEvent::GetName() const { + return TStringBuf("UnknownEvent"); +} + +void TUnknownEvent::DoPrintJson(NJson::TJsonWriter& jsonWriter) const { + jsonWriter.OpenMap("EventBody"); + jsonWriter.Write("Type", GetName()); + jsonWriter.Write("EventId", (size_t)Class); + jsonWriter.CloseMap(); +} + +TStringBuf TEndOfFrameEvent::GetName() const { + return TStringBuf("EndOfFrame"); +} + +const NProtoBuf::Message* TEndOfFrameEvent::GetProto() const { + return Singleton<NEventLogInternal::TEndOfFrameEvent>(); +} + +void TEndOfFrameEvent::DoPrintJson(NJson::TJsonWriter& jsonWriter) const { + jsonWriter.OpenMap("EventBody"); + jsonWriter.Write("Type", GetName()); + jsonWriter.OpenMap("Fields"); + jsonWriter.CloseMap(); + jsonWriter.CloseMap(); +} + +THolder<TEvent> MakeProtobufLogEvent(TEventTimestamp ts, TEventClass eventId, google::protobuf::Message& ev) { + return MakeHolder<TProtobufEvent>(ts, eventId, ev); +} diff --git a/library/cpp/eventlog/eventlog.h b/library/cpp/eventlog/eventlog.h new file mode 100644 index 0000000000..45c2dfb17f --- /dev/null +++ b/library/cpp/eventlog/eventlog.h @@ -0,0 +1,623 @@ +#pragma once + +#include "eventlog_int.h" +#include "event_field_output.h" +#include "events_extension.h" + +#include <library/cpp/blockcodecs/codecs.h> +#include <library/cpp/logger/all.h> + +#include <google/protobuf/message.h> + +#include <util/datetime/base.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/stream/output.h> +#include <util/stream/buffer.h> +#include <util/stream/str.h> +#include <util/system/mutex.h> +#include <util/stream/output.h> +#include <util/system/env.h> +#include <util/system/unaligned_mem.h> +#include <util/ysaveload.h> + +#include <cstdlib> + +namespace NJson { + class TJsonWriter; +} + +class IEventLog; + +class TEvent : public TThrRefBase { +public: + enum class TOutputFormat { + TabSeparated, + TabSeparatedRaw, // disables escaping + Json + }; + + struct TOutputOptions { + TOutputFormat OutputFormat = TOutputFormat::TabSeparated; + // Dump some fields (e.g. timestamp) in more human-readable format + bool HumanReadable = false; + + TOutputOptions(TOutputFormat outputFormat = TOutputFormat::TabSeparated) + : OutputFormat(outputFormat) + { + } + + TOutputOptions(TOutputFormat outputFormat, bool humanReadable) + : OutputFormat(outputFormat) + , HumanReadable(humanReadable) + { + } + }; + + struct TEventState { + TEventTimestamp FrameStartTime = 0; + TEventTimestamp PrevEventTime = 0; + TEventState() { + } + }; + + TEvent(TEventClass c, TEventTimestamp t) + : Class(c) + , Timestamp(t) + { + } + + virtual ~TEvent() = default; + + // Note, that descendants MUST have Save() & Load() methods to alter + // only its new variables, not the base class! + virtual void Save(IOutputStream& out) const = 0; + virtual void SaveToBuffer(TBufferOutput& out) const { + Save(out); + } + + // Note, that descendants MUST have Save() & Load() methods to alter + // only its new variables, not the base class! + virtual void Load(IInputStream& i) = 0; + + virtual TStringBuf GetName() const = 0; + virtual const NProtoBuf::Message* GetProto() const = 0; + + void Print(IOutputStream& out, const TOutputOptions& options = TOutputOptions(), const TEventState& eventState = TEventState()) const; + void PrintHeader(IOutputStream& out, const TOutputOptions& options, const TEventState& eventState) const; + + TString ToString() const { + TStringStream buff; + Print(buff); + return buff.Str(); + } + + void FullSaveToBuffer(TBufferOutput& buf) const { + SaveMessageHeader(buf); + this->SaveToBuffer(buf); + } + + void FullSave(IOutputStream& o) const { + SaveMessageHeader(o); + this->Save(o); + } + + void FullLoad(IInputStream& i) { + ::Load(&i, Timestamp); + ::Load(&i, Class); + this->Load(i); + } + + template <class T> + const T* Get() const { + return static_cast<const T*>(this->GetProto()); + } + + TEventClass Class; + TEventTimestamp Timestamp; + ui32 FrameId = 0; + +private: + void SaveMessageHeader(IOutputStream& out) const { + ::Save(&out, Timestamp); + ::Save(&out, Class); + } + + virtual void DoPrint(IOutputStream& out, EFieldOutputFlags flags) const = 0; + virtual void DoPrintJson(NJson::TJsonWriter& jsonWriter) const = 0; + + void PrintJsonHeader(NJson::TJsonWriter& jsonWriter) const; +}; + +using TEventPtr = TIntrusivePtr<TEvent>; +using TConstEventPtr = TIntrusiveConstPtr<TEvent>; + +class IEventProcessor { +public: + virtual void SetOptions(const TEvent::TOutputOptions& options) { + Options_ = options; + } + virtual void ProcessEvent(const TEvent* ev) = 0; + virtual bool CheckedProcessEvent(const TEvent* ev) { + ProcessEvent(ev); + return true; + } + virtual ~IEventProcessor() = default; + +protected: + TEvent::TOutputOptions Options_; +}; + +class IEventFactory { +public: + virtual TEvent* CreateLogEvent(TEventClass c) = 0; + virtual TEventLogFormat CurrentFormat() = 0; + virtual TEventClass ClassByName(TStringBuf name) const = 0; + virtual TEventClass EventClassBegin() const = 0; + virtual TEventClass EventClassEnd() const = 0; + virtual ~IEventFactory() = default; +}; + +class TUnknownEvent: public TEvent { +public: + TUnknownEvent(TEventTimestamp ts, TEventClass cls) + : TEvent(cls, ts) + { + } + + ~TUnknownEvent() override = default; + + void Save(IOutputStream& /* o */) const override { + ythrow yexception() << "TUnknownEvent cannot be saved"; + } + + void Load(IInputStream& /* i */) override { + ythrow yexception() << "TUnknownEvent cannot be loaded"; + } + + TStringBuf GetName() const override; + +private: + void DoPrint(IOutputStream& out, EFieldOutputFlags) const override { + out << GetName() << "\t" << (size_t)Class; + } + + void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override; + + const NProtoBuf::Message* GetProto() const override; +}; + +class TEndOfFrameEvent: public TEvent { +public: + enum { + EventClass = 0 + }; + + TEndOfFrameEvent(TEventTimestamp ts) + : TEvent(TEndOfFrameEvent::EventClass, ts) + { + } + + ~TEndOfFrameEvent() override = default; + + void Save(IOutputStream& o) const override { + (void)o; + ythrow yexception() << "TEndOfFrameEvent cannot be saved"; + } + + void Load(IInputStream& i) override { + (void)i; + ythrow yexception() << "TEndOfFrameEvent cannot be loaded"; + } + + TStringBuf GetName() const override; + +private: + void DoPrint(IOutputStream& out, EFieldOutputFlags) const override { + out << GetName(); + } + void DoPrintJson(NJson::TJsonWriter& jsonWriter) const override; + + const NProtoBuf::Message* GetProto() const override; +}; + +class ILogFrameEventVisitor { +public: + virtual ~ILogFrameEventVisitor() = default; + + virtual void Visit(const TEvent& event) = 0; +}; + +class IWriteFrameCallback : public TAtomicRefCount<IWriteFrameCallback> { +public: + virtual ~IWriteFrameCallback() = default; + + virtual void OnAfterCompress(const TBuffer& compressedFrame, TEventTimestamp startTimestamp, TEventTimestamp endTimestamp) = 0; +}; + +using TWriteFrameCallbackPtr = TIntrusivePtr<IWriteFrameCallback>; + +class TEventLogFrame { +public: + TEventLogFrame(bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + TEventLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + TEventLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + + virtual ~TEventLogFrame() = default; + + void Flush(); + void SafeFlush(); + + void ForceDump() { + ForceDump_ = true; + } + + template <class T> + inline void LogEvent(const T& ev) { + if (NeedAlwaysSafeAdd_) { + SafeLogEvent(ev); + } else { + UnSafeLogEvent(ev); + } + } + + template <class T> + inline void LogEvent(TEventTimestamp timestamp, const T& ev) { + if (NeedAlwaysSafeAdd_) { + SafeLogEvent(timestamp, ev); + } else { + UnSafeLogEvent(timestamp, ev); + } + } + + template <class T> + inline void UnSafeLogEvent(const T& ev) { + if (!IsEventIgnored(ev.ID)) + LogProtobufEvent(ev.ID, ev); + } + + template <class T> + inline void UnSafeLogEvent(TEventTimestamp timestamp, const T& ev) { + if (!IsEventIgnored(ev.ID)) + LogProtobufEvent(timestamp, ev.ID, ev); + } + + template <class T> + inline void SafeLogEvent(const T& ev) { + if (!IsEventIgnored(ev.ID)) { + TGuard<TMutex> g(Mtx_); + LogProtobufEvent(ev.ID, ev); + } + } + + template <class T> + inline void SafeLogEvent(TEventTimestamp timestamp, const T& ev) { + if (!IsEventIgnored(ev.ID)) { + TGuard<TMutex> g(Mtx_); + LogProtobufEvent(timestamp, ev.ID, ev); + } + } + + void VisitEvents(ILogFrameEventVisitor& visitor, IEventFactory* eventFactory); + + inline bool IsEventIgnored(size_t eventId) const { + Y_UNUSED(eventId); // in future we might want to selectively discard only some kinds of messages + return !IsDebugModeEnabled() && EvLog_ == nullptr && !ForceDump_; + } + + void Enable(IEventLog& evLog) { + EvLog_ = &evLog; + } + + void Disable() { + EvLog_ = nullptr; + } + + void SetNeedAlwaysSafeAdd(bool val) { + NeedAlwaysSafeAdd_ = val; + } + + void SetWriteFrameCallback(TWriteFrameCallbackPtr writeFrameCallback) { + WriteFrameCallback_ = writeFrameCallback; + } + + void AddMetaFlag(const TString& key, const TString& value) { + if (NeedAlwaysSafeAdd_) { + TGuard<TMutex> g(Mtx_); + MetaFlags_.emplace_back(key, value); + } else { + MetaFlags_.emplace_back(key, value); + } + } + +protected: + void LogProtobufEvent(size_t eventId, const NProtoBuf::Message& ev); + void LogProtobufEvent(TEventTimestamp timestamp, size_t eventId, const NProtoBuf::Message& ev); + +private: + static bool IsDebugModeEnabled() { + static struct TSelector { + bool Flag; + + TSelector() + : Flag(GetEnv("EVLOG_DEBUG") == TStringBuf("1")) + { + } + } selector; + + return selector.Flag; + } + + template <class T> + void DebugDump(const T& ev); + + // T must be a descendant of NEvClass::TEvent + template <class T> + inline void LogEventImpl(const T& ev) { + if (EvLog_ != nullptr || ForceDump_) { + TBuffer& b = Buf_.Buffer(); + size_t lastSize = b.size(); + ::Save(&Buf_, ui32(0)); + ev.FullSaveToBuffer(Buf_); + WriteUnaligned<ui32>(b.data() + lastSize, (ui32)(b.size() - lastSize)); + AddEvent(ev.Timestamp); + } + + if (IsDebugModeEnabled()) { + DebugDump(ev); + } + } + + void AddEvent(TEventTimestamp timestamp); + void DoInit(); + +private: + TBufferOutput Buf_; + TEventTimestamp StartTimestamp_, EndTimestamp_; + IEventLog* EvLog_; + TMutex Mtx_; + bool NeedAlwaysSafeAdd_; + bool ForceDump_; + TWriteFrameCallbackPtr WriteFrameCallback_; + TLogRecord::TMetaFlags MetaFlags_; + friend class TEventRecord; +}; + +class TSelfFlushLogFrame: public TEventLogFrame, public TAtomicRefCount<TSelfFlushLogFrame> { +public: + TSelfFlushLogFrame(bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + TSelfFlushLogFrame(IEventLog& parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + TSelfFlushLogFrame(IEventLog* parentLog, bool needAlwaysSafeAdd = false, TWriteFrameCallbackPtr writeFrameCallback = nullptr); + + virtual ~TSelfFlushLogFrame(); +}; + +using TSelfFlushLogFramePtr = TIntrusivePtr<TSelfFlushLogFrame>; + +class IEventLog: public TAtomicRefCount<IEventLog> { +public: + class IErrorCallback { + public: + virtual ~IErrorCallback() { + } + + virtual void OnWriteError() = 0; + }; + + class ISuccessCallback { + public: + virtual ~ISuccessCallback() { + } + + virtual void OnWriteSuccess(const TBuffer& frameData) = 0; + }; + + virtual ~IEventLog(); + + virtual void ReopenLog() = 0; + virtual void CloseLog() = 0; + virtual void Flush() = 0; + virtual void SetErrorCallback(IErrorCallback*) { + } + virtual void SetSuccessCallback(ISuccessCallback*) { + } + + template <class T> + void LogEvent(const T& ev) { + TEventLogFrame frame(*this); + frame.LogEvent(ev); + frame.Flush(); + } + + virtual bool HasNullBackend() const = 0; + + virtual void WriteFrame(TBuffer& buffer, + TEventTimestamp startTimestamp, + TEventTimestamp endTimestamp, + TWriteFrameCallbackPtr writeFrameCallback = nullptr, + TLogRecord::TMetaFlags metaFlags = {}) = 0; +}; + +struct TEventLogBackendOptions { + bool UseSyncPageCacheBackend = false; + size_t SyncPageCacheBackendBufferSize = 0; + size_t SyncPageCacheBackendMaxPendingSize = 0; +}; + +class TEventLog: public IEventLog { +public: + /* + * Параметр contentformat указывает формат контента лога, например какие могут в логе + * встретится классы событий, какие параметры у этих событий, и пр. Старший байт параметра + * должен быть нулевым. + */ + TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts, TMaybe<TEventLogFormat> logFormat); + TEventLog(const TString& fileName, TEventLogFormat contentFormat, const TEventLogBackendOptions& backendOpts = {}); + TEventLog(const TLog& log, TEventLogFormat contentFormat, TEventLogFormat logFormat = COMPRESSED_LOG_FORMAT_V4); + TEventLog(TEventLogFormat contentFormat, TEventLogFormat logFormat = COMPRESSED_LOG_FORMAT_V4); + + ~TEventLog() override; + + void ReopenLog() override; + void CloseLog() override; + void Flush() override; + void SetErrorCallback(IErrorCallback* errorCallback) override { + ErrorCallback_ = errorCallback; + } + void SetSuccessCallback(ISuccessCallback* successCallback) override { + SuccessCallback_ = successCallback; + } + + template <class T> + void LogEvent(const T& ev) { + TEventLogFrame frame(*this); + frame.LogEvent(ev); + frame.Flush(); + } + + bool HasNullBackend() const override { + return HasNullBackend_; + } + + void WriteFrame(TBuffer& buffer, + TEventTimestamp startTimestamp, + TEventTimestamp endTimestamp, + TWriteFrameCallbackPtr writeFrameCallback = nullptr, + TLogRecord::TMetaFlags metaFlags = {}) override; + +private: + mutable TLog Log_; + TEventLogFormat ContentFormat_; + const TEventLogFormat LogFormat_; + bool HasNullBackend_; + const NBlockCodecs::ICodec* const Lz4hcCodec_; + const NBlockCodecs::ICodec* const ZstdCodec_; + IErrorCallback* ErrorCallback_ = nullptr; + ISuccessCallback* SuccessCallback_ = nullptr; +}; + +using TEventLogPtr = TIntrusivePtr<IEventLog>; + +class TEventLogWithSlave: public IEventLog { +public: + TEventLogWithSlave(IEventLog& parentLog) + : Slave_(&parentLog) + { + } + + TEventLogWithSlave(const TEventLogPtr& parentLog) + : SlavePtr_(parentLog) + , Slave_(SlavePtr_.Get()) + { + } + + ~TEventLogWithSlave() override { + try { + Slave().Flush(); + } catch (...) { + } + } + + void Flush() override { + Slave().Flush(); + } + + void ReopenLog() override { + return Slave().ReopenLog(); + } + void CloseLog() override { + return Slave().CloseLog(); + } + + bool HasNullBackend() const override { + return Slave().HasNullBackend(); + } + + void WriteFrame(TBuffer& buffer, + TEventTimestamp startTimestamp, + TEventTimestamp endTimestamp, + TWriteFrameCallbackPtr writeFrameCallback = nullptr, + TLogRecord::TMetaFlags metaFlags = {}) override { + Slave().WriteFrame(buffer, startTimestamp, endTimestamp, writeFrameCallback, std::move(metaFlags)); + } + + void SetErrorCallback(IErrorCallback* errorCallback) override { + Slave().SetErrorCallback(errorCallback); + } + + void SetSuccessCallback(ISuccessCallback* successCallback) override { + Slave().SetSuccessCallback(successCallback); + } + +protected: + inline IEventLog& Slave() const { + return *Slave_; + } + +private: + TEventLogPtr SlavePtr_; + IEventLog* Slave_ = nullptr; +}; + +extern TAtomic eventlogFrameCounter; + +class TProtobufEventProcessor: public IEventProcessor { +public: + void ProcessEvent(const TEvent* ev) override final { + ProcessEvent(ev, &Cout); + } + + void ProcessEvent(const TEvent* ev, IOutputStream *out) { + UpdateEventState(ev); + DoProcessEvent(ev, out); + EventState_.PrevEventTime = ev->Timestamp; + } +protected: + virtual void DoProcessEvent(const TEvent * ev, IOutputStream *out) { + ev->Print(*out, Options_, EventState_); + (*out) << Endl; + } + ui32 CurrentFrameId_ = Max<ui32>(); + TEvent::TEventState EventState_; + +private: + void UpdateEventState(const TEvent *ev) { + if (ev->FrameId != CurrentFrameId_) { + EventState_.FrameStartTime = ev->Timestamp; + EventState_.PrevEventTime = ev->Timestamp; + CurrentFrameId_ = ev->FrameId; + } + } +}; + +class TProtobufEventFactory: public IEventFactory { +public: + TProtobufEventFactory(NProtoBuf::TEventFactory* factory = NProtoBuf::TEventFactory::Instance()) + : EventFactory_(factory) + { + } + + TEvent* CreateLogEvent(TEventClass c) override; + + TEventLogFormat CurrentFormat() override { + return 0; + } + + TEventClass ClassByName(TStringBuf name) const override; + + TEventClass EventClassBegin() const override; + + TEventClass EventClassEnd() const override; + + ~TProtobufEventFactory() override = default; + +private: + NProtoBuf::TEventFactory* EventFactory_; +}; + +THolder<TEvent> MakeProtobufLogEvent(TEventTimestamp ts, TEventClass eventId, google::protobuf::Message& ev); + +namespace NEvClass { + IEventFactory* Factory(); + IEventProcessor* Processor(); +} diff --git a/library/cpp/eventlog/eventlog_int.cpp b/library/cpp/eventlog/eventlog_int.cpp new file mode 100644 index 0000000000..faa8c42cbe --- /dev/null +++ b/library/cpp/eventlog/eventlog_int.cpp @@ -0,0 +1,12 @@ +#include "eventlog_int.h" + +#include <util/string/cast.h> + +TMaybe<TEventLogFormat> ParseEventLogFormat(TStringBuf str) { + EEventLogFormat format; + if (TryFromString(str, format)) { + return static_cast<TEventLogFormat>(format); + } else { + return {}; + } +} diff --git a/library/cpp/eventlog/eventlog_int.h b/library/cpp/eventlog/eventlog_int.h new file mode 100644 index 0000000000..eb00fecfab --- /dev/null +++ b/library/cpp/eventlog/eventlog_int.h @@ -0,0 +1,72 @@ +#pragma once + +#include <util/stream/output.h> +#include <util/generic/maybe.h> +#include <util/generic/utility.h> +#include <util/generic/yexception.h> +#include <util/ysaveload.h> + +using TEventClass = ui32; +using TEventLogFormat = ui32; +using TEventTimestamp = ui64; + +constexpr TStringBuf COMPRESSED_LOG_FRAME_SYNC_DATA = + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + "\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + "\x00\x00\x00\x00\xfe\x00\x00\xff\xff\x00\x00\xff\xff\x00" + "\x00\xff\xff\x00\x00\xff\xff\x00\x00\xff\xff\x00\x00\xff" + "\xff\x00\x00\xff\xff\x00\x00\xff"sv; + +static_assert(COMPRESSED_LOG_FRAME_SYNC_DATA.size() == 64); + +/* + * Коды форматов логов. Форматом лога считается формат служебных + * структур лога. К примеру формат заголовка, наличие компрессии, и т.д. + * Имеет значение только 1 младший байт. + */ + +enum EEventLogFormat : TEventLogFormat { + // Формат версии 1. Используется компрессор LZQ. + COMPRESSED_LOG_FORMAT_V1 = 1, + + // Формат версии 2. Используется компрессор ZLIB. Добавлены CRC заголовка и данных, + // поле типа компрессора. + COMPRESSED_LOG_FORMAT_V2 = 2, + + // Формат версии 3. Используется компрессор ZLIB. В начинке фреймов перед каждым событием добавлен его размер. + COMPRESSED_LOG_FORMAT_V3 = 3, + + // Lz4hc codec + zlib + COMPRESSED_LOG_FORMAT_V4 = 4 /* "zlib_lz4" */, + + // zstd + COMPRESSED_LOG_FORMAT_V5 = 5 /* "zstd" */, +}; + +TMaybe<TEventLogFormat> ParseEventLogFormat(TStringBuf str); + +#pragma pack(push, 1) + +struct TCompressedFrameBaseHeader { + TEventLogFormat Format; + ui32 Length; // Длина остатка фрейма в байтах, после этого заголовка + ui32 FrameId; +}; + +struct TCompressedFrameHeader { + TEventTimestamp StartTimestamp; + TEventTimestamp EndTimestamp; + ui32 UncompressedDatalen; // Длина данных, которые были закомпрессированы + ui32 PayloadChecksum; // В логе версии 1 поле не используется +}; + +struct TCompressedFrameHeader2: public TCompressedFrameHeader { + ui8 CompressorVersion; // Сейчас не используется + ui32 HeaderChecksum; +}; + +#pragma pack(pop) + +Y_DECLARE_PODTYPE(TCompressedFrameBaseHeader); +Y_DECLARE_PODTYPE(TCompressedFrameHeader); +Y_DECLARE_PODTYPE(TCompressedFrameHeader2); diff --git a/library/cpp/eventlog/events_extension.h b/library/cpp/eventlog/events_extension.h new file mode 100644 index 0000000000..0cf062f959 --- /dev/null +++ b/library/cpp/eventlog/events_extension.h @@ -0,0 +1,161 @@ +#pragma once + +#include "event_field_output.h" + +#include <google/protobuf/descriptor.h> +#include <google/protobuf/message.h> + +#include <library/cpp/threading/atomic/bool.h> +#include <library/cpp/string_utils/base64/base64.h> + +#include <util/generic/map.h> +#include <util/generic/deque.h> +#include <util/generic/singleton.h> +#include <util/string/hex.h> +#include <util/system/guard.h> +#include <util/system/mutex.h> + +namespace NProtoBuf { + class TEventFactory { + public: + typedef ::google::protobuf::Message Message; + typedef void (*TEventSerializer)(const Message* event, IOutputStream& output, EFieldOutputFlags flags); + typedef void (*TRegistrationFunc)(); + + private: + class TFactoryItem { + public: + TFactoryItem(const Message* prototype, const TEventSerializer serializer) + : Prototype_(prototype) + , Serializer_(serializer) + { + } + + TStringBuf GetName() const { + return Prototype_->GetDescriptor()->name(); + } + + Message* Create() const { + return Prototype_->New(); + } + + void PrintEvent(const Message* event, IOutputStream& out, EFieldOutputFlags flags) const { + (*Serializer_)(event, out, flags); + } + + private: + const Message* Prototype_; + const TEventSerializer Serializer_; + }; + + typedef TMap<size_t, TFactoryItem> TFactoryMap; + + public: + TEventFactory() + : FactoryItems_() + { + } + + void ScheduleRegistration(TRegistrationFunc func) { + EventRegistrators_.push_back(func); + } + + void RegisterEvent(size_t eventId, const Message* prototype, const TEventSerializer serializer) { + FactoryItems_.insert(std::make_pair(eventId, TFactoryItem(prototype, serializer))); + } + + size_t IdByName(TStringBuf eventname) { + DelayedRegistration(); + for (TFactoryMap::const_iterator it = FactoryItems_.begin(); it != FactoryItems_.end(); ++it) { + if (it->second.GetName() == eventname) + return it->first; + } + + ythrow yexception() << "do not know event '" << eventname << "'"; + } + + TStringBuf NameById(size_t id) { + DelayedRegistration(); + TFactoryMap::const_iterator it = FactoryItems_.find(id); + return it != FactoryItems_.end() ? it->second.GetName() : TStringBuf(); + } + + Message* CreateEvent(size_t eventId) { + DelayedRegistration(); + TFactoryMap::const_iterator it = FactoryItems_.find(eventId); + + if (it != FactoryItems_.end()) { + return it->second.Create(); + } + + return nullptr; + } + + const TMap<size_t, TFactoryItem>& FactoryItems() { + DelayedRegistration(); + return FactoryItems_; + } + + void PrintEvent( + size_t eventId, + const Message* event, + IOutputStream& output, + EFieldOutputFlags flags = {}) { + DelayedRegistration(); + TFactoryMap::const_iterator it = FactoryItems_.find(eventId); + + if (it != FactoryItems_.end()) { + it->second.PrintEvent(event, output, flags); + } + } + + static TEventFactory* Instance() { + return Singleton<TEventFactory>(); + } + + private: + void DelayedRegistration() { + if (!DelayedRegistrationDone_) { + TGuard<TMutex> guard(MutexEventRegistrators_); + Y_UNUSED(guard); + while (!EventRegistrators_.empty()) { + EventRegistrators_.front()(); + EventRegistrators_.pop_front(); + } + DelayedRegistrationDone_ = true; + } + } + + private: + TMap<size_t, TFactoryItem> FactoryItems_; + TDeque<TRegistrationFunc> EventRegistrators_; + NAtomic::TBool DelayedRegistrationDone_ = false; + TMutex MutexEventRegistrators_; + }; + + template <typename T> + void PrintAsBytes(const T& obj, IOutputStream& output) { + const ui8* b = reinterpret_cast<const ui8*>(&obj); + const ui8* e = b + sizeof(T); + const char* delim = ""; + + while (b != e) { + output << delim; + output << (int)*b++; + delim = "."; + } + } + + template <typename T> + void PrintAsHex(const T& obj, IOutputStream& output) { + output << "0x"; + output << HexEncode(&obj, sizeof(T)); + } + + inline void PrintAsBase64(TStringBuf data, IOutputStream& output) { + if (!data.empty()) { + output << Base64Encode(data); + } + } + +} diff --git a/library/cpp/eventlog/iterator.cpp b/library/cpp/eventlog/iterator.cpp new file mode 100644 index 0000000000..71f955bca8 --- /dev/null +++ b/library/cpp/eventlog/iterator.cpp @@ -0,0 +1,88 @@ +#include "iterator.h" + +#include <library/cpp/streams/growing_file_input/growing_file_input.h> + +#include <util/string/cast.h> +#include <util/string/split.h> +#include <util/string/type.h> +#include <util/stream/file.h> + +using namespace NEventLog; + +namespace { + inline TIntrusivePtr<TEventFilter> ConstructEventFilter(bool enableEvents, const TString& evList, IEventFactory* fac) { + if (evList.empty()) { + return nullptr; + } + + TVector<TString> events; + + StringSplitter(evList).Split(',').SkipEmpty().Collect(&events); + if (events.empty()) { + return nullptr; + } + + TIntrusivePtr<TEventFilter> filter(new TEventFilter(enableEvents)); + + for (const auto& event : events) { + if (IsNumber(event)) + filter->AddEventClass(FromString<size_t>(event)); + else + filter->AddEventClass(fac->ClassByName(event)); + } + + return filter; + } + + struct TIterator: public IIterator { + inline TIterator(const TOptions& o, IEventFactory* fac) + : First(true) + { + if (o.FileName.size()) { + if (o.ForceStreamMode || o.TailFMode) { + FileInput.Reset(o.TailFMode ? (IInputStream*)new TGrowingFileInput(o.FileName) : (IInputStream*)new TUnbufferedFileInput(o.FileName)); + FrameStream.Reset(new TFrameStreamer(*FileInput, fac, o.FrameFilter)); + } else { + FrameStream.Reset(new TFrameStreamer(o.FileName, o.StartTime, o.EndTime, o.MaxRequestDuration, fac, o.FrameFilter)); + } + } else { + FrameStream.Reset(new TFrameStreamer(*o.Input, fac, o.FrameFilter)); + } + + EvFilter = ConstructEventFilter(o.EnableEvents, o.EvList, fac); + EventStream.Reset(new TEventStreamer(*FrameStream, o.StartTime, o.EndTime, o.ForceStrongOrdering, EvFilter, o.ForceLosslessStrongOrdering)); + } + + TConstEventPtr Next() override { + if (First) { + First = false; + + if (!EventStream->Avail()) { + return nullptr; + } + } else { + if (!EventStream->Next()) { + return nullptr; + } + } + + return **EventStream; + } + + THolder<IInputStream> FileInput; + THolder<TFrameStreamer> FrameStream; + TIntrusivePtr<TEventFilter> EvFilter; + THolder<TEventStreamer> EventStream; + bool First; + }; +} + +IIterator::~IIterator() = default; + +THolder<IIterator> NEventLog::CreateIterator(const TOptions& o, IEventFactory* fac) { + return MakeHolder<TIterator>(o, fac); +} + +THolder<IIterator> NEventLog::CreateIterator(const TOptions& o) { + return MakeHolder<TIterator>(o, NEvClass::Factory()); +} diff --git a/library/cpp/eventlog/iterator.h b/library/cpp/eventlog/iterator.h new file mode 100644 index 0000000000..71a61ed549 --- /dev/null +++ b/library/cpp/eventlog/iterator.h @@ -0,0 +1,51 @@ +#pragma once + +#include <util/stream/input.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/generic/iterator.h> + +#include "eventlog.h" +#include "logparser.h" + +namespace NEventLog { + struct TOptions { + inline TOptions& SetFileName(const TString& fileName) { + FileName = fileName; + + return *this; + } + + inline TOptions& SetForceStrongOrdering(bool v) { + if(!ForceLosslessStrongOrdering) { + ForceStrongOrdering = v; + } + + return *this; + } + + ui64 StartTime = MIN_START_TIME; + ui64 EndTime = MAX_END_TIME; + ui64 MaxRequestDuration = MAX_REQUEST_DURATION; + TString FileName; + bool ForceStrongOrdering = false; + bool ForceWeakOrdering = false; + bool EnableEvents = true; + TString EvList; + bool ForceStreamMode = false; + bool ForceLosslessStrongOrdering = false; + bool TailFMode = false; + IInputStream* Input = &Cin; + IFrameFilterRef FrameFilter; + }; + + class IIterator: public TInputRangeAdaptor<IIterator> { + public: + virtual ~IIterator(); + + virtual TConstEventPtr Next() = 0; + }; + + THolder<IIterator> CreateIterator(const TOptions& o); + THolder<IIterator> CreateIterator(const TOptions& o, IEventFactory* fac); +} diff --git a/library/cpp/eventlog/logparser.cpp b/library/cpp/eventlog/logparser.cpp new file mode 100644 index 0000000000..6f8959f788 --- /dev/null +++ b/library/cpp/eventlog/logparser.cpp @@ -0,0 +1,814 @@ +#include "logparser.h" +#include "evdecoder.h" + +#include <util/stream/output.h> +#include <util/stream/zlib.h> +#include <util/digest/murmur.h> +#include <util/generic/algorithm.h> +#include <util/generic/scope.h> +#include <util/generic/hash_set.h> +#include <util/string/split.h> +#include <util/string/cast.h> +#include <util/string/escape.h> +#include <util/string/builder.h> + +#include <contrib/libs/re2/re2/re2.h> + +#include <algorithm> +#include <array> + +namespace { + bool FastforwardUntilSyncHeader(IInputStream* in) { + // Usually this function finds the correct header at the first hit + std::array<char, COMPRESSED_LOG_FRAME_SYNC_DATA.size()> buffer; + if (in->Load(buffer.data(), buffer.size()) != buffer.size()) { + return false; + } + + auto begin = buffer.begin(); + + for (;;) { + if (std::mismatch( + begin, buffer.end(), + COMPRESSED_LOG_FRAME_SYNC_DATA.begin()).first == buffer.end() && + std::mismatch( + buffer.begin(), begin, + COMPRESSED_LOG_FRAME_SYNC_DATA.begin() + (buffer.end() - begin)).first == begin) { + return true; + } + if (!in->ReadChar(*begin)) { + return false; + } + ++begin; + if (begin == buffer.end()) { + begin = buffer.begin(); + } + } + } + + bool HasCorrectChecksum(const TFrameHeader& header) { + // Calculating hash over all the fields of the read header except for the field with the hash of the header itself. + const size_t baseSize = sizeof(TCompressedFrameBaseHeader) + sizeof(TCompressedFrameHeader2) - sizeof(ui32); + const ui32 checksum = MurmurHash<ui32>(&header.Basehdr, baseSize); + return checksum == header.Framehdr.HeaderChecksum; + } + + TMaybe<TFrameHeader> FindNextFrameHeader(IInputStream* in) { + for (;;) { + if (FastforwardUntilSyncHeader(in)) { + try { + return TFrameHeader(*in); + } catch (const TFrameLoadError& err) { + Cdbg << err.what() << Endl; + in->Skip(err.SkipAfter); + } + } else { + return Nothing(); + } + } + } + + std::pair<TMaybe<TFrameHeader>, TStringBuf> FindNextFrameHeader(TStringBuf span) { + for (;;) { + auto iter = std::search( + span.begin(), span.end(), + COMPRESSED_LOG_FRAME_SYNC_DATA.begin(), COMPRESSED_LOG_FRAME_SYNC_DATA.end()); + const size_t offset = iter - span.begin(); + + if (offset != span.size()) { + span = span.substr(offset); + try { + TMemoryInput in( + span.data() + COMPRESSED_LOG_FRAME_SYNC_DATA.size(), + span.size() - COMPRESSED_LOG_FRAME_SYNC_DATA.size()); + return {TFrameHeader(in), span}; + } catch (const TFrameLoadError& err) { + Cdbg << err.what() << Endl; + span = span.substr(err.SkipAfter); + } + } else { + return {Nothing(), {}}; + } + } + } + + size_t FindFrames(const TStringBuf span, ui64 start, ui64 end, ui64 maxRequestDuration) { + Y_ENSURE(start <= end); + + const auto leftTimeBound = start - Min(start, maxRequestDuration); + const auto rightTimeBound = end + Min(maxRequestDuration, Max<ui64>() - end); + + TStringBuf subspan = span; + TMaybe<TFrameHeader> maybeLeftFrame; + std::tie(maybeLeftFrame, subspan) = FindNextFrameHeader(subspan); + + if (!maybeLeftFrame || maybeLeftFrame->EndTime() > rightTimeBound) { + return span.size(); + } + + if (maybeLeftFrame->StartTime() > leftTimeBound) { + return 0; + } + + while (subspan.size() > maybeLeftFrame->FullLength()) { + const auto mid = subspan.data() + subspan.size() / 2; + auto [midFrame, rightHalfSpan] = FindNextFrameHeader({mid, subspan.data() + subspan.size()}); + if (!midFrame) { + // If mid is in the middle of the last frame, here we will lose it meaning that + // we will find previous frame as the result. + // This is fine because we will iterate frames starting from that. + subspan = subspan.substr(0, subspan.size() / 2); + continue; + } + if (midFrame->StartTime() <= leftTimeBound) { + maybeLeftFrame = midFrame; + subspan = rightHalfSpan; + } else { + subspan = subspan.substr(0, subspan.size() / 2); + } + } + + return subspan.data() - span.data(); + } +} + +TFrameHeader::TFrameHeader(IInputStream& in) { + try { + ::Load(&in, Basehdr); + + Y_ENSURE(Basehdr.Length, "Empty frame additional data"); + + ::Load(&in, Framehdr); + switch (LogFormat()) { + case COMPRESSED_LOG_FORMAT_V1: + break; + + case COMPRESSED_LOG_FORMAT_V2: + case COMPRESSED_LOG_FORMAT_V3: + case COMPRESSED_LOG_FORMAT_V4: + case COMPRESSED_LOG_FORMAT_V5: + Y_ENSURE(!Framehdr.CompressorVersion, "Wrong compressor"); + + Y_ENSURE(HasCorrectChecksum(*this), "Wrong header checksum"); + break; + + default: + ythrow yexception() << "Unsupported log structure format"; + }; + + Y_ENSURE(Framehdr.StartTimestamp <= Framehdr.EndTimestamp, "Wrong start/end timestamps"); + + // Each frame must contain at least one event. + Y_ENSURE(Framehdr.UncompressedDatalen, "Empty frame payload"); + } catch (...) { + TString location = ""; + if (const auto* cnt = dynamic_cast<TCountingInput *>(&in)) { + location = "@ " + ToString(cnt->Counter()); + } + ythrow TFrameLoadError(FrameLength()) << "Frame Load Error" << location << ": " << CurrentExceptionMessage(); + } +} + +TFrame::TFrame(IInputStream& in, TFrameHeader header, IEventFactory* fac) + : TFrameHeader(header) + , Limiter_(MakeHolder<TLengthLimitedInput>(&in, header.FrameLength())) + , Fac_(fac) +{ + if (auto* cnt = dynamic_cast<TCountingInput *>(&in)) { + Address_ = cnt->Counter() - sizeof(TFrameHeader); + } else { + Address_ = 0; + } +} + +TFrame::TIterator TFrame::GetIterator(TIntrusiveConstPtr<TEventFilter> eventFilter) const { + if (EventsCache_.empty()) { + for (TFrameDecoder decoder{*this, eventFilter.Get()}; decoder.Avail(); decoder.Next()) { + EventsCache_.emplace_back(*decoder); + } + } + + return TIterator(*this, eventFilter); +} + +void TFrame::ClearEventsCache() const { + EventsCache_.clear(); +} + +TString TFrame::GetCompressedFrame() const { + const auto left = Limiter_->Left(); + TString payload = Limiter_->ReadAll(); + Y_ENSURE(payload.size() == left, "Could not read frame payload: premature end of stream"); + const ui32 checksum = MurmurHash<ui32>(payload.data(), payload.size()); + Y_ENSURE(checksum == Framehdr.PayloadChecksum, "Invalid frame checksum"); + + return payload; +} + +TString TFrame::GetRawFrame() const { + TString frameBuf = GetCompressedFrame(); + TStringInput sin(frameBuf); + return TZLibDecompress{&sin}.ReadAll(); +} + +TFrame::TIterator::TIterator(const TFrame& frame, TIntrusiveConstPtr<TEventFilter> filter) + : Frame_(frame) + , Size_(frame.EventsCache_.size()) + , Filter_(filter) + , Index_(0) +{ + SkipToValidEvent(); +} + +TConstEventPtr TFrame::TIterator::operator*() const { + return Frame_.GetEvent(Index_); +} + +bool TFrame::TIterator::Next() { + Index_++; + SkipToValidEvent(); + return Index_ < Size_; +} + +void TFrame::TIterator::SkipToValidEvent() { + if (!Filter_) { + return; + } + + for (; Index_ < Size_; ++Index_) { + if (Filter_->EventAllowed(Frame_.GetEvent(Index_)->Class)) { + break; + } + } +} + +TMaybe<TFrame> FindNextFrame(IInputStream* in, IEventFactory* eventFactory) { + if (auto header = FindNextFrameHeader(in)) { + return TFrame{*in, *header, eventFactory}; + } else { + return Nothing(); + } +} + +TContainsEventFrameFilter::TContainsEventFrameFilter(const TString& unparsedMatchGroups, const IEventFactory* eventFactory) { + TVector<TStringBuf> tokens; + + SplitWithEscaping(tokens, unparsedMatchGroups, "/"); + + // Amount of match groups + size_t size = tokens.size(); + MatchGroups.resize(size); + + for (size_t i = 0; i < size; i++) { + TMatchGroup& group = MatchGroups[i]; + TVector<TStringBuf> groupTokens; + SplitWithEscaping(groupTokens, tokens[i], ":"); + + Y_ENSURE(groupTokens.size() == 3); + + try { + group.EventID = eventFactory->ClassByName(groupTokens[0]); + } catch (yexception& e) { + if (!TryFromString<TEventClass>(groupTokens[0], group.EventID)) { + e << "\nAppend:\n" << "Cannot derive EventId from EventType: " << groupTokens[0]; + throw e; + } + } + + group.FieldName = groupTokens[1]; + group.ValueToMatch = UnescapeCharacters(groupTokens[2], "/:"); + } +} + +bool TContainsEventFrameFilter::FrameAllowed(const TFrame& frame) const { + THashSet<size_t> toMatchSet; + for (size_t i = 0; i < MatchGroups.size(); i++) { + toMatchSet.insert(i); + } + + for (auto it = frame.GetIterator(); it.Avail(); it.Next()) { + TConstEventPtr event(*it); + TVector<size_t> indicesToErase; + + if (!toMatchSet.empty()) { + const NProtoBuf::Message* message = event->GetProto(); + const google::protobuf::Descriptor* descriptor = message->GetDescriptor(); + const google::protobuf::Reflection* reflection = message->GetReflection(); + + Y_ENSURE(descriptor); + Y_ENSURE(reflection); + + for (size_t groupIndex : toMatchSet) { + const TMatchGroup& group = MatchGroups[groupIndex]; + + if (event->Class == group.EventID) { + TVector<TString> parts = StringSplitter(group.FieldName).Split('.').ToList<TString>(); + TString lastPart = std::move(parts.back()); + parts.pop_back(); + + for (auto part : parts) { + auto fieldDescriptor = descriptor->FindFieldByName(part); + Y_ENSURE(fieldDescriptor, "Cannot find field \"" + part + "\". Full fieldname is \"" + group.FieldName + "\"."); + + message = &reflection->GetMessage(*message, fieldDescriptor); + descriptor = message->GetDescriptor(); + reflection = message->GetReflection(); + + Y_ENSURE(descriptor); + Y_ENSURE(reflection); + } + + const google::protobuf::FieldDescriptor* fieldDescriptor = descriptor->FindFieldByName(lastPart); + Y_ENSURE(fieldDescriptor, "Cannot find field \"" + lastPart + "\". Full fieldname is \"" + group.FieldName + "\"."); + + TString fieldValue = GetEventFieldAsString(message, fieldDescriptor, reflection); + if (re2::RE2::FullMatch(fieldValue, group.ValueToMatch)) { + indicesToErase.push_back(groupIndex); + } + } + } + + for (size_t idx : indicesToErase) { + toMatchSet.erase(idx); + } + + if (toMatchSet.empty()) { + return true; + } + } + } + + return toMatchSet.empty(); +} + +void SplitWithEscaping(TVector<TStringBuf>& tokens, const TStringBuf& stringToSplit, const TStringBuf& externalCharacterSet) { + size_t tokenStart = 0; + const TString characterSet = TString::Join("\\", externalCharacterSet); + + for (size_t position = stringToSplit.find_first_of(characterSet); position != TString::npos; position = stringToSplit.find_first_of(characterSet, position + 1)) { + if (stringToSplit[position] == '\\') { + position++; + } else { + if (tokenStart != position) { + tokens.push_back(TStringBuf(stringToSplit, tokenStart, position - tokenStart)); + } + tokenStart = position + 1; + } + } + + if (tokenStart < stringToSplit.size()) { + tokens.push_back(TStringBuf(stringToSplit, tokenStart, stringToSplit.size() - tokenStart)); + } +} + +TString UnescapeCharacters(const TStringBuf& stringToUnescape, const TStringBuf& characterSet) { + TStringBuilder stringBuilder; + size_t tokenStart = 0; + + for (size_t position = stringToUnescape.find('\\', 0u); position != TString::npos; position = stringToUnescape.find('\\', position + 2)) { + if (position + 1 < stringToUnescape.size() && characterSet.find(stringToUnescape[position + 1]) != TString::npos) { + stringBuilder << TStringBuf(stringToUnescape, tokenStart, position - tokenStart); + tokenStart = position + 1; + } + } + + if (tokenStart < stringToUnescape.size()) { + stringBuilder << TStringBuf(stringToUnescape, tokenStart, stringToUnescape.size() - tokenStart); + } + + return stringBuilder; +} + +TString GetEventFieldAsString(const NProtoBuf::Message* message, const google::protobuf::FieldDescriptor* fieldDescriptor, const google::protobuf::Reflection* reflection) { + Y_ENSURE(message); + Y_ENSURE(fieldDescriptor); + Y_ENSURE(reflection); + + TString result; + switch (fieldDescriptor->type()) { + case google::protobuf::FieldDescriptor::Type::TYPE_DOUBLE: + result = ToString(reflection->GetDouble(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_FLOAT: + result = ToString(reflection->GetFloat(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_BOOL: + result = ToString(reflection->GetBool(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_INT32: + result = ToString(reflection->GetInt32(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_UINT32: + result = ToString(reflection->GetUInt32(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_INT64: + result = ToString(reflection->GetInt64(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_UINT64: + result = ToString(reflection->GetUInt64(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_STRING: + result = ToString(reflection->GetString(*message, fieldDescriptor)); + break; + case google::protobuf::FieldDescriptor::Type::TYPE_ENUM: + { + const NProtoBuf::EnumValueDescriptor* enumValueDescriptor = reflection->GetEnum(*message, fieldDescriptor); + result = ToString(enumValueDescriptor->name()); + } + break; + default: + throw yexception() << "GetEventFieldAsString for type " << fieldDescriptor->type_name() << " is not implemented."; + } + return result; +} + +TFrameStreamer::TFrameStreamer(IInputStream& s, IEventFactory* fac, IFrameFilterRef ff) + : In_(&s) + , FrameFilter_(ff) + , EventFactory_(fac) +{ + Frame_ = FindNextFrame(&In_, EventFactory_); + + SkipToAllowedFrame(); +} + +TFrameStreamer::TFrameStreamer( + const TString& fileName, + ui64 startTime, + ui64 endTime, + ui64 maxRequestDuration, + IEventFactory* fac, + IFrameFilterRef ff) + : File_(TBlob::FromFile(fileName)) + , MemoryIn_(File_.Data(), File_.Size()) + , In_(&MemoryIn_) + , StartTime_(startTime) + , EndTime_(endTime) + , CutoffTime_(endTime + Min(maxRequestDuration, Max<ui64>() - endTime)) + , FrameFilter_(ff) + , EventFactory_(fac) +{ + In_.Skip(FindFrames(File_.AsStringBuf(), startTime, endTime, maxRequestDuration)); + Frame_ = FindNextFrame(&In_, fac); + SkipToAllowedFrame(); +} + +TFrameStreamer::~TFrameStreamer() = default; + +bool TFrameStreamer::Avail() const { + return Frame_.Defined(); +} + +const TFrame& TFrameStreamer::operator*() const { + Y_ENSURE(Frame_, "Frame streamer depleted"); + + return *Frame_; +} + +bool TFrameStreamer::Next() { + DoNext(); + SkipToAllowedFrame(); + + return Frame_.Defined(); +} + +bool TFrameStreamer::AllowedTimeRange(const TFrame& frame) const { + const bool allowedStartTime = (StartTime_ == 0) || ((StartTime_ <= frame.StartTime()) && (frame.StartTime() <= EndTime_)); + const bool allowedEndTime = (EndTime_ == 0) || ((StartTime_ <= frame.EndTime()) && (frame.EndTime() <= EndTime_)); + return allowedStartTime || allowedEndTime; +} + +bool TFrameStreamer::DoNext() { + if (!Frame_) { + return false; + } + In_.Skip(Frame_->Limiter_->Left()); + Frame_ = FindNextFrame(&In_, EventFactory_); + + if (Frame_ && CutoffTime_ > 0 && Frame_->EndTime() > CutoffTime_) { + Frame_.Clear(); + } + + return Frame_.Defined(); +} + +namespace { + struct TDecodeBuffer { + TDecodeBuffer(const TString codec, IInputStream& src, size_t bs) { + TBuffer from(bs); + + { + TBufferOutput b(from); + TransferData(&src, &b); + } + + NBlockCodecs::Codec(codec)->Decode(from, DecodeBuffer); + } + + explicit TDecodeBuffer(IInputStream& src) { + TBufferOutput b(DecodeBuffer); + TransferData(&src, &b); + } + + TBuffer DecodeBuffer; + }; + + class TBlockCodecStream: private TDecodeBuffer, public TBufferInput { + public: + TBlockCodecStream(const TString codec, IInputStream& src, size_t bs) + : TDecodeBuffer(codec, src, bs) + , TBufferInput(DecodeBuffer) + {} + + explicit TBlockCodecStream(IInputStream& src) + : TDecodeBuffer(src) + , TBufferInput(DecodeBuffer) + {} + }; +} + +TFrameDecoder::TFrameDecoder(const TFrame& fr, const TEventFilter* const filter, bool strict, bool withRawData) + : Frame_(fr) + , Event_(nullptr) + , Flt_(filter) + , Fac_(fr.Fac_) + , EndOfFrame_(new TEndOfFrameEvent(Frame_.EndTime())) + , Strict_(strict) + , WithRawData_(withRawData) +{ + switch (fr.LogFormat()) { + case COMPRESSED_LOG_FORMAT_V2: + case COMPRESSED_LOG_FORMAT_V3: + case COMPRESSED_LOG_FORMAT_V4: + case COMPRESSED_LOG_FORMAT_V5: { + const auto payload = fr.GetCompressedFrame(); + TMemoryInput payloadInput{payload}; + + if (fr.LogFormat() == COMPRESSED_LOG_FORMAT_V5) { + Decompressor_.Reset(new TBlockCodecStream("zstd_1", payloadInput, payload.size())); + } else { + TZLibDecompress zlib(&payloadInput); + Decompressor_.Reset(new TBlockCodecStream(zlib)); + if (fr.LogFormat() == COMPRESSED_LOG_FORMAT_V4) { + Decompressor_.Reset(new TBlockCodecStream("lz4hc", *Decompressor_, payload.size())); + } + } + + break; + } + + default: + ythrow yexception() << "unsupported log format: " << fr.LogFormat() << Endl; + break; + }; + + if (WithRawData_) { + TBufferOutput out(UncompressedData_); + TLengthLimitedInput limiter(Decompressor_.Get(), fr.Framehdr.UncompressedDatalen); + + TransferData(&limiter, &out); + Decompressor_.Reset(new TMemoryInput(UncompressedData_.data(), UncompressedData_.size())); + } + + Limiter_.Reset(new TLengthLimitedInput(Decompressor_.Get(), fr.Framehdr.UncompressedDatalen)); + + Decode(); +} + +TFrameDecoder::~TFrameDecoder() = default; + +bool TFrameDecoder::Avail() const { + return HaveData(); +} + +TConstEventPtr TFrameDecoder::operator*() const { + Y_ENSURE(HaveData(), "Decoder depleted"); + + return Event_; +} + +bool TFrameDecoder::Next() { + if (HaveData()) { + Decode(); + } + + return HaveData(); +} + +void TFrameDecoder::Decode() { + Event_ = nullptr; + const bool framed = (Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V3) || (Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V4 || Frame_.LogFormat() == COMPRESSED_LOG_FORMAT_V5); + + size_t evBegin = 0; + size_t evEnd = 0; + if (WithRawData_) + evBegin = UncompressedData_.Size() - Limiter_->Left(); + + while (Limiter_->Left() && !(Event_ = DecodeEvent(*Limiter_, framed, Frame_.Address(), Flt_, Fac_, Strict_).Release())) { + } + + if (WithRawData_) { + evEnd = UncompressedData_.Size() - Limiter_->Left(); + RawEventData_ = TStringBuf(UncompressedData_.data() + evBegin, UncompressedData_.data() + evEnd); + } + + if (!Event_ && (!Flt_ || (Flt_->EventAllowed(TEndOfFrameEvent::EventClass)))) { + Event_ = EndOfFrame_.Release(); + } + + if (!!Event_) { + Event_->FrameId = Frame_.FrameId(); + } +} + +const TStringBuf TFrameDecoder::GetRawEvent() const { + return RawEventData_; +} + +TEventStreamer::TEventStreamer(TFrameStream& fs, ui64 s, ui64 e, bool strongOrdering, TIntrusivePtr<TEventFilter> filter, bool losslessStrongOrdering) + : Frames_(fs) + , Start_(s) + , End_(e) + , MaxEndTimestamp_(0) + , Frontier_(0) + , StrongOrdering_(strongOrdering) + , LosslessStrongOrdering_(losslessStrongOrdering) + , EventFilter_(filter) +{ + + if (Start_ > End_) { + ythrow yexception() << "Wrong main interval"; + } + + TEventStreamer::Next(); +} + +TEventStreamer::~TEventStreamer() = default; + +bool TEventStreamer::Avail() const { + return Events_.Avail() && (*Events_)->Timestamp <= Frontier_; +} + +TConstEventPtr TEventStreamer::operator*() const { + Y_ENSURE(TEventStreamer::Avail(), "Event streamer depleted"); + + return *Events_; +} + +bool TEventStreamer::Next() { + if (Events_.Avail() && Events_.Next() && (*Events_)->Timestamp <= Frontier_) { + return true; + } + + for (;;) { + if (!LoadMoreEvents()) { + return false; + } + + if (TEventStreamer::Avail()) { + return true; + } + } +} + +/* +Two parameters are used in the function: +Frontier - the moment of time up to which inclusively all the log events made their way + into the buffer (and might have been already extracted out of it). +Horizon - the moment of time, that equals to Frontier + MAX_REQUEST_DURATION. +In order to get all the log events up to the Frontier inclusively, + frames need to be read until "end time" of the current frame exceeds the Horizon. +*/ +bool TEventStreamer::LoadMoreEvents() { + if (!Frames_.Avail()) { + return false; + } + + const TFrame& fr1 = *Frames_; + const ui64 maxRequestDuration = (StrongOrdering_ ? MAX_REQUEST_DURATION : 0); + + if (fr1.EndTime() <= Frontier_ + maxRequestDuration) { + ythrow yexception() << "Wrong frame stream state"; + } + + if (Frontier_ >= End_) { + return false; + } + + const ui64 old_frontier = Frontier_; + Frontier_ = fr1.EndTime(); + + { + Y_DEFER { + Events_.Reorder(StrongOrdering_); + }; + + for (; Frames_.Avail(); Frames_.Next()) { + const TFrame& fr2 = *Frames_; + + // Frames need to start later than the Frontier. + if (StrongOrdering_ && fr2.StartTime() <= old_frontier) { + Cdbg << "Invalid frame encountered" << Endl; + continue; + } + + if (fr2.EndTime() > MaxEndTimestamp_) { + MaxEndTimestamp_ = fr2.EndTime(); + } + + if (fr2.EndTime() > Frontier_ + maxRequestDuration && !LosslessStrongOrdering_) { + return true; + } + + // Checking for the frame to be within the main time borders. + if (fr2.EndTime() >= Start_ && fr2.StartTime() <= End_) { + TransferEvents(fr2); + } + } + } + + Frontier_ = MaxEndTimestamp_; + + return true; +} + +void TEventStreamer::TransferEvents(const TFrame& fr) { + Events_.SetCheckpoint(); + + try { + for (auto it = fr.GetIterator(EventFilter_); it.Avail(); it.Next()) { + TConstEventPtr ev = *it; + + if (ev->Timestamp > fr.EndTime() || ev->Timestamp < fr.StartTime()) { + ythrow TInvalidEventTimestamps() << "Event timestamp out of frame range"; + } + + if (ev->Timestamp >= Start_ && ev->Timestamp <= End_) { + Events_.Append(ev, StrongOrdering_); + } + } + } catch (const TInvalidEventTimestamps& err) { + Events_.Rollback(); + Cdbg << "EventsTransfer error: InvalidEventTimestamps: " << err.what() << Endl; + } catch (const TFrameLoadError& err) { + Events_.Rollback(); + Cdbg << "EventsTransfer error: " << err.what() << Endl; + } catch (const TEventDecoderError& err) { + Events_.Rollback(); + Cdbg << "EventsTransfer error: EventDecoder error: " << err.what() << Endl; + } catch (const TZLibDecompressorError& err) { + Events_.Rollback(); + Cdbg << "EventsTransfer error: ZLibDecompressor error: " << err.what() << Endl; + } catch (...) { + Events_.Rollback(); + throw; + } +} + +void TEventStreamer::TEventBuffer::SetCheckpoint() { + BufLen_ = Buffer_.size(); +} + +void TEventStreamer::TEventBuffer::Rollback() { + Buffer_.resize(BufLen_); +} + +void TEventStreamer::TEventBuffer::Reorder(bool strongOrdering) { + SetCheckpoint(); + + std::reverse(Buffer_.begin(), Buffer_.end()); + + if (strongOrdering) { + StableSort(Buffer_.begin(), Buffer_.end(), [&](const auto& a, const auto& b) { + return (a->Timestamp > b->Timestamp) || + ((a->Timestamp == b->Timestamp) && !a->Class && b->Class); + }); + } +} + +void TEventStreamer::TEventBuffer::Append(TConstEventPtr ev, bool strongOrdering) { + // Events in buffer output must be in an ascending order. + Y_ENSURE(!strongOrdering || ev->Timestamp >= LastTimestamp_, "Trying to append out-of-order event"); + + Buffer_.push_back(std::move(ev)); +} + +bool TEventStreamer::TEventBuffer::Avail() const { + return !Buffer_.empty(); +} + +TConstEventPtr TEventStreamer::TEventBuffer::operator*() const { + Y_ENSURE(!Buffer_.empty(), "Event buffer is empty"); + + return Buffer_.back(); +} + +bool TEventStreamer::TEventBuffer::Next() { + if (!Buffer_.empty()) { + LastTimestamp_ = Buffer_.back()->Timestamp; + Buffer_.pop_back(); + return !Buffer_.empty(); + } else { + return false; + } +} diff --git a/library/cpp/eventlog/logparser.h b/library/cpp/eventlog/logparser.h new file mode 100644 index 0000000000..f819e72589 --- /dev/null +++ b/library/cpp/eventlog/logparser.h @@ -0,0 +1,343 @@ +#pragma once + +#include <util/generic/ptr.h> +#include <util/generic/yexception.h> +#include <util/generic/vector.h> +#include <util/generic/set.h> +#include <util/generic/maybe.h> +#include <util/memory/blob.h> +#include <util/stream/length.h> +#include <util/stream/mem.h> + +#include "eventlog_int.h" +#include "eventlog.h" +#include "common.h" + +class IInputStream; + +static const ui64 MAX_REQUEST_DURATION = 60'000'000; +static const ui64 MIN_START_TIME = MAX_REQUEST_DURATION; +static const ui64 MAX_END_TIME = ((ui64)-1) - MAX_REQUEST_DURATION; + +class TEventFilter: public TSet<TEventClass>, public TSimpleRefCount<TEventFilter> { +public: + TEventFilter(bool enableEvents) + : Enable_(enableEvents) + { + } + + void AddEventClass(TEventClass cls) { + insert(cls); + } + + bool EventAllowed(TEventClass cls) const { + bool found = (find(cls) != end()); + + return Enable_ == found; + } + +private: + bool Enable_; +}; + +using TEventStream = TPacketInputStream<TConstEventPtr>; + +struct TFrameHeader { + // Reads header from the stream. The caller must make sure that the + // sync data is present just befor the stream position. + explicit TFrameHeader(IInputStream& in); + + ui64 StartTime() const { + return Framehdr.StartTimestamp; + } + + ui64 EndTime() const { + return Framehdr.EndTimestamp; + } + + ui32 FrameId() const { + return Basehdr.FrameId; + } + + ui64 Duration() const { + return EndTime() - StartTime(); + } + + TEventLogFormat ContentFormat() const { + return Basehdr.Format & 0xffffff; + } + + TEventLogFormat LogFormat() const { + return Basehdr.Format >> 24; + } + + ui64 FrameLength() const { + return Basehdr.Length - sizeof(TCompressedFrameHeader2); + } + + // Length including the header + ui64 FullLength() const { + return sizeof(*this) + FrameLength(); + } + + TCompressedFrameBaseHeader Basehdr; + TCompressedFrameHeader2 Framehdr; +}; + +struct TFrameLoadError: public yexception { + explicit TFrameLoadError(size_t skipAfter) + : SkipAfter(skipAfter) + {} + + size_t SkipAfter; +}; + +class TFrame : public TFrameHeader { +public: + // Reads the frame after the header has been read. + TFrame(IInputStream& in, TFrameHeader header, IEventFactory*); + + TString GetRawFrame() const; + TString GetCompressedFrame() const; + + ui64 Address() const { return Address_; } + +private: + const TConstEventPtr& GetEvent(size_t index) const { + return EventsCache_[index]; + } + + void ClearEventsCache() const; + + THolder<TLengthLimitedInput> Limiter_; + mutable TVector<TConstEventPtr> EventsCache_; + + IEventFactory* Fac_; + ui64 Address_; + + friend class TFrameDecoder; + friend class TFrameStreamer; + +private: + class TIterator: TEventStream { + public: + TIterator(const TFrame& frame, TIntrusiveConstPtr<TEventFilter> filter); + ~TIterator() override = default; + + bool Avail() const override { + return Index_ < Size_; + } + + TConstEventPtr operator*() const override; + bool Next() override; + + private: + void SkipToValidEvent(); + + const TFrame& Frame_; + size_t Size_; + TIntrusiveConstPtr<TEventFilter> Filter_; + size_t Index_; + }; + +public: + TFrame::TIterator GetIterator(TIntrusiveConstPtr<TEventFilter> eventFilter = nullptr) const; +}; + +// If `in` is derived from TCountingInput, Frame's address will +// be set accorting to the in->Counter(). Otherwise it will be zeroO +TMaybe<TFrame> FindNextFrame(IInputStream* in, IEventFactory*); + +using TFrameStream = TPacketInputStream<const TFrame&>; + +class IFrameFilter: public TSimpleRefCount<IFrameFilter> { +public: + IFrameFilter() { + } + + virtual ~IFrameFilter() = default; + + virtual bool FrameAllowed(const TFrame& frame) const = 0; +}; + +using IFrameFilterRef = TIntrusivePtr<IFrameFilter>; + +class TDurationFrameFilter: public IFrameFilter { +public: + TDurationFrameFilter(ui64 minFrameDuration, ui64 maxFrameDuration = Max<ui64>()) + : MinDuration_(minFrameDuration) + , MaxDuration_(maxFrameDuration) + { + } + + bool FrameAllowed(const TFrame& frame) const override { + return frame.Duration() >= MinDuration_ && frame.Duration() <= MaxDuration_; + } + +private: + const ui64 MinDuration_; + const ui64 MaxDuration_; +}; + +class TFrameIdFrameFilter: public IFrameFilter { +public: + TFrameIdFrameFilter(ui32 frameId) + : FrameId_(frameId) + { + } + + bool FrameAllowed(const TFrame& frame) const override { + return frame.FrameId() == FrameId_; + } + +private: + const ui32 FrameId_; +}; + +class TContainsEventFrameFilter: public IFrameFilter { +public: + TContainsEventFrameFilter(const TString& args, const IEventFactory* fac); + + bool FrameAllowed(const TFrame& frame) const override; + +private: + struct TMatchGroup { + TEventClass EventID; + TString FieldName; + TString ValueToMatch; + }; + + TVector<TMatchGroup> MatchGroups; +}; + +void SplitWithEscaping(TVector<TStringBuf>& tokens, const TStringBuf& stringToSplit, const TStringBuf& externalCharacterSet); + +TString UnescapeCharacters(const TStringBuf& stringToUnescape, const TStringBuf& characterSet); + +TString GetEventFieldAsString(const NProtoBuf::Message* message, const google::protobuf::FieldDescriptor* fieldDescriptor, const google::protobuf::Reflection* reflection); + +class TFrameStreamer: public TFrameStream { +public: + TFrameStreamer(IInputStream&, IEventFactory* fac, IFrameFilterRef ff = nullptr); + TFrameStreamer( + const TString& fileName, + ui64 startTime, + ui64 endTime, + ui64 maxRequestDuration, + IEventFactory* fac, + IFrameFilterRef ff = nullptr); + ~TFrameStreamer() override; + + bool Avail() const override; + const TFrame& operator*() const override; + bool Next() override; + +private: + bool DoNext(); + bool AllowedTimeRange(const TFrame& frame) const; + + bool AllowedFrame(const TFrame& frame) const { + return AllowedTimeRange(frame) && (!FrameFilter_ || FrameFilter_->FrameAllowed(frame)); + } + + void SkipToAllowedFrame() { + if (Frame_) { + while (!AllowedFrame(*Frame_) && DoNext()) { + //do nothing + } + } + } + + TBlob File_; + TMemoryInput MemoryIn_; + TCountingInput In_; + THolder<IInputStream> Stream_; + ui64 StartTime_ = 0; + ui64 EndTime_ = 0; + ui64 CutoffTime_ = 0; + TMaybe<TFrame> Frame_; + IFrameFilterRef FrameFilter_; + IEventFactory* EventFactory_; +}; + +class TFrameDecoder: TEventStream { +public: + TFrameDecoder(const TFrame&, const TEventFilter* const filter, bool strict = false, bool withRawData = false); + ~TFrameDecoder() override; + + bool Avail() const override; + + TConstEventPtr operator*() const override; + bool Next() override; + + const TStringBuf GetRawEvent() const; + +private: + TFrameDecoder(const TFrameDecoder&); + void operator=(const TFrameDecoder&); + + inline bool HaveData() const { + return Event_ != nullptr; + } + + void Decode(); + +private: + const TFrame& Frame_; + THolder<IInputStream> Decompressor_; + THolder<TLengthLimitedInput> Limiter_; + TEventPtr Event_; + const TEventFilter* const Flt_; + IEventFactory* Fac_; + THolder<TEvent> EndOfFrame_; + bool Strict_; + TBuffer UncompressedData_; + TStringBuf RawEventData_; + bool WithRawData_; +}; + +class TEventStreamer: public TEventStream { +public: + TEventStreamer(TFrameStream&, ui64 start, ui64 end, bool strongOrdering, TIntrusivePtr<TEventFilter> filter, bool losslessStrongOrdering = false); + ~TEventStreamer() override; + + bool Avail() const override; + TConstEventPtr operator*() const override; + bool Next() override; + +private: + class TEventBuffer: public TEventStream { + public: + void SetCheckpoint(); + void Rollback(); + void Reorder(bool strongOrdering); + void Append(TConstEventPtr event, bool strongOrdering); + + bool Avail() const override; + TConstEventPtr operator*() const override; + bool Next() override; + + private: + TVector<TConstEventPtr> Buffer_; + size_t BufLen_ = 0; + ui64 LastTimestamp_ = 0; + }; + +private: + struct TInvalidEventTimestamps: public yexception { + }; + + bool LoadMoreEvents(); + void TransferEvents(const TFrame&); + +private: + TFrameStream& Frames_; + TEventBuffer Events_; + + ui64 Start_, End_; + ui64 MaxEndTimestamp_; + ui64 Frontier_; + bool StrongOrdering_; + bool LosslessStrongOrdering_; + TIntrusivePtr<TEventFilter> EventFilter_; +}; diff --git a/library/cpp/eventlog/proto/events_extension.proto b/library/cpp/eventlog/proto/events_extension.proto new file mode 100644 index 0000000000..7db1af3a59 --- /dev/null +++ b/library/cpp/eventlog/proto/events_extension.proto @@ -0,0 +1,22 @@ +import "google/protobuf/descriptor.proto"; + +option go_package = "github.com/ydb-platform/ydb/library/cpp/eventlog/proto;extensions"; +option java_package = "NEventLogEventsExtension"; + +extend google.protobuf.MessageOptions { + optional uint32 message_id = 50001; + optional string realm_name = 50002; +} + +message Repr { + enum ReprType { + none = 0; + as_bytes = 1; // Only for primitive types + as_hex = 2; // Only for primitive types + as_base64 = 3; // Only for 'string' and 'bytes' fields + }; +} + +extend google.protobuf.FieldOptions { + optional Repr.ReprType repr = 55003 [default = none]; +} diff --git a/library/cpp/eventlog/proto/internal.proto b/library/cpp/eventlog/proto/internal.proto new file mode 100644 index 0000000000..8070a09685 --- /dev/null +++ b/library/cpp/eventlog/proto/internal.proto @@ -0,0 +1,9 @@ +option go_package = "github.com/ydb-platform/ydb/library/cpp/eventlog/proto;extensions"; + +package NEventLogInternal; + +message TUnknownEvent { +}; + +message TEndOfFrameEvent { +}; diff --git a/library/cpp/eventlog/proto/ya.make b/library/cpp/eventlog/proto/ya.make new file mode 100644 index 0000000000..fbf5a6c619 --- /dev/null +++ b/library/cpp/eventlog/proto/ya.make @@ -0,0 +1,12 @@ +PROTO_LIBRARY() + +IF (NOT PY_PROTOS_FOR) + INCLUDE_TAGS(GO_PROTO) +ENDIF() + +SRCS( + events_extension.proto + internal.proto +) + +END() diff --git a/library/cpp/eventlog/threaded_eventlog.cpp b/library/cpp/eventlog/threaded_eventlog.cpp new file mode 100644 index 0000000000..67839063fb --- /dev/null +++ b/library/cpp/eventlog/threaded_eventlog.cpp @@ -0,0 +1 @@ +#include "threaded_eventlog.h" diff --git a/library/cpp/eventlog/threaded_eventlog.h b/library/cpp/eventlog/threaded_eventlog.h new file mode 100644 index 0000000000..52382b856d --- /dev/null +++ b/library/cpp/eventlog/threaded_eventlog.h @@ -0,0 +1,154 @@ +#pragma once + +#include "eventlog.h" + +#include <util/generic/string.h> +#include <util/thread/pool.h> + +class TThreadedEventLog: public TEventLogWithSlave { +public: + class TWrapper; + using TOverflowCallback = std::function<void(TWrapper& wrapper)>; + + enum class EDegradationResult { + ShouldWrite, + ShouldDrop, + }; + using TDegradationCallback = std::function<EDegradationResult(float fillFactor)>; + +public: + TThreadedEventLog( + IEventLog& parentLog, + size_t threadCount, + size_t queueSize, + TOverflowCallback cb, + TDegradationCallback degradationCallback = {}) + : TEventLogWithSlave(parentLog) + , LogSaver(TThreadPoolParams().SetThreadName("ThreadedEventLog")) + , ThreadCount(threadCount) + , QueueSize(queueSize) + , OverflowCallback(std::move(cb)) + , DegradationCallback(std::move(degradationCallback)) + { + Init(); + } + + TThreadedEventLog( + const TEventLogPtr& parentLog, + size_t threadCount, + size_t queueSize, + TOverflowCallback cb, + TDegradationCallback degradationCallback = {}) + : TEventLogWithSlave(parentLog) + , LogSaver(TThreadPoolParams().SetThreadName("ThreadedEventLog")) + , ThreadCount(threadCount) + , QueueSize(queueSize) + , OverflowCallback(std::move(cb)) + , DegradationCallback(std::move(degradationCallback)) + { + Init(); + } + + TThreadedEventLog(IEventLog& parentLog) + : TThreadedEventLog(parentLog, 1, 0, TOverflowCallback()) + { + } + + TThreadedEventLog(const TEventLogPtr& parentLog) + : TThreadedEventLog(parentLog, 1, 0, TOverflowCallback()) + { + } + + ~TThreadedEventLog() override { + try { + LogSaver.Stop(); + } catch (...) { + } + } + + void ReopenLog() override { + TEventLogWithSlave::ReopenLog(); + } + + void CloseLog() override { + LogSaver.Stop(); + TEventLogWithSlave::CloseLog(); + } + + void WriteFrame(TBuffer& buffer, + TEventTimestamp startTimestamp, + TEventTimestamp endTimestamp, + TWriteFrameCallbackPtr writeFrameCallback = nullptr, + TLogRecord::TMetaFlags metaFlags = {}) override { + float fillFactor = 0.0f; + if (Y_LIKELY(LogSaver.GetMaxQueueSize() > 0)) { + fillFactor = static_cast<float>(LogSaver.Size()) / LogSaver.GetMaxQueueSize(); + } + + EDegradationResult status = EDegradationResult::ShouldWrite; + if (DegradationCallback) { + status = DegradationCallback(fillFactor); + } + if (Y_UNLIKELY(status == EDegradationResult::ShouldDrop)) { + return; + } + + THolder<TWrapper> wrapped; + wrapped.Reset(new TWrapper(buffer, startTimestamp, endTimestamp, Slave(), writeFrameCallback, std::move(metaFlags))); + + if (LogSaver.Add(wrapped.Get())) { + Y_UNUSED(wrapped.Release()); + } else if (OverflowCallback) { + OverflowCallback(*wrapped); + } + } + +private: + void Init() { + LogSaver.Start(ThreadCount, QueueSize); + } + +public: + class TWrapper: public IObjectInQueue { + public: + TWrapper(TBuffer& buffer, + TEventTimestamp startTimestamp, + TEventTimestamp endTimestamp, + IEventLog& slave, + TWriteFrameCallbackPtr writeFrameCallback = nullptr, + TLogRecord::TMetaFlags metaFlags = {}) + : StartTimestamp(startTimestamp) + , EndTimestamp(endTimestamp) + , Slave(&slave) + , WriteFrameCallback(writeFrameCallback) + , MetaFlags(std::move(metaFlags)) + { + Buffer.Swap(buffer); + } + + void Process(void*) override { + THolder<TWrapper> holder(this); + + WriteFrame(); + } + + void WriteFrame() { + Slave->WriteFrame(Buffer, StartTimestamp, EndTimestamp, WriteFrameCallback, std::move(MetaFlags)); + } + + private: + TBuffer Buffer; + TEventTimestamp StartTimestamp; + TEventTimestamp EndTimestamp; + IEventLog* Slave; + TWriteFrameCallbackPtr WriteFrameCallback; + TLogRecord::TMetaFlags MetaFlags; + }; + +private: + TThreadPool LogSaver; + const size_t ThreadCount; + const size_t QueueSize; + const TOverflowCallback OverflowCallback; + const TDegradationCallback DegradationCallback; +}; diff --git a/library/cpp/eventlog/ya.make b/library/cpp/eventlog/ya.make new file mode 100644 index 0000000000..fbbc1eff00 --- /dev/null +++ b/library/cpp/eventlog/ya.make @@ -0,0 +1,29 @@ +LIBRARY() + +PEERDIR( + library/cpp/blockcodecs + library/cpp/eventlog/proto + library/cpp/json + library/cpp/logger + library/cpp/protobuf/json + library/cpp/streams/growing_file_input + library/cpp/string_utils/base64 + contrib/libs/re2 +) + +SRCS( + common.h + evdecoder.cpp + event_field_output.cpp + event_field_printer.cpp + eventlog.cpp + eventlog_int.cpp + iterator.cpp + logparser.cpp + threaded_eventlog.cpp +) + +GENERATE_ENUM_SERIALIZATION(eventlog.h) +GENERATE_ENUM_SERIALIZATION(eventlog_int.h) + +END() diff --git a/library/cpp/fieldcalc/field_calc.cpp b/library/cpp/fieldcalc/field_calc.cpp new file mode 100644 index 0000000000..1066b5b5e6 --- /dev/null +++ b/library/cpp/fieldcalc/field_calc.cpp @@ -0,0 +1,1136 @@ +#include <cstdio> + +#include <util/str_stl.h> +#include <util/string/subst.h> +#include <util/string/util.h> +#include <util/string/cast.h> +#include <util/stream/printf.h> + +#include "field_calc_int.h" + +using namespace std; + +enum Operators { + OP_ADD, + OP_SUBSTRACT, + OP_MULTIPLY, + OP_DIVIDE, + OP_MODULUS, + OP_REGEXP, + OP_REGEXP_NOT, + OP_LEFT_SHIFT, + OP_RIGHT_SHIFT, + OP_EQUAL, + OP_NOT_EQUAL, + OP_LESS, + OP_LESS_OR_EQUAL, + OP_GREATER, + OP_GREATER_OR_EQUAL, + OP_XOR, + OP_BITWISE_OR, + OP_BITWISE_AND, + OP_LOGICAL_OR, + OP_LOGICAL_AND, + OP_UNARY_NOT, + OP_UNARY_COMPLEMENT, + OP_UNARY_MINUS, + OP_LOG, + OP_LOG10, + OP_ROUND, + OP_ASSIGN, + OP_QUESTION, + OP_COLON, + + OP_UNKNOWN, +}; + +struct calc_op; + +struct calc_elem { + dump_item item; + char oper; + int op_prio; +}; + +struct calc_op { + dump_item Left, Right; + char Oper; + bool force_long; + bool unary; + bool is_variable; + bool string_op; // TODO -> bitop + + // for local vars + mutable bool calculated; + mutable eval_res_type result; + + calc_op(calc_elem& left, calc_elem& right) + : Left(left.item) + , Right(right.item) + , Oper(right.oper) + , is_variable(false) + , calculated(false) + , result(false) + { + force_long = Oper == OP_XOR || Oper == OP_BITWISE_OR || Oper == OP_BITWISE_AND || + Oper == OP_LOGICAL_OR || Oper == OP_LOGICAL_AND || Oper == OP_UNARY_NOT || + Oper == OP_UNARY_COMPLEMENT || Oper == OP_LEFT_SHIFT || Oper == OP_RIGHT_SHIFT || + Oper == OP_MODULUS; + unary = Oper == OP_UNARY_NOT || Oper == OP_UNARY_COMPLEMENT || Oper == OP_UNARY_MINUS || + Oper == OP_LOG || Oper == OP_LOG10 || Oper == OP_ROUND; + string_op = IsStringType(Left.type) && IsStringType(Right.type) && + (Oper == OP_REGEXP || Oper == OP_REGEXP_NOT || Oper == OP_EQUAL || Oper == OP_NOT_EQUAL || + Oper == OP_LESS || Oper == OP_LESS_OR_EQUAL || Oper == OP_GREATER || Oper == OP_GREATER_OR_EQUAL); + if (Oper == OP_REGEXP || Oper == OP_REGEXP_NOT) { + if (!string_op) + ythrow yexception() << "calc-expr: regexp requested for non-strings"; + ythrow yexception() << "calc-expr: regexps currently not supported"; + } + } + + Y_FORCE_INLINE void eval(const char** dd) const { + if (is_variable) { + if (!calculated) { + do_eval(dd); + calculated = true; + } + } else { + do_eval(dd); + } + } + +private: + Y_FORCE_INLINE void do_eval(const char** dd) const; +}; + +void calc_op::do_eval(const char** dd) const { + eval_res_type left1 = unary ? (eval_res_type) false : Left.eval(dd); + if (Oper == OP_QUESTION) { + left1.to_long(); + if (left1.res_long) { + result = Right.eval(dd); + } else { + result = eval_res_type(); // null + } + return; + } else if (Oper == OP_COLON) { + if (left1.is_null()) { + result = Right.eval(dd); + } else { + result = left1; + } + return; + } + + if (Y_UNLIKELY(string_op)) { + TStringBuf left2 = Left.GetStrBuf(dd); + TStringBuf right2 = Right.GetStrBuf(dd); + switch (Oper) { + case OP_REGEXP: + result = false; + break; + case OP_REGEXP_NOT: + result = false; + break; + case OP_EQUAL: + result = left2 == right2; + break; + case OP_NOT_EQUAL: + result = left2 != right2; + break; + case OP_LESS: + result = left2 < right2; + break; + case OP_LESS_OR_EQUAL: + result = left2 <= right2; + break; + case OP_GREATER: + result = left2 > right2; + break; + case OP_GREATER_OR_EQUAL: + result = left2 >= right2; + break; + default: + assert(false); + } + return; + } + + eval_res_type right1 = Right.eval(dd); + if (force_long) { // logical ops will be all long + left1.to_long(); + right1.to_long(); + } + switch (Oper) { + case OP_ADD: + result = left1 + right1; + break; + case OP_SUBSTRACT: + result = left1 - right1; + break; + case OP_MULTIPLY: + result = left1 * right1; + break; + case OP_DIVIDE: + result = left1 / right1; + break; + case OP_MODULUS: + result = left1.res_long ? left1.res_long % right1.res_long : 0; + break; + case OP_LEFT_SHIFT: + result = left1.res_long << right1.res_long; + break; + case OP_RIGHT_SHIFT: + result = left1.res_long >> right1.res_long; + break; + case OP_EQUAL: + result = left1 == right1; + break; + case OP_NOT_EQUAL: + result = !(left1 == right1); + break; + case OP_LESS: + result = left1 < right1; + break; + case OP_LESS_OR_EQUAL: + result = !(right1 < left1); + break; // <= + case OP_GREATER: + result = right1 < left1; + break; + case OP_GREATER_OR_EQUAL: + result = !(left1 < right1); + break; // >= + case OP_XOR: + result = left1.res_long ^ right1.res_long; + break; + case OP_BITWISE_OR: + result = left1.res_long | right1.res_long; + break; + case OP_BITWISE_AND: + result = left1.res_long & right1.res_long; + break; + case OP_LOGICAL_OR: + result = left1.res_long || right1.res_long; + break; + case OP_LOGICAL_AND: + result = left1.res_long && right1.res_long; + break; + case OP_UNARY_NOT: + result = !right1.res_long; + break; + case OP_UNARY_COMPLEMENT: + result = ~right1.res_long; + break; + case OP_UNARY_MINUS: + result = Minus(right1); + break; + case OP_LOG: + result = Log(right1); + break; + case OP_LOG10: + result = Log10(right1); + break; + case OP_ROUND: + result = Round(right1); + break; + default: + assert(false); + } +} + +namespace { + // copy-paste of fcat(TString) + // we don't want it to be too slow, yet we don't want do slow down our + // main functionality, libc fprintf, even a little + size_t Y_PRINTF_FORMAT(2, 3) fprintf(TString* s, const char* c, ...) { + TStringOutput so(*s); + + va_list params; + va_start(params, c); + const size_t ret = Printf(so, c, params); + va_end(params); + + return ret; + } + size_t Y_PRINTF_FORMAT(2, 3) fprintf(IOutputStream* s, const char* c, ...) { + va_list params; + va_start(params, c); + const size_t ret = Printf(*s, c, params); + va_end(params); + + return ret; + } +} + +template <class TOut> +void dump_item::print(TOut* p, const char** dd) const { + const char* d = dd[pack_id]; + const fake* f = reinterpret_cast<const fake*>(d); + + switch (type) { + case DIT_FAKE_ITEM: + assert(false); + break; + case DIT_MATH_RESULT: + assert(false); + break; // must call eval instead + case DIT_NAME: + assert(false); + break; // no op + + case DIT_BOOL_FIELD: + fprintf(p, *(bool*)(d + field_offset) ? "true" : "false"); + break; + case DIT_UI8_FIELD: + fprintf(p, "%u", *(ui8*)(d + field_offset)); + break; + case DIT_UI16_FIELD: + fprintf(p, "%u", *(ui16*)(d + field_offset)); + break; + case DIT_UI32_FIELD: + fprintf(p, "%u", *(ui32*)(d + field_offset)); + break; + case DIT_I64_FIELD: + fprintf(p, "%" PRId64, *(i64*)(d + field_offset)); + break; + case DIT_UI64_FIELD: + fprintf(p, "%" PRIu64, *(ui64*)(d + field_offset)); + break; + case DIT_FLOAT_FIELD: + fprintf(p, "%.4f", *(float*)(d + field_offset)); + break; + case DIT_DOUBLE_FIELD: + fprintf(p, "%.7f", *(double*)(d + field_offset)); + break; + case DIT_TIME_T32_FIELD: + fprintf(p, "%ld", (long)*(time_t32*)(d + field_offset)); + break; + case DIT_PF16UI32_FIELD: + fprintf(p, "%u", (ui32) * (pf16ui32*)(d + field_offset)); + break; + case DIT_PF16FLOAT_FIELD: + fprintf(p, "%.4f", (float)*(pf16float*)(d + field_offset)); + break; + case DIT_SF16FLOAT_FIELD: + fprintf(p, "%.4f", (float)*(sf16float*)(d + field_offset)); + break; + case DIT_STRING_FIELD: + fprintf(p, "%s", (d + field_offset)); + break; + + case DIT_LONG_CONST: + fprintf(p, "%ld", long_const); + break; + case DIT_FLOAT_CONST: + fprintf(p, "%.4f", float_const); + break; + case DIT_STR_CONST: + fprintf(p, "%.*s", (int)the_buf.size(), the_buf.data()); + break; + + case DIT_INT_FUNCTION: + fprintf(p, "%d", (f->*int_fn)()); + break; + case DIT_FLOAT_FUNCTION: + fprintf(p, "%.4f", (f->*float_fn)()); + break; + case DIT_BOOL_FUNCTION: + fprintf(p, "%d", (f->*bool_fn)()); + break; + case DIT_STR_FUNCTION: + fprintf(p, "%s", (f->*str_fn)()); + break; + case DIT_STRBUF_FUNCTION: + the_buf.clear(); + fprintf(p, "%s", (f->*strbuf_2_fn)(the_buf, nullptr)); + break; + + case DIT_UI8_EXT_FUNCTION: + fprintf(p, "%u", (*ui8_ext_fn)(f)); + break; + case DIT_UI16_EXT_FUNCTION: + fprintf(p, "%u", (*ui16_ext_fn)(f)); + break; + case DIT_UI32_EXT_FUNCTION: + fprintf(p, "%u", (*ui32_ext_fn)(f)); + break; + case DIT_UI64_EXT_FUNCTION: + fprintf(p, "%" PRIu64, (*ui64_ext_fn)(f)); + break; + + case DIT_UI8_ENUM_EQ: + fprintf(p, "%d", *(ui8*)(d + field_offset) == enum_val); + break; + case DIT_UI8_ENUM_SET: + fprintf(p, "%d", !!(*(ui8*)(d + field_offset) & enum_val)); + break; + + case DIT_UI16_ENUM_EQ: + fprintf(p, "%d", *(ui16*)(d + field_offset) == enum_val); + break; + case DIT_UI16_ENUM_SET: + fprintf(p, "%d", !!(*(ui16*)(d + field_offset) & enum_val)); + break; + + case DIT_UI32_ENUM_EQ: + fprintf(p, "%d", *(ui32*)(d + field_offset) == enum_val); + break; + case DIT_UI32_ENUM_SET: + fprintf(p, "%d", !!(*(ui32*)(d + field_offset) & enum_val)); + break; + + case DIT_INT_ENUM_FUNCTION_EQ: + fprintf(p, "%d", (ui32)(f->*int_enum_fn)() == enum_val); + break; + case DIT_INT_ENUM_FUNCTION_SET: + fprintf(p, "%d", !!(ui32)((f->*int_enum_fn)() & enum_val)); + break; + + case DIT_BOOL_FUNC_FIXED_STR: + fprintf(p, "%u", (ui32)(f->*bool_strbuf_fn)(the_buf)); + break; + case DIT_UI8_FUNC_FIXED_STR: + fprintf(p, "%u", (ui32)(f->*ui8_strbuf_fn)(the_buf)); + break; + case DIT_UI16_FUNC_FIXED_STR: + fprintf(p, "%u", (ui32)(f->*ui16_strbuf_fn)(the_buf)); + break; + case DIT_UI32_FUNC_FIXED_STR: + fprintf(p, "%u", (f->*ui32_strbuf_fn)(the_buf)); + break; + case DIT_I64_FUNC_FIXED_STR: + fprintf(p, "%" PRId64, (f->*i64_strbuf_fn)(the_buf)); + break; + case DIT_UI64_FUNC_FIXED_STR: + fprintf(p, "%" PRIu64, (f->*ui64_strbuf_fn)(the_buf)); + break; + case DIT_FLOAT_FUNC_FIXED_STR: + fprintf(p, "%.4f", (f->*float_strbuf_fn)(the_buf)); + break; + case DIT_DOUBLE_FUNC_FIXED_STR: + fprintf(p, "%.7f", (f->*double_strbuf_fn)(the_buf)); + break; + + case DIT_RESOLVE_BY_NAME: + fprintf(p, "%s", (f->*resolve_fn)(the_buf).data()); + break; + + default: + assert(false); + break; + } +} + +// instantiate, just for a case +template void dump_item::print<FILE>(FILE* p, const char** dd) const; +template void dump_item::print<TString>(TString* p, const char** dd) const; +template void dump_item::print<IOutputStream>(IOutputStream* p, const char** dd) const; + +TStringBuf dump_item::GetStrBuf(const char** dd) const { + const char* d = dd[pack_id]; + const fake* f = reinterpret_cast<const fake*>(d); + switch (type) { + case DIT_STRING_FIELD: + return d + field_offset; + case DIT_STR_CONST: + return the_buf; + case DIT_STR_FUNCTION: + return (f->*str_fn)(); + case DIT_STRBUF_FUNCTION: + the_buf.clear(); + return (f->*strbuf_2_fn)(the_buf, nullptr); + case DIT_RESOLVE_BY_NAME: + return (f->*resolve_fn)(the_buf); + default: + assert(false); + return TStringBuf(); + } +} + +// recursive +eval_res_type dump_item::eval(const char** dd) const { + const char* d = dd[pack_id]; + const fake* f = reinterpret_cast<const fake*>(d); + + switch (type) { + case DIT_FAKE_ITEM: + assert(false); + return (long int)0; + case DIT_MATH_RESULT: + this->op->eval(dd); + return this->op->result; + case DIT_NAME: + assert(false); + return (long int)0; + + case DIT_BOOL_FIELD: + return (ui32) * (bool*)(d + field_offset); + case DIT_UI8_FIELD: + return (ui32) * (ui8*)(d + field_offset); + case DIT_UI16_FIELD: + return (ui32) * (ui16*)(d + field_offset); + case DIT_UI32_FIELD: + return (ui32) * (ui32*)(d + field_offset); + case DIT_I64_FIELD: + return (long)*(i64*)(d + field_offset); // TODO: 64 bit support in calculator? + case DIT_UI64_FIELD: + return (long)*(ui64*)(d + field_offset); // TODO: 64 bit support in calculator? + case DIT_FLOAT_FIELD: + return (float)*(float*)(d + field_offset); + case DIT_DOUBLE_FIELD: + return *(double*)(d + field_offset); + case DIT_TIME_T32_FIELD: + return (long)*(time_t32*)(d + field_offset); + case DIT_PF16UI32_FIELD: + return (ui32) * (pf16ui32*)(d + field_offset); + case DIT_PF16FLOAT_FIELD: + return (float)*(pf16float*)(d + field_offset); + case DIT_SF16FLOAT_FIELD: + return (float)*(sf16float*)(d + field_offset); + case DIT_STRING_FIELD: + return !!d[field_offset]; // we don't have any string functions, just 0 if empty + + case DIT_LONG_CONST: + return long_const; + case DIT_FLOAT_CONST: + return float_const; + case DIT_STR_CONST: + return !!the_buf; + + case DIT_INT_FUNCTION: + return (long)(f->*int_fn)(); + case DIT_FLOAT_FUNCTION: + return (float)(f->*float_fn)(); + case DIT_BOOL_FUNCTION: + return (long)(f->*bool_fn)(); + case DIT_STR_FUNCTION: + return !!*(f->*str_fn)(); // string -> int + case DIT_STRBUF_FUNCTION: + the_buf.clear(); + return !!*(f->*strbuf_2_fn)(the_buf, nullptr); // string -> 0/1 + + case DIT_UI8_EXT_FUNCTION: + return (ui32)(*ui8_ext_fn)(f); + case DIT_UI16_EXT_FUNCTION: + return (ui32)(*ui16_ext_fn)(f); + case DIT_UI32_EXT_FUNCTION: + return (ui32)(*ui32_ext_fn)(f); + case DIT_UI64_EXT_FUNCTION: + return (long)(*ui64_ext_fn)(f); // TODO: 64 bit support in calculator? + + case DIT_UI8_ENUM_EQ: + return (ui32)(*(ui8*)(d + field_offset) == enum_val); + case DIT_UI8_ENUM_SET: + return !!(ui32)(*(ui8*)(d + field_offset) & enum_val); + + case DIT_UI16_ENUM_EQ: + return (ui32)(*(ui16*)(d + field_offset) == enum_val); + case DIT_UI16_ENUM_SET: + return !!(ui32)(*(ui16*)(d + field_offset) & enum_val); + + case DIT_UI32_ENUM_EQ: + return (ui32)(*(ui32*)(d + field_offset) == enum_val); + case DIT_UI32_ENUM_SET: + return !!(ui32)(*(ui32*)(d + field_offset) & enum_val); + + case DIT_INT_ENUM_FUNCTION_EQ: + return (ui32)((ui32)(f->*int_enum_fn)() == enum_val); + case DIT_INT_ENUM_FUNCTION_SET: + return !!(ui32)((ui32)(f->*int_enum_fn)() & enum_val); + + case DIT_BOOL_FUNC_FIXED_STR: + return (ui32)(f->*bool_strbuf_fn)(the_buf); + case DIT_UI8_FUNC_FIXED_STR: + return (ui32)(f->*ui8_strbuf_fn)(the_buf); + case DIT_UI16_FUNC_FIXED_STR: + return (ui32)(f->*ui16_strbuf_fn)(the_buf); + case DIT_UI32_FUNC_FIXED_STR: + return (ui32)(f->*ui32_strbuf_fn)(the_buf); + case DIT_I64_FUNC_FIXED_STR: + return (long)(f->*i64_strbuf_fn)(the_buf); + case DIT_UI64_FUNC_FIXED_STR: + return (long)(f->*ui64_strbuf_fn)(the_buf); + case DIT_FLOAT_FUNC_FIXED_STR: + return (float)(f->*float_strbuf_fn)(the_buf); + case DIT_DOUBLE_FUNC_FIXED_STR: + return (double)(f->*double_strbuf_fn)(the_buf); + + case DIT_RESOLVE_BY_NAME: + return !!(f->*resolve_fn)(the_buf); + + default: + assert(false); + break; + } + + // unreached + return eval_res_type(false); +} + +void dump_item::set_arrind(int arrind) { + switch (type) { + case DIT_BOOL_FIELD: + field_offset += arrind * sizeof(bool); + break; + case DIT_UI8_FIELD: + field_offset += arrind * sizeof(ui8); + break; + case DIT_UI16_FIELD: + field_offset += arrind * sizeof(ui16); + break; + case DIT_UI32_FIELD: + field_offset += arrind * sizeof(ui32); + break; + case DIT_I64_FIELD: + field_offset += arrind * sizeof(i64); + break; + case DIT_UI64_FIELD: + field_offset += arrind * sizeof(ui64); + break; + case DIT_FLOAT_FIELD: + field_offset += arrind * sizeof(float); + break; + case DIT_DOUBLE_FIELD: + field_offset += arrind * sizeof(double); + break; + case DIT_TIME_T32_FIELD: + field_offset += arrind * sizeof(time_t32); + break; + case DIT_PF16UI32_FIELD: + field_offset += arrind * sizeof(pf16ui32); + break; + case DIT_PF16FLOAT_FIELD: + field_offset += arrind * sizeof(pf16float); + break; + case DIT_SF16FLOAT_FIELD: + field_offset += arrind * sizeof(sf16float); + break; + default: + break; + } +} + +static str_spn FieldNameChars("a-zA-Z0-9_$", true); +static str_spn MathOpChars("-+=*%/&|<>()!~^?:#", true); +static str_spn SpaceChars("\t\n\r ", true); + +TFieldCalculatorBase::TFieldCalculatorBase() { +} + +TFieldCalculatorBase::~TFieldCalculatorBase() = default; + +bool TFieldCalculatorBase::item_by_name(dump_item& it, const char* name) const { + for (size_t i = 0; i < named_dump_items.size(); i++) { + const named_dump_item* list = named_dump_items[i].first; + size_t sz = named_dump_items[i].second; + for (unsigned int n = 0; n < sz; n++) { + if (!stricmp(name, list[n].name)) { + it = list[n].item; + it.pack_id = i; + return true; + } + } + } + return false; +} + +bool TFieldCalculatorBase::get_local_var(dump_item& dst, char* var_name) { + TMap<const char*, dump_item>::const_iterator it = local_vars.find(var_name); + if (it == local_vars.end()) { + // New local variable + dst.type = DIT_LOCAL_VARIABLE; + dst.local_var_name = pool.append(var_name); + return false; + } else { + dst = it->second; + return true; + } +} + +char* TFieldCalculatorBase::get_field(dump_item& dst, char* s) { + if (!stricmp(s, "name")) { + dst.type = DIT_NAME; + return s + 4; // leave there 0 + } + + if (*s == '"' || *s == '\'') { + char* end = strchr(s + 1, *s); + bool hasEsc = false; + while (end && end > s + 1 && end[-1] == '\\') { + end = strchr(end + 1, *s); + hasEsc = true; + } + if (!end) + ythrow yexception() << "calc-expr: unterminated string constant at " << s; + dst.type = DIT_STR_CONST; + dst.the_buf.assign(s + 1, end); + if (hasEsc) + SubstGlobal(dst.the_buf, *s == '"' ? "\\\"" : "\\'", *s == '"' ? "\"" : "'"); + dst.set_arrind(0); // just for a case + return end + 1; + } + + bool is_number = isdigit((ui8)*s) || (*s == '+' || *s == '-') && isdigit((ui8)s[1]), is_float = false; + char* end = FieldNameChars.cbrk(s + is_number); + if (is_number && *end == '.') { + is_float = true; + end = FieldNameChars.cbrk(end + 1); + } + char* next = SpaceChars.cbrk(end); + int arr_index = 0; + bool has_arr_index = false; + if (*next == '[') { + arr_index = atoi(next + 1); + has_arr_index = true; + next = strchr(next, ']'); + if (!next) + ythrow yexception() << "calc-expr: No closing ']' for '" << s << "'"; + next = SpaceChars.cbrk(next + 1); + } + char end_sav = *end; + *end = 0; + + if (!item_by_name(dst, s)) { + if (!is_number) { + get_local_var(dst, s); + } else if (is_float) { + dst = (float)strtod(s, nullptr); + } else + dst = strtol(s, nullptr, 10); + + dst.pack_id = 0; + *end = end_sav; + return next; + } + + // check array/not array + if (has_arr_index && !dst.is_array_field()) + ythrow yexception() << "calc-expr: field " << s << " is not an array"; + + //if (!has_arr_index && dst.is_array_field()) + // yexception("calc-expr: field %s is array, index required", s); + + if (has_arr_index && (arr_index < 0 || arr_index >= dst.arr_length)) + ythrow yexception() << "calc-expr: array index [" << arr_index << "] is out of range for field " << s << " (length is " << dst.arr_length << ")"; + + *end = end_sav; + dst.set_arrind(arr_index); + return next; +} + +// BEGIN Stack calculator functions +inline char* skipspace(char* c, int& bracket_depth) { + while ((ui8)*c <= ' ' && *c || *c == '(' || *c == ')') { + if (*c == '(') + bracket_depth++; + else if (*c == ')') + bracket_depth--; + c++; + } + return c; +} + +void ensure_defined(const dump_item& item) { + if (item.type == DIT_LOCAL_VARIABLE) { + ythrow yexception() << "Usage of non-defined field or local variable '" << item.local_var_name << "'"; + } +} + +void TFieldCalculatorBase::emit_op(TVector<calc_op>& ops, calc_elem& left, calc_elem& right) { + int out_op = ops.size(); + char oper = right.oper; + ensure_defined(right.item); + if (oper == OP_ASSIGN) { + if (left.item.type != DIT_LOCAL_VARIABLE) { + ythrow yexception() << "Assignment only to local variables is allowed"; + } + if (local_vars.find(left.item.local_var_name) != local_vars.end()) { + ythrow yexception() << "Reassignment to the local variable " << left.item.local_var_name << " is not allowed"; + } + local_vars[left.item.local_var_name] = right.item; + if (right.item.type == DIT_MATH_RESULT) { + calc_ops[right.item.arr_ind].is_variable = true; + } + left = right; + } else { + ensure_defined(left.item); + ops.push_back(calc_op(left, right)); + left.item.type = DIT_MATH_RESULT; + left.item.arr_ind = out_op; + } +} + +inline int get_op_prio(char c) { + switch (c) { + case OP_ASSIGN: + return 1; + case OP_QUESTION: + case OP_COLON: + return 2; + case OP_LOGICAL_OR: + return 3; + case OP_LOGICAL_AND: + return 4; + case OP_BITWISE_OR: + return 5; + case OP_XOR: + return 6; + case OP_BITWISE_AND: + return 7; + case OP_EQUAL: + case OP_NOT_EQUAL: + return 8; + case OP_LESS: + case OP_LESS_OR_EQUAL: + case OP_GREATER: + case OP_GREATER_OR_EQUAL: + return 9; + case OP_LEFT_SHIFT: + case OP_RIGHT_SHIFT: + return 10; + case OP_ADD: + case OP_SUBSTRACT: + return 11; + case OP_MULTIPLY: + case OP_DIVIDE: + case OP_MODULUS: + return 12; + case OP_REGEXP: + case OP_REGEXP_NOT: + return 13; + case OP_UNARY_NOT: + case OP_UNARY_COMPLEMENT: + case OP_UNARY_MINUS: + case OP_LOG: + case OP_LOG10: + case OP_ROUND: + return 14; + default: + return 0; + } +} + +Operators get_oper(char*& c, bool unary_op_near) { + Operators cur_oper = OP_UNKNOWN; + switch (*c++) { + case '&': + if (*c == '&') + cur_oper = OP_LOGICAL_AND, c++; + else + cur_oper = OP_BITWISE_AND; + break; + case '|': + if (*c == '|') + cur_oper = OP_LOGICAL_OR, c++; + else + cur_oper = OP_BITWISE_OR; + break; + case '<': + if (*c == '=') + cur_oper = OP_LESS_OR_EQUAL, c++; + else if (*c == '<') + cur_oper = OP_LEFT_SHIFT, c++; + else + cur_oper = OP_LESS; + break; + case '>': + if (*c == '=') + cur_oper = OP_GREATER_OR_EQUAL, c++; + else if (*c == '>') + cur_oper = OP_RIGHT_SHIFT, c++; + else + cur_oper = OP_GREATER; + break; + case '!': + if (*c == '=') + cur_oper = OP_NOT_EQUAL, c++; + else if (*c == '~') + cur_oper = OP_REGEXP_NOT, c++; + else + cur_oper = OP_UNARY_NOT; + break; + case '=': + if (*c == '=') + cur_oper = OP_EQUAL, c++; + else if (*c == '~') + cur_oper = OP_REGEXP, c++; + else + cur_oper = OP_ASSIGN; + break; + case '-': + if (unary_op_near) + cur_oper = OP_UNARY_MINUS; + else + cur_oper = OP_SUBSTRACT; + break; + case '#': + if (!strncmp(c, "LOG#", 4)) { + cur_oper = OP_LOG; + c += 4; + } else if (!strncmp(c, "LOG10#", 6)) { + cur_oper = OP_LOG10; + c += 6; + } else if (!strncmp(c, "ROUND#", 6)) { + cur_oper = OP_ROUND; + c += 6; + } + break; + case '+': + cur_oper = OP_ADD; + break; + case '*': + cur_oper = OP_MULTIPLY; + break; + case '/': + cur_oper = OP_DIVIDE; + break; + case '%': + cur_oper = OP_MODULUS; + break; + case '^': + cur_oper = OP_XOR; + break; + case '~': + cur_oper = OP_UNARY_COMPLEMENT; + break; + case '?': + cur_oper = OP_QUESTION; + break; + case ':': + cur_oper = OP_COLON; + break; + } + return cur_oper; +} +// END Stack calculator functions + +void TFieldCalculatorBase::Compile(char** field_names, int field_count) { + out_el = 0, out_cond = 0; + autoarray<dump_item>(field_count).swap(printouts); + autoarray<dump_item>(field_count).swap(conditions); + local_vars.clear(); + + // parse arguments into calculator's "pseudo-code" + for (int el = 0; el < field_count; el++) { + char* c = field_names[el]; + bool is_expr = !!*MathOpChars.brk(c), is_cond = *c == '?'; + if (is_cond) + c++; + if (!is_expr && !is_cond) { + get_field(printouts[out_el], c); + ensure_defined(printouts[out_el]); + ++out_el; + continue; + } else { // Stack Calculator + const int maxstack = 64; + calc_elem fstack[maxstack]; // calculator's stack + int bdepth = 0; // brackets depth + int stack_cur = -1; + bool unary_op_near = false; // indicates that the next operator in unary + bool had_assignment_out_of_brackets = false; + int uop_seq = 0; // maintains right-to left order for unary operators + while (*(c = skipspace(c, bdepth))) { + /** https://wiki.yandex.ru/JandeksPoisk/Antispam/OwnersData/attselect#calc */ + //printf("1.%i c = '%s'\n", unary_op_near, c); + Operators cur_oper = OP_UNKNOWN; + int op_prio = 0; + if (stack_cur >= 0) { + cur_oper = get_oper(c, unary_op_near); + op_prio = get_op_prio(cur_oper); + if (!op_prio) + ythrow yexception() << "calc-expr: Unsupported operator '" << c[-1] << "'"; + op_prio += bdepth * 256 + uop_seq; + if (unary_op_near) + uop_seq += 20; + while (op_prio <= fstack[stack_cur].op_prio && stack_cur > 0) { + emit_op(calc_ops, fstack[stack_cur - 1], fstack[stack_cur]); + stack_cur--; + } + } + //printf("2.%i c = '%s'\n", unary_op_near, c); + had_assignment_out_of_brackets |= (bdepth == 0 && cur_oper == OP_ASSIGN); + c = skipspace(c, bdepth); + unary_op_near = *c == '-' && !isdigit((ui8)c[1]) || *c == '~' || (*c == '!' && c[1] != '=') || + !strncmp(c, "#LOG#", 5) || !strncmp(c, "#LOG10#", 7) || !strncmp(c, "#ROUND#", 7); + if (!unary_op_near) + uop_seq = 0; + if (stack_cur >= maxstack - 1) + ythrow yexception() << "calc-expr: Math eval stack overflow!\n"; + stack_cur++; + fstack[stack_cur].oper = cur_oper; + fstack[stack_cur].op_prio = op_prio; + //printf("3.%i c = '%s'\n", unary_op_near, c); + if (unary_op_near) + fstack[stack_cur].item = dump_item(); + else + c = get_field(fstack[stack_cur].item, c); + } + while (stack_cur > 0) { + emit_op(calc_ops, fstack[stack_cur - 1], fstack[stack_cur]); + stack_cur--; + } + ensure_defined(fstack[0].item); + if (is_cond) { + if (had_assignment_out_of_brackets) + ythrow yexception() << "Assignment in condition. (Did you mean '==' instead of '='?)"; + if (fstack[0].item.type != DIT_FAKE_ITEM) // Skip empty conditions: "?()". + conditions[out_cond++] = fstack[0].item; + } else if (!had_assignment_out_of_brackets) { + printouts[out_el++] = fstack[0].item; + } + } + } + // calc_ops will not grow any more, so arr_ind -> op + for (int n = 0; n < out_cond; n++) + conditions[n].rewrite_op(calc_ops.data()); + for (int n = 0; n < out_el; n++) + printouts[n].rewrite_op(calc_ops.data()); + for (auto& local_var : local_vars) { + local_var.second.rewrite_op(calc_ops.data()); + } + for (int n = 0; n < (int)calc_ops.size(); n++) { + calc_ops[n].Left.rewrite_op(calc_ops.data()); + calc_ops[n].Right.rewrite_op(calc_ops.data()); + } +} + +void dump_item::rewrite_op(const calc_op* ops) { + if (type == DIT_MATH_RESULT) + op = ops + arr_ind; +} + +void TFieldCalculatorBase::MarkLocalVarsAsUncalculated() { + for (auto& local_var : local_vars) { + if (local_var.second.type == DIT_MATH_RESULT) { + local_var.second.op->calculated = false; + } + } +} + +bool TFieldCalculatorBase::Cond(const char** d) { + MarkLocalVarsAsUncalculated(); + for (int n = 0; n < out_cond; n++) { + /** https://wiki.yandex.ru/JandeksPoisk/Antispam/OwnersData/attselect#conditions */ + eval_res_type res = conditions[n].eval(d); + bool is_true = res.type == 0 ? !!res.res_ui32 : res.type == 1 ? !!res.res_long : !!res.res_dbl; + if (!is_true) + return false; + } + return true; +} + +bool TFieldCalculatorBase::CondById(const char** d, int condNumber) { + MarkLocalVarsAsUncalculated(); + if (condNumber >= out_cond) + return false; + eval_res_type res = conditions[condNumber].eval(d); + bool is_true = res.type == 0 ? !!res.res_ui32 : res.type == 1 ? !!res.res_long : !!res.res_dbl; + if (!is_true) + return false; + return true; +} + +void TFieldCalculatorBase::Print(FILE* p, const char** d, const char* Name) { + for (int n = 0; n < out_el; n++) { + if (printouts[n].type == DIT_NAME) { + fprintf(p, "%s", Name); + } else if (printouts[n].type == DIT_MATH_RESULT) { // calculate + eval_res_type res = printouts[n].eval(d); + switch (res.type) { + case 0: + fprintf(p, "%u", res.res_ui32); + break; + case 1: + fprintf(p, "%ld", res.res_long); + break; + case 2: + fprintf(p, "%f", res.res_dbl); + break; + } + } else { + printouts[n].print(p, d); + } + fprintf(p, n != out_el - 1 ? "\t" : "\n"); + } +} + +void TFieldCalculatorBase::CalcAll(const char** d, TVector<float>& result) const { + result.clear(); + for (int n = 0; n < out_el; ++n) { + if (printouts[n].type == DIT_MATH_RESULT || printouts[n].type == DIT_FLOAT_FIELD) { + eval_res_type res = printouts[n].eval(d); + result.push_back(res.res_dbl); + } + } +} + +void TFieldCalculatorBase::SelfTest() { + if (out_el < 1) + ythrow yexception() << "Please specify conditions for test mode"; + const char* dummy = ""; + eval_res_type res = printouts[0].eval(&dummy); + switch (res.type) { + case 0: + printf("%u\n", res.res_ui32); + break; + case 1: + printf("%ld\n", res.res_long); + break; + case 2: + printf("%f\n", res.res_dbl); + break; + } +} + +void TFieldCalculatorBase::PrintDiff(const char* rec1, const char* rec2) { + for (size_t n = 0; n < named_dump_items[0].second; n++) { + const dump_item& field = named_dump_items[0].first[n].item; + if (!field.is_field()) + continue; // not really a field + for (int ind = 0, arrsz = field.is_array_field() ? field.arr_length : 1; ind < arrsz; ind++) { + intptr_t sav_field_offset = field.field_offset; + const_cast<dump_item&>(field).set_arrind(ind); + if (field.eval(&rec1) == field.eval(&rec2)) { + const_cast<dump_item&>(field).field_offset = sav_field_offset; + continue; + } + if (field.is_array_field()) + printf("\t%s[%i]: ", named_dump_items[0].first[n].name, ind); + else + printf("\t%s: ", named_dump_items[0].first[n].name); + field.print(stdout, &rec1); + printf(" -> "); + field.print(stdout, &rec2); + const_cast<dump_item&>(field).field_offset = sav_field_offset; + } + } +} + +void TFieldCalculatorBase::DumpAll(IOutputStream& s, const char** d, const TStringBuf& delim) { + bool firstPrinted = false; + for (size_t k = 0; k < named_dump_items.size(); k++) { + const named_dump_item* fields = named_dump_items[k].first; + size_t numFields = named_dump_items[k].second; + const char* obj = d[k]; + for (size_t n = 0; n < numFields; n++) { + const dump_item& field = fields[n].item; + if (!field.is_field()) + continue; + for (int ind = 0, arrsz = field.is_array_field() ? field.arr_length : 1; ind < arrsz; ind++) { + if (firstPrinted) + s << delim; + else + firstPrinted = true; + s << fields[n].name; + if (field.is_array_field()) + Printf(s, "[%i]", ind); + s << "="; + intptr_t sav_field_offset = field.field_offset; + const_cast<dump_item&>(field).set_arrind(ind); + field.print(&s, &obj); + const_cast<dump_item&>(field).field_offset = sav_field_offset; + } + } + } +} diff --git a/library/cpp/fieldcalc/field_calc.h b/library/cpp/fieldcalc/field_calc.h new file mode 100644 index 0000000000..46bf371a60 --- /dev/null +++ b/library/cpp/fieldcalc/field_calc.h @@ -0,0 +1,136 @@ +#pragma once + +#include <cstdio> + +#include <library/cpp/deprecated/autoarray/autoarray.h> +#include <util/generic/map.h> +#include <util/generic/vector.h> +#include <util/memory/segmented_string_pool.h> + +struct dump_item; +struct calc_op; +struct named_dump_item; +struct calc_elem; +class IOutputStream; + +template <class T> +std::pair<const named_dump_item*, size_t> get_named_dump_items(); + +class TFieldCalculatorBase { +private: + segmented_string_pool pool; + void emit_op(TVector<calc_op>& ops, calc_elem& left, calc_elem& right); + void MarkLocalVarsAsUncalculated(); + +protected: + autoarray<dump_item> printouts, conditions; + int out_el, out_cond; + TVector<calc_op> calc_ops; // operands for calculator, indexed by arr_ind for DIT_math_result + + TVector<std::pair<const named_dump_item*, size_t>> named_dump_items; + TMap<const char*, dump_item> local_vars; + + char* get_field(dump_item& dst, char* s); + bool get_local_var(dump_item& dst, char* s); + virtual bool item_by_name(dump_item& it, const char* name) const; + + TFieldCalculatorBase(); + virtual ~TFieldCalculatorBase(); + + bool Cond(const char** d); + bool CondById(const char** d, int condNumber); + void Print(FILE* p, const char** d, const char* Name); + void Compile(char** field_names, int field_count); + void SelfTest(); + void PrintDiff(const char* d1, const char* d2); + void CalcAll(const char** d, TVector<float>& result) const; + void DumpAll(IOutputStream& s, const char** d, const TStringBuf& delim); +}; + +template <class T> +class TFieldCalculator: protected TFieldCalculatorBase { +public: + TFieldCalculator() { + named_dump_items.push_back(get_named_dump_items<T>()); + } + + ~TFieldCalculator() override = default; + + bool Cond(const T& d) { + const char* dd = reinterpret_cast<const char*>(&d); + return TFieldCalculatorBase::Cond(&dd); + } + + bool CondById(const T& d, int condNumber) { + const char* dd = reinterpret_cast<const char*>(&d); + return TFieldCalculatorBase::CondById(&dd, condNumber); + } + + void Print(const T& d, const char* Name) { + const char* dd = reinterpret_cast<const char*>(&d); + return TFieldCalculatorBase::Print(stdout, &dd, Name); + } + + void Print(FILE* p, const T& d, const char* Name) { + const char* dd = reinterpret_cast<const char*>(&d); + return TFieldCalculatorBase::Print(p, &dd, Name); + } + + size_t Compile(char** field_names, int field_count) { + TFieldCalculatorBase::Compile(field_names, field_count); + return out_el; // number of fields printed + } + + void SelfTest() { + return TFieldCalculatorBase::SelfTest(); + } + + void PrintDiff(const T& d1, const T& d2) { + return TFieldCalculatorBase::PrintDiff((const char*)&d1, (const char*)&d2); + } + + void CalcAll(const T& d, TVector<float>& result) const { + const char* dd = reinterpret_cast<const char*>(&d); + return TFieldCalculatorBase::CalcAll(&dd, result); + } + + // it appends to `result', clear it yourself + void DumpAll(IOutputStream& s, const T& d, const TStringBuf& delim) { + const char* dd = reinterpret_cast<const char*>(&d); + return TFieldCalculatorBase::DumpAll(s, &dd, delim); + } +}; + +template <class T, class T2> +class TFieldCalculator2: protected TFieldCalculator<T> { +public: + TFieldCalculator2() { + TFieldCalculator<T>::named_dump_items.push_back(get_named_dump_items<T2>()); + } + + ~TFieldCalculator2() override = default; + + bool Cond(const T& d, const T2& d2) { + const char* dd[2] = {reinterpret_cast<const char*>(&d), reinterpret_cast<const char*>(&d2)}; + return TFieldCalculatorBase::Cond(dd); + } + + bool CondById(const T& d, const T2& d2, int condNumber) { + const char* dd[2] = {reinterpret_cast<const char*>(&d), reinterpret_cast<const char*>(&d2)}; + return TFieldCalculatorBase::CondById(dd, condNumber); + } + + void Print(const T& d, const T2& d2, const char* Name) { + const char* dd[2] = {reinterpret_cast<const char*>(&d), reinterpret_cast<const char*>(&d2)}; + return TFieldCalculatorBase::Print(stdout, dd, Name); + } + + void Print(FILE* p, const T& d, const T2& d2, const char* Name) { + const char* dd[2] = {reinterpret_cast<const char*>(&d), reinterpret_cast<const char*>(&d2)}; + return TFieldCalculatorBase::Print(p, dd, Name); + } + + size_t Compile(char** field_names, int field_count) { + return TFieldCalculator<T>::Compile(field_names, field_count); + } +}; diff --git a/library/cpp/fieldcalc/field_calc_int.h b/library/cpp/fieldcalc/field_calc_int.h new file mode 100644 index 0000000000..5f71fafbda --- /dev/null +++ b/library/cpp/fieldcalc/field_calc_int.h @@ -0,0 +1,593 @@ +#pragma once + +#include <cmath> + +#include <util/system/defaults.h> +#include <util/system/yassert.h> +#include <util/memory/alloc.h> +#include <util/generic/yexception.h> + +#include "lossy_types.h" +#include "field_calc.h" + +// eval_res_type +struct eval_res_type { + union { + ui32 res_ui32; + long res_long; + double res_dbl; + }; + int type; + eval_res_type(ui32 v) + : res_ui32(v) + , type(0) + { + } + eval_res_type(long v) + : res_long(v) + , type(1) + { + } + eval_res_type(bool v) + : res_long(v) + , type(1) + { + } + eval_res_type(double v) + : res_dbl(v) + , type(2) + { + } + // a special null value for ternary operator + explicit eval_res_type() + : type(3) + { + } + operator ui32() const; + operator long() const; + operator double() const; + void to_long(); + bool is_null() const; +}; + +inline bool eval_res_type::is_null() const { + return type == 3; +} + +inline void eval_res_type::to_long() { + if (type == 0) + res_long = res_ui32; + else if (type == 2) + res_long = (long)res_dbl; + type = 1; +} + +inline eval_res_type::operator ui32() const { + assert(type == 0); + return res_ui32; +} + +inline eval_res_type::operator long() const { + assert(type == 0 || type == 1); + return type == 1 ? res_long : res_ui32; +} + +inline eval_res_type::operator double() const { + return type == 2 ? res_dbl : type == 1 ? (double)res_long : (double)res_ui32; +} + +inline eval_res_type operator+(const eval_res_type& a, const eval_res_type& b) { + switch (std::max(a.type, b.type)) { + case 0: + return (ui32)a + (ui32)b; + case 1: + return (long)a + (long)b; + /*case 2*/ default: + return (double)a + (double)b; + } +} + +inline eval_res_type operator-(const eval_res_type& a, const eval_res_type& b) { + switch (std::max(a.type, b.type)) { + case 0: + case 1: + return (long)a - (long)b; + /*case 2*/ default: + return (double)a - (double)b; + } +} + +inline eval_res_type Minus(const eval_res_type& a) { + switch (a.type) { + case 0: + return -(long)a.res_ui32; + case 1: + return -a.res_long; + /*case 2*/ default: + return -a.res_dbl; + } +} + +inline eval_res_type Log(const eval_res_type& a) { + switch (a.type) { + case 0: + return log(a.res_ui32); + case 1: + return log(a.res_long); + /*case 2*/ default: + return log(a.res_dbl); + } +} + +inline eval_res_type Log10(const eval_res_type& a) { + switch (a.type) { + case 0: + return log10(a.res_ui32); + case 1: + return log10(a.res_long); + /*case 2*/ default: + return log10(a.res_dbl); + } +} + +inline eval_res_type Round(const eval_res_type& a) { + switch (a.type) { + case 0: + return a.res_ui32; + case 1: + return a.res_long; + /*case 2*/ default: + return round(a.res_dbl); + } +} + +inline bool operator==(const eval_res_type& a, const eval_res_type& b) { + switch (std::max(a.type, b.type)) { + case 0: + return (ui32)a == (ui32)b; + case 1: + return (long)a == (long)b; + /*case 2*/ default: + return (double)a == (double)b; + } +} + +inline bool operator<(const eval_res_type& a, const eval_res_type& b) { + switch (std::max(a.type, b.type)) { + case 0: + return (ui32)a < (ui32)b; + case 1: + return (long)a < (long)b; + /*case 2*/ default: + return (double)a < (double)b; + } +} + +inline eval_res_type operator*(const eval_res_type& a, const eval_res_type& b) { + switch (std::max(a.type, b.type)) { + case 0: + return (ui32)a * (ui32)b; + case 1: + return (long)a * (long)b; + /*case 2*/ default: + return (double)a * (double)b; + } +} + +inline double operator/(const eval_res_type& a, const eval_res_type& b) { + double a1 = a, b1 = b; + if (b1 == 0) { + if (a1 == 0) + return 0.; // assume that a should be 0 + ythrow yexception() << "Division by zero"; // TODO: show parameter names + } + return a1 / b1; +} + +// dump_item +enum EDumpItemType { + DIT_FAKE_ITEM, // fake item - value never used + DIT_MATH_RESULT, // eval result + DIT_NAME, + + DIT_FIELDS_START, // Start of item types for real fields + + DIT_BOOL_FIELD, + DIT_UI8_FIELD, + DIT_UI16_FIELD, + DIT_UI32_FIELD, + DIT_I64_FIELD, + DIT_UI64_FIELD, + DIT_FLOAT_FIELD, + DIT_DOUBLE_FIELD, + DIT_TIME_T32_FIELD, + DIT_PF16UI32_FIELD, + DIT_PF16FLOAT_FIELD, + DIT_SF16FLOAT_FIELD, + DIT_STRING_FIELD, // new + + DIT_FIELDS_END, // End of item types for real fields + + DIT_LONG_CONST, + DIT_FLOAT_CONST, + DIT_STR_CONST, + + DIT_INT_FUNCTION, + DIT_FLOAT_FUNCTION, + DIT_BOOL_FUNCTION, + DIT_STR_FUNCTION, // new + DIT_STRBUF_FUNCTION, // new + + DIT_UI8_EXT_FUNCTION, + DIT_UI16_EXT_FUNCTION, + DIT_UI32_EXT_FUNCTION, + DIT_UI64_EXT_FUNCTION, + + DIT_UI8_ENUM_EQ, + DIT_UI8_ENUM_SET, + DIT_UI16_ENUM_EQ, + DIT_UI16_ENUM_SET, + DIT_UI32_ENUM_EQ, + DIT_UI32_ENUM_SET, + DIT_INT_ENUM_FUNCTION_EQ, + DIT_INT_ENUM_FUNCTION_SET, + + DIT_BOOL_FUNC_FIXED_STR, + DIT_UI8_FUNC_FIXED_STR, + DIT_UI16_FUNC_FIXED_STR, + DIT_UI32_FUNC_FIXED_STR, + DIT_I64_FUNC_FIXED_STR, + DIT_UI64_FUNC_FIXED_STR, + DIT_FLOAT_FUNC_FIXED_STR, + DIT_DOUBLE_FUNC_FIXED_STR, + + DIT_RESOLVE_BY_NAME, //new - for external functions + + DIT_LOCAL_VARIABLE +}; + +inline bool IsStringType(EDumpItemType type) { + return type == DIT_STRING_FIELD || type == DIT_STR_CONST || type == DIT_STR_FUNCTION || type == DIT_STRBUF_FUNCTION || type == DIT_RESOLVE_BY_NAME; +} + +struct fake {}; + +struct calc_op; + +typedef int (fake::*int_fn_t)() const; +typedef float (fake::*float_fn_t)() const; +typedef bool (fake::*bool_fn_t)() const; +typedef ui16 (fake::*ui16_fn_t)() const; +typedef ui32 (fake::*ui32_fn_t)() const; +typedef bool (fake::*bool_strbuf_fn_t)(const TStringBuf&) const; // string -> bool +typedef ui8 (fake::*ui8_strbuf_fn_t)(const TStringBuf&) const; // string -> ui8 +typedef ui16 (fake::*ui16_strbuf_fn_t)(const TStringBuf&) const; // string -> ui16 +typedef ui32 (fake::*ui32_strbuf_fn_t)(const TStringBuf&) const; // string -> ui32 +typedef i64 (fake::*i64_strbuf_fn_t)(const TStringBuf&) const; // string -> i64 +typedef ui64 (fake::*ui64_strbuf_fn_t)(const TStringBuf&) const; // string -> ui64 +typedef float (fake::*float_strbuf_fn_t)(const TStringBuf&) const; // string -> float +typedef double (fake::*double_strbuf_fn_t)(const TStringBuf&) const; // string -> double +typedef const char* (fake::*str_fn_t)() const; +typedef const char* (fake::*strbuf_2_fn_t)(TString& buf, const char* nul) const; +typedef TStringBuf (fake::*resolve_fn_t)(const TStringBuf&) const; // string -> string, $var -> "value" + +// note: we can not reuse the above signatures, calling conventions may differ +typedef ui8 (*ui8_ext_fn_t)(const fake*); +typedef ui16 (*ui16_ext_fn_t)(const fake*); +typedef ui32 (*ui32_ext_fn_t)(const fake*); +typedef ui64 (*ui64_ext_fn_t)(const fake*); + +struct dump_item { + EDumpItemType type; + int pack_id = 0; + + union { + // fields + intptr_t field_offset; + + // constants + long long_const; + float float_const; + + // functions + int_fn_t int_fn; + float_fn_t float_fn; + bool_fn_t bool_fn; + str_fn_t str_fn; + strbuf_2_fn_t strbuf_2_fn; + resolve_fn_t resolve_fn; + + bool_strbuf_fn_t bool_strbuf_fn; + ui8_strbuf_fn_t ui8_strbuf_fn; + ui16_strbuf_fn_t ui16_strbuf_fn; + ui32_strbuf_fn_t ui32_strbuf_fn; + i64_strbuf_fn_t i64_strbuf_fn; + ui64_strbuf_fn_t ui64_strbuf_fn; + float_strbuf_fn_t float_strbuf_fn; + double_strbuf_fn_t double_strbuf_fn; + + ui8_ext_fn_t ui8_ext_fn; + ui16_ext_fn_t ui16_ext_fn; + ui32_ext_fn_t ui32_ext_fn; + ui64_ext_fn_t ui64_ext_fn; + + // enum + int_fn_t int_enum_fn; + + // for DIT_MATH_RESULT + const calc_op* op; + }; + + // for enum + ui32 enum_val; + + // for local vars, also used to mark accessor functions to use them in dump + const char* local_var_name = nullptr; + + int arr_ind; // externally initialized! + int arr_length; + + mutable TString the_buf; // buffer for string function, string constants also here + + // Ctors + dump_item() + : type(DIT_FAKE_ITEM) + , field_offset(0) + { + } + + dump_item(bool* ptr, int arrlen = 0) + : type(DIT_BOOL_FIELD) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , arr_length(arrlen) + { + } + dump_item(ui8* ptr, int arrlen = 0) + : type(DIT_UI8_FIELD) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , arr_length(arrlen) + { + } + dump_item(ui16* ptr, int arrlen = 0) + : type(DIT_UI16_FIELD) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , arr_length(arrlen) + { + } + dump_item(ui32* ptr, int arrlen = 0) + : type(DIT_UI32_FIELD) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , arr_length(arrlen) + { + } + dump_item(i64* ptr, int arrlen = 0) + : type(DIT_I64_FIELD) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , arr_length(arrlen) + { + } + dump_item(ui64* ptr, int arrlen = 0) + : type(DIT_UI64_FIELD) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , arr_length(arrlen) + { + } + dump_item(float* ptr, int arrlen = 0) + : type(DIT_FLOAT_FIELD) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , arr_length(arrlen) + { + } + dump_item(double* ptr, int arrlen = 0) + : type(DIT_DOUBLE_FIELD) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , arr_length(arrlen) + { + } + dump_item(time_t32* ptr, int arrlen = 0) + : type(DIT_TIME_T32_FIELD) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , arr_length(arrlen) + { + } + dump_item(pf16ui32* ptr, int arrlen = 0) + : type(DIT_PF16UI32_FIELD) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , arr_length(arrlen) + { + } + dump_item(pf16float* ptr, int arrlen = 0) + : type(DIT_PF16FLOAT_FIELD) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , arr_length(arrlen) + { + } + dump_item(sf16float* ptr, int arrlen = 0) + : type(DIT_SF16FLOAT_FIELD) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , arr_length(arrlen) + { + } + dump_item(char* ptr, int arrlen = 0) + : type(DIT_STRING_FIELD) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , arr_length(arrlen) + { + } + + dump_item(long val) + : type(DIT_LONG_CONST) + , long_const(val) + { + } + dump_item(float val) + : type(DIT_FLOAT_CONST) + , float_const(val) + { + } + dump_item(TString& val) + : type(DIT_STR_CONST) + , the_buf(val) + { + } + + dump_item(int_fn_t fn) + : type(DIT_INT_FUNCTION) + , int_fn(fn) + { + } + dump_item(float_fn_t fn) + : type(DIT_FLOAT_FUNCTION) + , float_fn(fn) + { + } + dump_item(bool_fn_t fn) + : type(DIT_BOOL_FUNCTION) + , bool_fn(fn) + { + } + dump_item(bool_strbuf_fn_t fn, const char* name) + : type(DIT_BOOL_FUNC_FIXED_STR) + , bool_strbuf_fn(fn) + , the_buf(name) + { + } + dump_item(ui8_strbuf_fn_t fn, const char* name) + : type(DIT_UI8_FUNC_FIXED_STR) + , ui8_strbuf_fn(fn) + , the_buf(name) + { + } + dump_item(ui16_strbuf_fn_t fn, const char* name) + : type(DIT_UI16_FUNC_FIXED_STR) + , ui16_strbuf_fn(fn) + , the_buf(name) + { + } + dump_item(ui32_strbuf_fn_t fn, const char* name) + : type(DIT_UI32_FUNC_FIXED_STR) + , ui32_strbuf_fn(fn) + , the_buf(name) + { + } + dump_item(i64_strbuf_fn_t fn, const char* name) + : type(DIT_I64_FUNC_FIXED_STR) + , i64_strbuf_fn(fn) + , the_buf(name) + { + } + dump_item(ui64_strbuf_fn_t fn, const char* name) + : type(DIT_UI64_FUNC_FIXED_STR) + , ui64_strbuf_fn(fn) + , the_buf(name) + { + } + dump_item(float_strbuf_fn_t fn, const char* name) + : type(DIT_FLOAT_FUNC_FIXED_STR) + , float_strbuf_fn(fn) + , the_buf(name) + { + } + dump_item(double_strbuf_fn_t fn, const char* name) + : type(DIT_DOUBLE_FUNC_FIXED_STR) + , double_strbuf_fn(fn) + , the_buf(name) + { + } + dump_item(str_fn_t fn) + : type(DIT_STR_FUNCTION) + , str_fn(fn) + { + } + dump_item(strbuf_2_fn_t fn) + : type(DIT_STRBUF_FUNCTION) + , strbuf_2_fn(fn) + { + } + + dump_item(ui8_ext_fn_t fn, const char* lvn = nullptr) + : type(DIT_UI8_EXT_FUNCTION) + , ui8_ext_fn(fn) + , local_var_name(lvn) + { + } + dump_item(ui16_ext_fn_t fn, const char* lvn = nullptr) + : type(DIT_UI16_EXT_FUNCTION) + , ui16_ext_fn(fn) + , local_var_name(lvn) + { + } + dump_item(ui32_ext_fn_t fn, const char* lvn = nullptr) + : type(DIT_UI32_EXT_FUNCTION) + , ui32_ext_fn(fn) + , local_var_name(lvn) + { + } + dump_item(ui64_ext_fn_t fn, const char* lvn = nullptr) + : type(DIT_UI64_EXT_FUNCTION) + , ui64_ext_fn(fn) + , local_var_name(lvn) + { + } + + dump_item(ui8* ptr, ui32 val, bool bitset) + : type(bitset ? DIT_UI8_ENUM_SET : DIT_UI8_ENUM_EQ) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , enum_val(val) + { + } + + dump_item(ui16* ptr, ui32 val, bool bitset) + : type(bitset ? DIT_UI16_ENUM_SET : DIT_UI16_ENUM_EQ) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , enum_val(val) + { + } + + dump_item(ui32* ptr, ui32 val, bool bitset) + : type(bitset ? DIT_UI32_ENUM_SET : DIT_UI32_ENUM_EQ) + , field_offset(reinterpret_cast<intptr_t>(ptr)) + , enum_val(val) + { + } + + dump_item(int_fn_t fn, ui32 val, bool bitset) + : type(bitset ? DIT_INT_ENUM_FUNCTION_SET : DIT_INT_ENUM_FUNCTION_EQ) + , int_enum_fn(fn) + , enum_val(val) + { + } + + dump_item(resolve_fn_t fn, const char* name) + : type(DIT_RESOLVE_BY_NAME) + , resolve_fn(fn) + , the_buf(name) + { + } //name of variable saved in the_buf + + // Functions + template <class TOut> // implemented for FILE*, TString* (appends) and IOutputStream* + void print(TOut* p, const char** dd) const; + TStringBuf GetStrBuf(const char** dd) const; // for char-types only! + eval_res_type eval(const char** dd) const; + void set_arrind(int arrind); + void rewrite_op(const calc_op* ops); + + bool is_accessor_func() const { + return type >= DIT_INT_FUNCTION && type <= DIT_UI64_EXT_FUNCTION && local_var_name; + } + + bool is_field() const { + return type > DIT_FIELDS_START && type < DIT_FIELDS_END || is_accessor_func(); + } + + bool is_array_field() const { + return is_field() && arr_length > 0; + } +}; + +// named_dump_item +struct named_dump_item { + const char* name; + dump_item item; +}; diff --git a/library/cpp/fieldcalc/lossy_types.h b/library/cpp/fieldcalc/lossy_types.h new file mode 100644 index 0000000000..98acfea902 --- /dev/null +++ b/library/cpp/fieldcalc/lossy_types.h @@ -0,0 +1,52 @@ +#pragma once + +#include <util/generic/cast.h> + +// although target value is float, this thing is only used as unsigned int container +struct pf16ui32 { + ui16 val; + pf16ui32() + : val(0) + { + } + void operator=(ui32 t) { + val = static_cast<ui16>(BitCast<ui32>(static_cast<float>(t)) >> 15); + } + operator ui32() const { + return (ui32)BitCast<float>((ui32)(val << 15)); + } +}; + +// unsigned float value +struct pf16float { + ui16 val; + pf16float() + : val(0) + { + } + void operator=(float t) { + assert(t >= 0.); + val = static_cast<ui16>(BitCast<ui32>(t) >> 15); + } + operator float() const { + return BitCast<float>((ui32)(val << 15)); + } +}; + +// signed float value +struct sf16float { + ui16 val; + sf16float() + : val(0) + { + } + void operator=(float t) { + assert(t >= 0.); + val = BitCast<ui32>(t) >> 16; + } + operator float() const { + return BitCast<float>((ui32)(val << 16)); + } +}; + +typedef i32 time_t32; // not really lossy, should be placed somewhere else diff --git a/library/cpp/fieldcalc/ya.make b/library/cpp/fieldcalc/ya.make new file mode 100644 index 0000000000..9796592996 --- /dev/null +++ b/library/cpp/fieldcalc/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +PEERDIR( + library/cpp/deprecated/autoarray +) + +SRCS( + field_calc.cpp + lossy_types.h + field_calc_int.h +) + +END() diff --git a/library/cpp/malloc/galloc/malloc-info.cpp b/library/cpp/malloc/galloc/malloc-info.cpp new file mode 100644 index 0000000000..fbcfa7ee06 --- /dev/null +++ b/library/cpp/malloc/galloc/malloc-info.cpp @@ -0,0 +1,9 @@ +#include <library/cpp/malloc/api/malloc.h> + +using namespace NMalloc; + +TMallocInfo NMalloc::MallocInfo() { + TMallocInfo r; + r.Name = "tcmalloc"; + return r; +} diff --git a/library/cpp/malloc/galloc/ya.make b/library/cpp/malloc/galloc/ya.make new file mode 100644 index 0000000000..b6646a6cf6 --- /dev/null +++ b/library/cpp/malloc/galloc/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +NO_UTIL() +ALLOCATOR_IMPL() + +PEERDIR( + library/cpp/malloc/api + contrib/deprecated/galloc +) + +SRCS( + malloc-info.cpp +) + +END() diff --git a/library/cpp/on_disk/multi_blob/multiblob.cpp b/library/cpp/on_disk/multi_blob/multiblob.cpp new file mode 100644 index 0000000000..d92b31e613 --- /dev/null +++ b/library/cpp/on_disk/multi_blob/multiblob.cpp @@ -0,0 +1,67 @@ +#include <util/generic/yexception.h> +#include <util/system/align.h> + +#include <library/cpp/on_disk/chunks/reader.h> + +#include "multiblob.h" + +void TSubBlobs::ReadMultiBlob(const TBlob& multi) { + if (multi.Size() < sizeof(TMultiBlobHeader)) { + ythrow yexception() << "not a blob, too small"; + } + + Multi = multi; + memcpy((void*)&Header, Multi.Data(), sizeof(TMultiBlobHeader)); + + if (Header.BlobMetaSig != BLOBMETASIG) { + if (Header.BlobRecordSig != TMultiBlobHeader::RecordSig) { + if (ReadChunkedData(multi)) + return; + } + ythrow yexception() << "is not a blob, MetaSig was read: " + << Header.BlobMetaSig + << ", must be" << BLOBMETASIG; + } + + if (Header.BlobRecordSig != TMultiBlobHeader::RecordSig) + ythrow yexception() << "unknown multiblob RecordSig " + << Header.BlobRecordSig; + + reserve(size() + Header.Count); + if (Header.Flags & EMF_INTERLAY) { + size_t pos = Header.HeaderSize(); + for (size_t i = 0; i < Header.Count; ++i) { + pos = AlignUp<ui64>(pos, sizeof(ui64)); + ui64 size = *((ui64*)((const char*)multi.Data() + pos)); + pos = AlignUp<ui64>(pos + sizeof(ui64), Header.Align); + push_back(multi.SubBlob(pos, pos + size)); + pos += size; + } + } else { + const ui64* sizes = Header.Sizes(multi.Data()); + size_t pos = Header.HeaderSize() + Header.Count * sizeof(ui64); + for (size_t i = 0; i < Header.Count; ++i) { + pos = AlignUp<ui64>(pos, Header.Align); + push_back(multi.SubBlob(pos, pos + *sizes)); + pos += *sizes; + sizes++; + } + } +} + +bool TSubBlobs::ReadChunkedData(const TBlob& multi) noexcept { + Multi = multi; + memset((void*)&Header, 0, sizeof(Header)); + + TChunkedDataReader reader(Multi); + Header.Count = reader.GetBlocksCount(); + resize(GetHeader()->Count); + for (size_t i = 0; i < size(); ++i) + // We can use TBlob::NoCopy() because of reader.GetBlock(i) returns + // address into memory of multi blob. + // This knowledge was acquired from implementation of + // TChunkedDataReader, so we need care about any changes that. + (*this)[i] = TBlob::NoCopy(reader.GetBlock(i), reader.GetBlockLen(i)); + Header.Flags |= EMF_CHUNKED_DATA_READER; + return true; +} diff --git a/library/cpp/on_disk/multi_blob/multiblob.h b/library/cpp/on_disk/multi_blob/multiblob.h new file mode 100644 index 0000000000..b40a5ae6af --- /dev/null +++ b/library/cpp/on_disk/multi_blob/multiblob.h @@ -0,0 +1,77 @@ +#pragma once + +#include <util/generic/vector.h> +#include <util/memory/blob.h> + +#define BLOBMETASIG 0x3456789Au + +enum E_Multiblob_Flags { + // if EMF_INTERLAY is clear + // multiblob format + // HeaderSize() bytes for TMultiBlobHeader + // Count*sizeof(ui64) bytes for blob sizes + // blob1 + // (alignment) + // blob2 + // (alignment) + // ... + // (alignment) + // blobn + // if EMF_INTERLAY is set + // multiblob format + // HeaderSize() bytes for TMultiBlobHeader + // size1 ui64, the size of 1st blob + // blob1 + // (alignment) + // size2 ui64, the size of 2nd blob + // blob2 + // (alignment) + // ... + // (alignment) + // sizen ui64, the size of n'th blob + // blobn + EMF_INTERLAY = 1, + + // Means that multiblob contains blocks in TChunkedDataReader format + // Legacy, use it only for old files, created for TChunkedDataReader + EMF_CHUNKED_DATA_READER = 2, + + // Flags that may be configured for blobbuilder in client code + EMF_WRITEABLE = EMF_INTERLAY, +}; + +struct TMultiBlobHeader { + // data + ui32 BlobMetaSig; + ui32 BlobRecordSig; + ui64 Count; // count of sub blobs + ui32 Align; // alignment for every subblob + ui32 Flags; + static const ui32 RecordSig = 0x23456789; + static inline size_t HeaderSize() { + return 4 * sizeof(ui64); + } + inline const ui64* Sizes(const void* Data) const { + return (const ui64*)((const char*)Data + HeaderSize()); + } +}; + +class TSubBlobs: public TVector<TBlob> { +public: + TSubBlobs() { + } + TSubBlobs(const TBlob& multi) { + ReadMultiBlob(multi); + } + void ReadMultiBlob(const TBlob& multi); + const TMultiBlobHeader* GetHeader() const { + return (const TMultiBlobHeader*)&Header; + } + +protected: + TMultiBlobHeader Header; + TBlob Multi; + +private: + bool ReadChunkedData(const TBlob& multi) noexcept; +}; diff --git a/library/cpp/on_disk/multi_blob/multiblob_builder.cpp b/library/cpp/on_disk/multi_blob/multiblob_builder.cpp new file mode 100644 index 0000000000..44aa4a6c2f --- /dev/null +++ b/library/cpp/on_disk/multi_blob/multiblob_builder.cpp @@ -0,0 +1,146 @@ +#include <util/memory/tempbuf.h> +#include <util/system/align.h> + +#include "multiblob_builder.h" + +/* + * TBlobSaverMemory + */ +TBlobSaverMemory::TBlobSaverMemory(const void* ptr, size_t size) + : Blob(TBlob::NoCopy(ptr, size)) +{ +} + +TBlobSaverMemory::TBlobSaverMemory(const TBlob& blob) + : Blob(blob) +{ +} + +void TBlobSaverMemory::Save(IOutputStream& output, ui32 /*flags*/) { + output.Write((void*)Blob.Data(), Blob.Length()); +} + +size_t TBlobSaverMemory::GetLength() { + return Blob.Length(); +} + +/* + * TBlobSaverFile + */ + +TBlobSaverFile::TBlobSaverFile(TFile file) + : File(file) +{ + Y_ASSERT(File.IsOpen()); +} + +TBlobSaverFile::TBlobSaverFile(const char* filename, EOpenMode oMode) + : File(filename, oMode) +{ + Y_ASSERT(File.IsOpen()); +} + +void TBlobSaverFile::Save(IOutputStream& output, ui32 /*flags*/) { + TTempBuf buffer(1 << 20); + while (size_t size = File.Read((void*)buffer.Data(), buffer.Size())) + output.Write((void*)buffer.Data(), size); +} + +size_t TBlobSaverFile::GetLength() { + return File.GetLength(); +} + +/* + * TMultiBlobBuilder + */ + +TMultiBlobBuilder::TMultiBlobBuilder(bool isOwn) + : IsOwner(isOwn) +{ +} + +TMultiBlobBuilder::~TMultiBlobBuilder() { + if (IsOwner) + DeleteSubBlobs(); +} + +namespace { + ui64 PadToAlign(IOutputStream& output, ui64 fromPos, ui32 align) { + ui64 toPos = AlignUp<ui64>(fromPos, align); + for (; fromPos < toPos; ++fromPos) { + output << (char)0; + } + return toPos; + } +} + +void TMultiBlobBuilder::Save(IOutputStream& output, ui32 flags) { + TMultiBlobHeader header; + memset((void*)&header, 0, sizeof(header)); + header.BlobMetaSig = BLOBMETASIG; + header.BlobRecordSig = TMultiBlobHeader::RecordSig; + header.Count = Blobs.size(); + header.Align = ALIGN; + header.Flags = flags & EMF_WRITEABLE; + output.Write((void*)&header, sizeof(header)); + for (size_t i = sizeof(header); i < header.HeaderSize(); ++i) + output << (char)0; + ui64 pos = header.HeaderSize(); + if (header.Flags & EMF_INTERLAY) { + for (size_t i = 0; i < Blobs.size(); ++i) { + ui64 size = Blobs[i]->GetLength(); + pos = PadToAlign(output, pos, sizeof(ui64)); // Align size record + output.Write((void*)&size, sizeof(ui64)); + pos = PadToAlign(output, pos + sizeof(ui64), header.Align); // Align blob + Blobs[i]->Save(output, header.Flags); + pos += size; + } + } else { + for (size_t i = 0; i < Blobs.size(); ++i) { + ui64 size = Blobs[i]->GetLength(); + output.Write((void*)&size, sizeof(ui64)); + } + pos += Blobs.size() * sizeof(ui64); + for (size_t i = 0; i < Blobs.size(); ++i) { + pos = PadToAlign(output, pos, header.Align); + Blobs[i]->Save(output, header.Flags); + pos += Blobs[i]->GetLength(); + } + } + // Compensate for imprecise size + for (ui64 len = GetLength(); pos < len; ++pos) { + output << (char)0; + } +} + +size_t TMultiBlobBuilder::GetLength() { + // Sizes may be diferent with and without EMF_INTERLAY, so choose greater of 2 + size_t resNonInter = TMultiBlobHeader::HeaderSize() + Blobs.size() * sizeof(ui64); + size_t resInterlay = TMultiBlobHeader::HeaderSize(); + for (size_t i = 0; i < Blobs.size(); ++i) { + resInterlay = AlignUp<ui64>(resInterlay, sizeof(ui64)) + sizeof(ui64); + resInterlay = AlignUp<ui64>(resInterlay, ALIGN) + Blobs[i]->GetLength(); + resNonInter = AlignUp<ui64>(resNonInter, ALIGN) + Blobs[i]->GetLength(); + } + resInterlay = AlignUp<ui64>(resInterlay, ALIGN); + resNonInter = AlignUp<ui64>(resNonInter, ALIGN); + return Max(resNonInter, resInterlay); +} + +TMultiBlobBuilder::TSavers& TMultiBlobBuilder::GetBlobs() { + return Blobs; +} + +const TMultiBlobBuilder::TSavers& TMultiBlobBuilder::GetBlobs() const { + return Blobs; +} + +void TMultiBlobBuilder::AddBlob(IBlobSaverBase* blob) { + Blobs.push_back(blob); +} + +void TMultiBlobBuilder::DeleteSubBlobs() { + for (size_t i = 0; i < Blobs.size(); ++i) + delete Blobs[i]; + Blobs.clear(); +} diff --git a/library/cpp/on_disk/multi_blob/multiblob_builder.h b/library/cpp/on_disk/multi_blob/multiblob_builder.h new file mode 100644 index 0000000000..a8e3c6d35e --- /dev/null +++ b/library/cpp/on_disk/multi_blob/multiblob_builder.h @@ -0,0 +1,64 @@ +#pragma once + +#include <util/system/align.h> +#include <util/stream/output.h> +#include <util/stream/file.h> +#include <util/draft/holder_vector.h> + +#include "multiblob.h" + +class IBlobSaverBase { +public: + virtual ~IBlobSaverBase() { + } + virtual void Save(IOutputStream& output, ui32 flags = 0) = 0; + virtual size_t GetLength() = 0; +}; + +inline void MultiBlobSave(IOutputStream& output, IBlobSaverBase& saver) { + saver.Save(output); +} + +class TBlobSaverMemory: public IBlobSaverBase { +public: + TBlobSaverMemory(const void* ptr, size_t size); + TBlobSaverMemory(const TBlob& blob); + void Save(IOutputStream& output, ui32 flags = 0) override; + size_t GetLength() override; + +private: + TBlob Blob; +}; + +class TBlobSaverFile: public IBlobSaverBase { +public: + TBlobSaverFile(TFile file); + TBlobSaverFile(const char* filename, EOpenMode oMode = RdOnly); + void Save(IOutputStream& output, ui32 flags = 0) override; + size_t GetLength() override; + +protected: + TFile File; +}; + +class TMultiBlobBuilder: public IBlobSaverBase { +protected: + // Data will be stored with default alignment DEVTOOLS-4548 + static const size_t ALIGN = 16; + +public: + typedef TVector<IBlobSaverBase*> TSavers; + + TMultiBlobBuilder(bool isOwn = true); + ~TMultiBlobBuilder() override; + void Save(IOutputStream& output, ui32 flags = 0) override; + size_t GetLength() override; + TSavers& GetBlobs(); + const TSavers& GetBlobs() const; + void AddBlob(IBlobSaverBase* blob); + void DeleteSubBlobs(); + +protected: + TSavers Blobs; + bool IsOwner; +}; diff --git a/library/cpp/on_disk/multi_blob/ya.make b/library/cpp/on_disk/multi_blob/ya.make new file mode 100644 index 0000000000..50615fc901 --- /dev/null +++ b/library/cpp/on_disk/multi_blob/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + multiblob.cpp + multiblob_builder.cpp +) + +PEERDIR( + library/cpp/on_disk/chunks + util/draft +) + +END() diff --git a/library/cpp/on_disk/st_hash/fake.cpp b/library/cpp/on_disk/st_hash/fake.cpp new file mode 100644 index 0000000000..ef5af4d432 --- /dev/null +++ b/library/cpp/on_disk/st_hash/fake.cpp @@ -0,0 +1,4 @@ +#include "save_stl.h" +#include "static_hash.h" +#include "static_hash_map.h" +#include "sthash_iterators.h" diff --git a/library/cpp/on_disk/st_hash/save_stl.h b/library/cpp/on_disk/st_hash/save_stl.h new file mode 100644 index 0000000000..00f8f0e20d --- /dev/null +++ b/library/cpp/on_disk/st_hash/save_stl.h @@ -0,0 +1,84 @@ +#pragma once + +#include <util/generic/hash.h> +#include <util/system/yassert.h> +#include <util/stream/output.h> + +// this structure might be replaced with sthashtable class +template <class HF, class Eq, class size_type> +struct sthashtable_nvm_sv { + sthashtable_nvm_sv() { + if (sizeof(sthashtable_nvm_sv) != sizeof(HF) + sizeof(Eq) + 3 * sizeof(size_type)) { + memset(this, 0, sizeof(sthashtable_nvm_sv)); + } + } + + sthashtable_nvm_sv(const HF& phf, const Eq& peq, const size_type& pnb, const size_type& pne, const size_type& pnd) + : sthashtable_nvm_sv() + { + hf = phf; + eq = peq; + num_buckets = pnb; + num_elements = pne; + data_end_off = pnd; + } + + HF hf; + Eq eq; + size_type num_buckets; + size_type num_elements; + size_type data_end_off; +}; + +/** + * Some hack to save both THashMap and sthash. + * Working with stHash does not depend on the template parameters, because the content of stHash is not used inside this method. + */ +template <class V, class K, class HF, class Ex, class Eq, class A> +template <class KeySaver> +inline int THashTable<V, K, HF, Ex, Eq, A>::save_for_st(IOutputStream* stream, KeySaver& ks, sthash<int, int, THash<int>, TEqualTo<int>, typename KeySaver::TSizeType>* stHash) const { + Y_ASSERT(!stHash || stHash->bucket_count() == bucket_count()); + typedef sthashtable_nvm_sv<HF, Eq, typename KeySaver::TSizeType> sv_type; + sv_type sv = {this->_get_hash_fun(), this->_get_key_eq(), static_cast<typename KeySaver::TSizeType>(buckets.size()), static_cast<typename KeySaver::TSizeType>(num_elements), 0}; + // to do: m.b. use just the size of corresponding object? + typename KeySaver::TSizeType cur_off = sizeof(sv_type) + + (sv.num_buckets + 1) * sizeof(typename KeySaver::TSizeType); + sv.data_end_off = cur_off; + const_iterator n; + for (n = begin(); n != end(); ++n) { + sv.data_end_off += static_cast<typename KeySaver::TSizeType>(ks.GetRecordSize(*n)); + } + typename KeySaver::TSizeType* sb = stHash ? (typename KeySaver::TSizeType*)(stHash->buckets()) : nullptr; + if (stHash) + sv.data_end_off += static_cast<typename KeySaver::TSizeType>(sb[buckets.size()] - sb[0]); + //saver.Align(sizeof(char*)); + stream->Write(&sv, sizeof(sv)); + + size_type i; + //save vector + for (i = 0; i < buckets.size(); ++i) { + node* cur = buckets[i]; + stream->Write(&cur_off, sizeof(cur_off)); + if (cur) { + while (!((uintptr_t)cur & 1)) { + cur_off += static_cast<typename KeySaver::TSizeType>(ks.GetRecordSize(cur->val)); + cur = cur->next; + } + } + if (stHash) + cur_off += static_cast<typename KeySaver::TSizeType>(sb[i + 1] - sb[i]); + } + stream->Write(&cur_off, sizeof(cur_off)); // end mark + for (i = 0; i < buckets.size(); ++i) { + node* cur = buckets[i]; + if (cur) { + while (!((uintptr_t)cur & 1)) { + ks.SaveRecord(stream, cur->val); + cur = cur->next; + } + } + if (stHash) + stream->Write((const char*)stHash + sb[i], sb[i + 1] - sb[i]); + } + return 0; +} diff --git a/library/cpp/on_disk/st_hash/static_hash.h b/library/cpp/on_disk/st_hash/static_hash.h new file mode 100644 index 0000000000..ca7a6ccd36 --- /dev/null +++ b/library/cpp/on_disk/st_hash/static_hash.h @@ -0,0 +1,420 @@ +#pragma once + +#include "save_stl.h" +#include "sthash_iterators.h" + +#include <util/generic/hash.h> +#include <util/generic/vector.h> +#include <util/generic/buffer.h> +#include <util/generic/cast.h> +#include <util/generic/yexception.h> // for save/load only +#include <util/stream/file.h> +#include <util/stream/buffer.h> +#include <utility> + +#include <memory> +#include <algorithm> +#include <functional> + +#include <cstdlib> +#include <cstddef> + +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4624) // 'destructor could not be generated because a base class destructor is inaccessible' +#endif + +template <class HashType, class KeySaver> +inline void SaveHashToStreamEx(HashType& hash, IOutputStream* stream) { + KeySaver ks; + if (hash.save_for_st(stream, ks)) + ythrow yexception() << "Could not save hash to stream"; +} + +template <class HashType> +inline void SaveHashToStream(HashType& hash, IOutputStream* stream) { + typedef TSthashWriter<typename HashType::key_type, typename HashType::mapped_type, ui64> KeySaver; + return SaveHashToStreamEx<HashType, KeySaver>(hash, stream); +} + +template <class HashType, class KeySaver> +inline void SaveHashToFileEx(HashType& hash, const char* fileName) { + TFileOutput output(fileName); + SaveHashToStreamEx<HashType, KeySaver>(hash, &output); +} + +template <class HashType> +inline void SaveHashToFile(HashType& hash, const char* fileName) { + typedef TSthashWriter<typename HashType::key_type, typename HashType::mapped_type, ui64> KeySaver; + return SaveHashToFileEx<HashType, KeySaver>(hash, fileName); +} + +template <class HashType> +inline void SaveHashSetToFile(HashType& hash, const char* fileName) { + typedef TSthashSetWriter<typename HashType::key_type, ui64> KeySaver; + return SaveHashToFileEx<HashType, KeySaver>(hash, fileName); +} + +template <class HashType> +inline void SaveHashToFile32(HashType& hash, const char* fileName) { + typedef TSthashWriter<typename HashType::key_type, typename HashType::mapped_type, ui32> KeySaver; + return SaveHashToFileEx<HashType, KeySaver>(hash, fileName); +} + +template <class HashType, class KeySaver> +inline void SaveHashToBufferEx(HashType& hash, TBuffer& buffer, sthash<int, int, THash<int>, TEqualTo<int>, typename KeySaver::TSizeType>* stHash = nullptr) { + TBufferOutput stream(buffer); + KeySaver ks; + if (hash.save_for_st(&stream, ks, stHash)) + ythrow yexception() << "Could not save hash to memory"; +} + +template <class HashType> +inline void SaveHashToBuffer(HashType& hash, TBuffer& buffer) { + typedef TSthashWriter<typename HashType::key_type, typename HashType::mapped_type, ui64> KeySaver; + SaveHashToBufferEx<HashType, KeySaver>(hash, buffer); +} + +/** + * Some hack to save both THashMap and sthash. + * THashMap and sthash must have same bucket_count(). + */ +template <class HashType, class StHashType> +inline void SaveHashToBuffer(HashType& hash, TBuffer& buffer, StHashType* stHash) { + typedef TSthashWriter<typename HashType::key_type, typename HashType::mapped_type, ui64> KeySaver; + typedef sthash<int, int, THash<int>, TEqualTo<int>, typename KeySaver::TSizeType>* SH; + + SH sh = reinterpret_cast<SH>(stHash); + SaveHashToBufferEx<HashType, KeySaver>(hash, buffer, sh); +} + +template <class HashType> +inline void SaveHashToBuffer32(HashType& hash, TBuffer& buffer) { + typedef TSthashWriter<typename HashType::key_type, typename HashType::mapped_type, ui32> KeySaver; + SaveHashToBufferEx<HashType, KeySaver>(hash, buffer); +} + +template <class Iter, typename size_type_f = ui64> +class sthashtable { +public: + typedef typename Iter::TKeyType key_type; + typedef typename Iter::TValueType value_type; + typedef typename Iter::THasherType hasher; + typedef typename Iter::TKeyEqualType key_equal; + + typedef size_type_f size_type; + typedef ptrdiff_t difference_type; + typedef const value_type* const_pointer; + typedef const value_type& const_reference; + + typedef Iter const_iterator; + + const hasher hash_funct() const { + return hash; + } + const key_equal key_eq() const { + return equals; + } + +private: + const hasher hash; + const key_equal equals; + +private: + const_iterator iter_at_bucket(size_type bucket) const { + return (const_iterator)(((char*)this + buckets()[bucket])); + } + + const_iterator iter_at_bucket_or_end(size_type bucket) const { + if (bucket < num_buckets) + return (const_iterator)(((char*)this + buckets()[bucket])); + else + return end(); + } + + const size_type num_buckets; + const size_type num_elements; + const size_type data_end_off; + +protected: //shut up gcc warning + // we can't construct/destroy this object at all! + sthashtable(); + sthashtable(const sthashtable& ht); + ~sthashtable(); + +public: + // const size_type *buckets; + const size_type* buckets() const { + return (size_type*)((char*)this + sizeof(*this)); + } + const size_type buckets(size_type n) const { + return buckets()[n]; + } + + size_type size() const { + return num_elements; + } + size_type max_size() const { + return size_type(-1); + } + bool empty() const { + return size() == 0; + } + + const_iterator begin() const { + return num_buckets ? iter_at_bucket(0) : end(); + } + + const_iterator end() const { + return (const_iterator)(((char*)this + data_end_off)); + } + +public: + size_type size_in_bytes() const { + return data_end_off; + } + + size_type bucket_count() const { + return num_buckets; + } + + size_type elems_in_bucket(size_type bucket) const { + size_type result = 0; + const_iterator first = iter_at_bucket(bucket); + const_iterator last = iter_at_bucket_or_end(bucket + 1); + + for (; first != last; ++first) + ++result; + return result; + } + + template <class TheKey> + const_iterator find(const TheKey& key) const { + size_type n = bkt_num_key(key); + const_iterator first(iter_at_bucket(n)), last(iter_at_bucket_or_end(n + 1)); + for (; + first != last && !first.KeyEquals(equals, key); + ++first) { + } + if (first != last) + return first; + return end(); + } + + size_type count(const key_type& key) const { + const size_type n = bkt_num_key(key); + size_type result = 0; + const_iterator first = iter_at_bucket(n); + const_iterator last = iter_at_bucket_or_end(n + 1); + + for (; first != last; ++first) + if (first.KeyEquals(equals, key)) + ++result; + return result; + } + + std::pair<const_iterator, const_iterator> equal_range(const key_type& key) const; + +private: + template <class TheKey> + size_type bkt_num_key(const TheKey& key) const { + return hash(key) % num_buckets; + } +}; + +template <class I, class size_type_f> +std::pair<I, I> sthashtable<I, size_type_f>::equal_range(const key_type& key) const { + typedef std::pair<const_iterator, const_iterator> pii; + const size_type n = bkt_num_key(key); + const_iterator first = iter_at_bucket(n); + const_iterator last = iter_at_bucket_or_end(n + 1); + + for (; first != last; ++first) { + if (first.KeyEquals(equals, key)) { + const_iterator cur = first; + ++cur; + for (; cur != last; ++cur) + if (!cur.KeyEquals(equals, key)) + return pii(const_iterator(first), + const_iterator(cur)); + return pii(const_iterator(first), + const_iterator(last)); + } + } + return pii(end(), end()); +} + +/* end __SGI_STL_HASHTABLE_H */ + +template <class Key, class T, class HashFcn /*= hash<Key>*/, + class EqualKey = TEqualTo<Key>, typename size_type_f = ui64> +class sthash { +private: + typedef sthashtable<TSthashIterator<const Key, const T, HashFcn, EqualKey>, size_type_f> ht; + ht rep; + +public: + typedef typename ht::key_type key_type; + typedef typename ht::value_type value_type; + typedef typename ht::hasher hasher; + typedef typename ht::key_equal key_equal; + typedef T mapped_type; + + typedef typename ht::size_type size_type; + typedef typename ht::difference_type difference_type; + typedef typename ht::const_pointer const_pointer; + typedef typename ht::const_reference const_reference; + + typedef typename ht::const_iterator const_iterator; + + const hasher hash_funct() const { + return rep.hash_funct(); + } + const key_equal key_eq() const { + return rep.key_eq(); + } + +public: + size_type size() const { + return rep.size(); + } + size_type max_size() const { + return rep.max_size(); + } + bool empty() const { + return rep.empty(); + } + + const_iterator begin() const { + return rep.begin(); + } + const_iterator end() const { + return rep.end(); + } + +public: + template <class TheKey> + const_iterator find(const TheKey& key) const { + return rep.find(key); + } + template <class TheKey> + bool has(const TheKey& key) const { + return rep.find(key) != rep.end(); + } + + size_type count(const key_type& key) const { + return rep.count(key); + } + + std::pair<const_iterator, const_iterator> equal_range(const key_type& key) const { + return rep.equal_range(key); + } + + size_type size_in_bytes() const { + return rep.size_in_bytes(); + } + + size_type bucket_count() const { + return rep.bucket_count(); + } + size_type max_bucket_count() const { + return rep.max_bucket_count(); + } + size_type elems_in_bucket(size_type n) const { + return rep.elems_in_bucket(n); + } + + const size_type* buckets() const { + return rep.buckets(); + } + const size_type buckets(size_type n) const { + return rep.buckets()[n]; + } +}; + +template <class Key, class HashFcn, + class EqualKey = TEqualTo<Key>, typename size_type_f = ui64> +class sthash_set: public sthash<Key, TEmptyValue, HashFcn, EqualKey, size_type_f> { + typedef sthash<Key, TEmptyValue, HashFcn, EqualKey, size_type_f> Base; + +public: + using Base::const_iterator; + using Base::hasher; + using Base::key_equal; + using Base::key_type; + using Base::size_type; + using Base::value_type; +}; + +template <class Key, class T, class HashFcn /*= hash<Key>*/, + class EqualKey = TEqualTo<Key>, typename size_type_f = ui64> +class sthash_mm { +private: + typedef sthashtable<TSthashIterator<const Key, T, HashFcn, EqualKey>, size_type_f> ht; + ht rep; + +public: + typedef typename ht::key_type key_type; + typedef typename ht::value_type value_type; + typedef typename ht::hasher hasher; + typedef typename ht::key_equal key_equal; + typedef T mapped_type; + + typedef typename ht::size_type size_type; + typedef typename ht::difference_type difference_type; + typedef typename ht::const_pointer const_pointer; + typedef typename ht::const_reference const_reference; + + typedef typename ht::const_iterator const_iterator; + + const hasher hash_funct() const { + return rep.hash_funct(); + } + const key_equal key_eq() const { + return rep.key_eq(); + } + +public: + size_type size() const { + return rep.size(); + } + size_type max_size() const { + return rep.max_size(); + } + bool empty() const { + return rep.empty(); + } + + const_iterator begin() const { + return rep.begin(); + } + const_iterator end() const { + return rep.end(); + } + + const_iterator find(const key_type& key) const { + return rep.find(key); + } + + size_type count(const key_type& key) const { + return rep.count(key); + } + + std::pair<const_iterator, const_iterator> equal_range(const key_type& key) const { + return rep.equal_range(key); + } + + size_type bucket_count() const { + return rep.bucket_count(); + } + size_type max_bucket_count() const { + return rep.max_bucket_count(); + } + size_type elems_in_bucket(size_type n) const { + return rep.elems_in_bucket(n); + } +}; + +#ifdef _MSC_VER +#pragma warning(pop) +#endif diff --git a/library/cpp/on_disk/st_hash/static_hash_map.h b/library/cpp/on_disk/st_hash/static_hash_map.h new file mode 100644 index 0000000000..5dc50abd39 --- /dev/null +++ b/library/cpp/on_disk/st_hash/static_hash_map.h @@ -0,0 +1,59 @@ +#pragma once + +#include "static_hash.h" + +#include <library/cpp/deprecated/mapped_file/mapped_file.h> + +#include <util/system/filemap.h> + +template <class SH> +struct sthash_mapped_c { + typedef SH H; + typedef typename H::const_iterator const_iterator; + TMappedFile M; + H* hsh; + sthash_mapped_c() + : M() + , hsh(nullptr) + { + } + sthash_mapped_c(const char* fname, bool precharge) + : M() + , hsh(nullptr) + { + Open(fname, precharge); + } + void Open(const char* fname, bool precharge) { + M.init(fname); + if (precharge) + M.precharge(); + hsh = (H*)M.getData(); + if (M.getSize() < sizeof(H) || (ssize_t)M.getSize() != hsh->end().Data - (char*)hsh) + ythrow yexception() << "Could not map hash: " << fname << " is damaged"; + } + H* operator->() { + return hsh; + } + const H* operator->() const { + return hsh; + } + H* GetSthash() { + return hsh; + } + const H* GetSthash() const { + return hsh; + } +}; + +template <class Key, class T, class Hash> +struct sthash_mapped: public sthash_mapped_c<sthash<Key, T, Hash>> { + typedef sthash<Key, T, Hash> H; + sthash_mapped(const char* fname, bool precharge) + : sthash_mapped_c<H>(fname, precharge) + { + } + sthash_mapped() + : sthash_mapped_c<H>() + { + } +}; diff --git a/library/cpp/on_disk/st_hash/sthash_iterators.h b/library/cpp/on_disk/st_hash/sthash_iterators.h new file mode 100644 index 0000000000..6a9ebdd6c3 --- /dev/null +++ b/library/cpp/on_disk/st_hash/sthash_iterators.h @@ -0,0 +1,334 @@ +#pragma once + +#include "save_stl.h" + +#include <util/system/align.h> + +/** + This file provides functionality for saving some relatively simple THashMap object + to disk in a form that can be mapped read-only (via mmap) at any address. + That saved object is accessed via pointer to sthash object (that must have + the same parameters as original THashMap object) + + If either key or value are variable-sized (i.e. contain pointers), user must + write his own instantiation of TSthashIterator (read iterator for sthash) and + TSthashWriter (write iterator for THashMap). + An example for <const char *, B> pair is in here. +**/ + +// TEmptyValue and SizeOfEx are helpers for sthash_set +struct TEmptyValue { + TEmptyValue() = default; +}; + +template <class T> +inline size_t SizeOfEx() { + return sizeof(T); +} + +template <> +inline size_t SizeOfEx<TEmptyValue>() { + return 0; +} +template <> +inline size_t SizeOfEx<const TEmptyValue>() { + return 0; +} + +template <class TKey, class TValue, class HashFcn, class EqualKey> +struct TSthashIterator { + // Implementation for simple types + typedef const TKey TKeyType; + typedef const TValue TValueType; + typedef EqualKey TKeyEqualType; + typedef HashFcn THasherType; + + const char* Data; + TSthashIterator() + : Data(nullptr) + { + } + explicit TSthashIterator(const char* data) + : Data(data) + { + } + void operator++() { + Data += GetLength(); + } + + bool operator!=(const TSthashIterator& that) const { + return Data != that.Data; + } + bool operator==(const TSthashIterator& that) const { + return Data == that.Data; + } + TKey& Key() const { + return *(TKey*)Data; + } + TValue& Value() { + return *(TValue*)(Data + sizeof(TKey)); + } + const TValue& Value() const { + return *(const TValue*)(Data + sizeof(TKey)); + } + + template <class AnotherKeyType> + bool KeyEquals(const EqualKey& eq, const AnotherKeyType& key) const { + return eq(*(TKey*)Data, key); + } + + size_t GetLength() const { + return sizeof(TKey) + SizeOfEx<TValue>(); + } +}; + +template <class Key, class Value, typename size_type_o = ui64> +struct TSthashWriter { + typedef size_type_o TSizeType; + size_t GetRecordSize(const std::pair<const Key, const Value>&) const { + return sizeof(Key) + SizeOfEx<Value>(); + } + int SaveRecord(IOutputStream* stream, const std::pair<const Key, const Value>& record) const { + stream->Write(&record.first, sizeof(Key)); + stream->Write(&record.second, SizeOfEx<Value>()); + return 0; + } +}; + +// Remember that this simplified implementation makes a copy of `key' in std::make_pair. +// It can also waste some memory on undesired alignment. +template <class Key, typename size_type_o = ui64> +struct TSthashSetWriter: public TSthashWriter<Key, TEmptyValue, size_type_o> { + typedef TSthashWriter<Key, TEmptyValue, size_type_o> MapWriter; + size_t GetRecordSize(const Key& key) const { + return MapWriter::GetRecordSize(std::make_pair(key, TEmptyValue())); + } + int SaveRecord(IOutputStream* stream, const Key& key) const { + return MapWriter::SaveRecord(stream, std::make_pair(key, TEmptyValue())); + } +}; + +// we can't save something with pointers without additional tricks + +template <class A, class B, class HashFcn, class EqualKey> +struct TSthashIterator<A*, B, HashFcn, EqualKey> {}; + +template <class A, class B, class HashFcn, class EqualKey> +struct TSthashIterator<A, B*, HashFcn, EqualKey> {}; + +template <class A, class B, typename size_type_o> +struct TSthashWriter<A*, B*, size_type_o> {}; + +template <class A, class B, typename size_type_o> +struct TSthashWriter<A*, B, size_type_o> {}; + +template <class A, class B, typename size_type_o> +struct TSthashWriter<A, B*, size_type_o> {}; + +template <class T> +inline size_t AlignForChrKey() { + return 4; // TODO: change this (requeres rebuilt of a few existing files) +} + +template <> +inline size_t AlignForChrKey<TEmptyValue>() { + return 1; +} + +template <> +inline size_t AlignForChrKey<const TEmptyValue>() { + return AlignForChrKey<TEmptyValue>(); +} + +// !! note that for char*, physical placement of key and value is swapped +template <class TValue, class HashFcn, class EqualKey> +struct TSthashIterator<const char* const, TValue, HashFcn, EqualKey> { + typedef const TValue TValueType; + typedef const char* TKeyType; + typedef EqualKey TKeyEqualType; + typedef HashFcn THasherType; + + const char* Data; + TSthashIterator() + : Data(nullptr) + { + } + TSthashIterator(const char* data) + : Data(data) + { + } + void operator++() { + Data += GetLength(); + } + + bool operator!=(const TSthashIterator& that) const { + return Data != that.Data; + } + bool operator==(const TSthashIterator& that) const { + return Data == that.Data; + } + const char* Key() const { + return Data + SizeOfEx<TValue>(); + } + TValue& Value() { + return *(TValue*)Data; + } + const TValue& Value() const { + return *(const TValue*)Data; + } + + template <class K> + bool KeyEquals(const EqualKey& eq, const K& k) const { + return eq(Data + SizeOfEx<TValue>(), k); + } + + size_t GetLength() const { + size_t length = strlen(Data + SizeOfEx<TValue>()) + 1 + SizeOfEx<TValue>(); + length = AlignUp(length, AlignForChrKey<TValue>()); + return length; + } +}; + +template <class Value, typename size_type_o> +struct TSthashWriter<const char*, Value, size_type_o> { + typedef size_type_o TSizeType; + size_t GetRecordSize(const std::pair<const char*, const Value>& record) const { + size_t length = strlen(record.first) + 1 + SizeOfEx<Value>(); + length = AlignUp(length, AlignForChrKey<Value>()); + return length; + } + int SaveRecord(IOutputStream* stream, const std::pair<const char*, const Value>& record) const { + const char* alignBuffer = "qqqq"; + stream->Write(&record.second, SizeOfEx<Value>()); + size_t length = strlen(record.first) + 1; + stream->Write(record.first, length); + length = AlignUpSpace(length, AlignForChrKey<Value>()); + if (length) + stream->Write(alignBuffer, length); + return 0; + } +}; + +template <class TKey, class HashFcn, class EqualKey> +struct TSthashIterator<TKey, const char* const, HashFcn, EqualKey> { + typedef const TKey TKeyType; + typedef const char* TValueType; + typedef EqualKey TKeyEqualType; + typedef HashFcn THasherType; + + const char* Data; + TSthashIterator() + : Data(nullptr) + { + } + TSthashIterator(const char* data) + : Data(data) + { + } + void operator++() { + Data += GetLength(); + } + + bool operator!=(const TSthashIterator& that) const { + return Data != that.Data; + } + bool operator==(const TSthashIterator& that) const { + return Data == that.Data; + } + TKey& Key() { + return *(TKey*)Data; + } + const char* Value() const { + return Data + sizeof(TKey); + } + + template <class K> + bool KeyEquals(const EqualKey& eq, const K& k) const { + return eq(*(TKey*)Data, k); + } + + size_t GetLength() const { + size_t length = strlen(Data + sizeof(TKey)) + 1 + sizeof(TKey); + length = AlignUp(length, (size_t)4); + return length; + } +}; + +template <class Key, typename size_type_o> +struct TSthashWriter<Key, const char*, size_type_o> { + typedef size_type_o TSizeType; + size_t GetRecordSize(const std::pair<const Key, const char*>& record) const { + size_t length = strlen(record.second) + 1 + sizeof(Key); + length = AlignUp(length, (size_t)4); + return length; + } + int SaveRecord(IOutputStream* stream, const std::pair<const Key, const char*>& record) const { + const char* alignBuffer = "qqqq"; + stream->Write(&record.first, sizeof(Key)); + size_t length = strlen(record.second) + 1; + stream->Write(record.second, length); + length = AlignUpSpace(length, (size_t)4); + if (length) + stream->Write(alignBuffer, length); + return 0; + } +}; + +template <class HashFcn, class EqualKey> +struct TSthashIterator<const char* const, const char* const, HashFcn, EqualKey> { + typedef const char* TKeyType; + typedef const char* TValueType; + typedef EqualKey TKeyEqualType; + typedef HashFcn THasherType; + + const char* Data; + TSthashIterator() + : Data(nullptr) + { + } + TSthashIterator(const char* data) + : Data(data) + { + } + void operator++() { + Data += GetLength(); + } + + bool operator!=(const TSthashIterator& that) const { + return Data != that.Data; + } + bool operator==(const TSthashIterator& that) const { + return Data == that.Data; + } + const char* Key() const { + return Data; + } + const char* Value() const { + return Data + strlen(Data) + 1; + } + + template <class K> + bool KeyEquals(const EqualKey& eq, const K& k) const { + return eq(Data, k); + } + + size_t GetLength() const { + size_t length = strlen(Data) + 1; + length += strlen(Data + length) + 1; + return length; + } +}; + +template <typename size_type_o> +struct TSthashWriter<const char*, const char*, size_type_o> { + typedef size_type_o TSizeType; + size_t GetRecordSize(const std::pair<const char*, const char*>& record) const { + size_t size = strlen(record.first) + strlen(record.second) + 2; + return size; + } + int SaveRecord(IOutputStream* stream, const std::pair<const char*, const char*>& record) const { + stream->Write(record.first, strlen(record.first) + 1); + stream->Write(record.second, strlen(record.second) + 1); + return 0; + } +}; diff --git a/library/cpp/on_disk/st_hash/ya.make b/library/cpp/on_disk/st_hash/ya.make new file mode 100644 index 0000000000..8c6d05711c --- /dev/null +++ b/library/cpp/on_disk/st_hash/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + fake.cpp + save_stl.h + static_hash.h + static_hash_map.h + sthash_iterators.h +) + +PEERDIR( + library/cpp/deprecated/mapped_file +) + +END() diff --git a/library/cpp/remmap/remmap.cpp b/library/cpp/remmap/remmap.cpp new file mode 100644 index 0000000000..ce72af7352 --- /dev/null +++ b/library/cpp/remmap/remmap.cpp @@ -0,0 +1,138 @@ +#include <util/system/info.h> +#include <util/system/defaults.h> + +#if defined(_win_) +#include <util/system/winint.h> +#elif defined(_unix_) +#include <sys/types.h> +#include <sys/mman.h> + +#ifndef MAP_NOCORE +#define MAP_NOCORE 0 +#endif +#else +#error todo +#endif + +#include "remmap.h" + +static const size_t REMMAP_PAGESIZE = NSystemInfo::GetPageSize(); + +#if defined(_unix_) +TRemmapAllocation::TRemmapAllocation() + : Ptr_(nullptr) + , Size_(0) +{ +} + +TRemmapAllocation::TRemmapAllocation(size_t size, char* base) + : Ptr_(nullptr) + , Size_(0) +{ + Alloc(size, base); +} + +char* TRemmapAllocation::Alloc(size_t size, char* base) { + assert(Ptr_ == nullptr); + + if (!size) + return nullptr; + + const size_t HUGESIZE = size_t(16) << 30; + Ptr_ = CommonMMap(HUGESIZE, base); + + if (Ptr_ != (char*)MAP_FAILED) + munmap((void*)Ptr_, HUGESIZE); + else + Ptr_ = nullptr; + + Ptr_ = CommonMMap(AlignUp(size, REMMAP_PAGESIZE), Ptr_); + if (Ptr_ == (char*)MAP_FAILED) + Ptr_ = nullptr; + + Size_ = Ptr_ ? size : 0; + return Ptr_; +} + +char* TRemmapAllocation::Realloc(size_t newsize) { + if (Ptr_ == nullptr) + return Alloc(newsize); + + size_t realSize = AlignUp(Size_, REMMAP_PAGESIZE); + size_t needSize = AlignUp(newsize, REMMAP_PAGESIZE); + + if (needSize > realSize) { + char* part = Ptr_ + realSize; + char* bunch = CommonMMap(needSize - realSize, part); + if (bunch != (char*)MAP_FAILED && bunch != part) + munmap(bunch, needSize - realSize); + if (bunch == (char*)MAP_FAILED || bunch != part) + return FullRealloc(newsize); + } else if (needSize < realSize) + munmap(Ptr_ + needSize, realSize - needSize); + + if ((Size_ = newsize) == 0) + Ptr_ = nullptr; + + return Ptr_; +} + +void TRemmapAllocation::Dealloc() { + if (Ptr_ != nullptr) + munmap(Ptr_, AlignUp(Size_, REMMAP_PAGESIZE)); + Ptr_ = nullptr; + Size_ = 0; +} + +char* TRemmapAllocation::FullRealloc(size_t newsize) { + char* newPtr = CommonMMap(newsize); + Y_ABORT_UNLESS(newPtr != MAP_FAILED, "mmap failed"); + + size_t useful = Min(Size_, newsize), cur = 0; + + for (; cur + REMMAP_PAGESIZE < useful; cur += REMMAP_PAGESIZE) { + memcpy((void*)&newPtr[cur], (void*)&Ptr_[cur], REMMAP_PAGESIZE); + munmap((void*)&Ptr_[cur], REMMAP_PAGESIZE); + } + + memcpy((void*)&newPtr[cur], (void*)&Ptr_[cur], useful - cur); + munmap((void*)&Ptr_[cur], AlignUp(Size_ - cur, REMMAP_PAGESIZE)); + + Size_ = newsize; + return (Ptr_ = newPtr); +} + +inline char* TRemmapAllocation::CommonMMap(size_t size, char* base) { + return (char*)mmap((void*)base, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANON, -1, 0); +} + +#else +TRemmapAllocation::TRemmapAllocation() + : Allocation_(0, false, NULL) +{ +} + +TRemmapAllocation::TRemmapAllocation(size_t size, char* base) + : Allocation_(size, false, (void*)base) +{ +} + +char* TRemmapAllocation::Alloc(size_t size, char* base) { + return (char*)Allocation_.Alloc(size, (void*)base); +} + +char* TRemmapAllocation::Realloc(size_t newsize) { + return FullRealloc(newsize); +} + +void TRemmapAllocation::Dealloc() { + Allocation_.Dealloc(); +} + +char* TRemmapAllocation::FullRealloc(size_t newsize) { + TMappedAllocation other(newsize); + memcpy(other.Ptr(), Allocation_.Ptr(), Min(other.MappedSize(), Allocation_.MappedSize())); + Allocation_.swap(other); + return Data(); +} +#endif diff --git a/library/cpp/remmap/remmap.h b/library/cpp/remmap/remmap.h new file mode 100644 index 0000000000..7cb738f7ae --- /dev/null +++ b/library/cpp/remmap/remmap.h @@ -0,0 +1,64 @@ +#pragma once + +#include <util/system/yassert.h> +#include <util/system/align.h> +#include <util/system/info.h> +#include <util/system/filemap.h> +#include <util/memory/alloc.h> +#include <util/generic/noncopyable.h> + +class TRemmapAllocation : TNonCopyable { +public: + TRemmapAllocation(); + TRemmapAllocation(size_t size, char* base = nullptr); + + ~TRemmapAllocation() { + Dealloc(); + } + + char* Alloc(size_t size, char* base = nullptr); + char* Realloc(size_t newsize); + void Dealloc(); + char* FullRealloc(size_t newsize); + +#if defined(_unix_) +private: + inline char* CommonMMap(size_t size, char* base = nullptr); + + char* Ptr_; + size_t Size_; + +public: + inline void* Ptr() const { + return (void*)Ptr_; + } + inline char* Data(ui32 pos = 0) const { + return Ptr_ + pos; + } + inline size_t Size() const { + return Size_; + } + inline void swap(TRemmapAllocation& other) { + DoSwap(Ptr_, other.Ptr_); + DoSwap(Size_, other.Size_); + } + +#else +private: + TMappedAllocation Allocation_; + +public: + inline void* Ptr() const { + return Allocation_.Ptr(); + } + inline char* Data(ui32 pos = 0) const { + return Allocation_.Data(pos); + } + inline size_t Size() const { + return Allocation_.MappedSize(); + } + inline void swap(TRemmapAllocation& other) { + Allocation_.swap(other.Allocation_); + } +#endif +}; diff --git a/library/cpp/remmap/ya.make b/library/cpp/remmap/ya.make new file mode 100644 index 0000000000..281df6443a --- /dev/null +++ b/library/cpp/remmap/ya.make @@ -0,0 +1,7 @@ +LIBRARY() + +SRCS( + remmap.cpp +) + +END() diff --git a/library/cpp/sqlite3/sqlite.cpp b/library/cpp/sqlite3/sqlite.cpp new file mode 100644 index 0000000000..98e498f76b --- /dev/null +++ b/library/cpp/sqlite3/sqlite.cpp @@ -0,0 +1,288 @@ +#include "sqlite.h" + +#include <util/generic/singleton.h> +#include <util/generic/scope.h> + +#include <cstdlib> + +using namespace NSQLite; + +namespace { + struct TSQLiteInit { + inline TSQLiteInit() { + int ret = sqlite3_config(SQLITE_CONFIG_MULTITHREAD); + + if (ret != SQLITE_OK) { + ythrow TSQLiteError(ret) << "init failure"; + } + } + + static inline void Ensure() { + Singleton<TSQLiteInit>(); + } + }; +} + +namespace NSQLite { + TSQLiteError::TSQLiteError(sqlite3* hndl) + : ErrorCode(sqlite3_errcode(hndl)) + { + *this << sqlite3_errmsg(hndl) << ". "; + } + + TSQLiteError::TSQLiteError(int rc) + : ErrorCode(rc) + { + *this << sqlite3_errstr(rc) << " (" << rc << "). "; + } + + TSQLiteDB::TSQLiteDB(const TString& path) { + TSQLiteInit::Ensure(); + + sqlite3* db = nullptr; + const int rc = sqlite3_open(path.data(), &db); + + H_.Reset(db); + + if (rc) { + ythrow TSQLiteError(Handle()) << "can not init db " << path.Quote(); + } + } + + TSQLiteDB::TSQLiteDB(const TString& path, int flags) { + TSQLiteInit::Ensure(); + + sqlite3* db = nullptr; + const int rc = sqlite3_open_v2(path.data(), &db, flags, nullptr); + + H_.Reset(db); + + if (rc) { + ythrow TSQLiteError(Handle()) << "can not init db " << path.Quote(); + } + } + + sqlite3* TSQLiteDB::Handle() const noexcept { + return H_.Get(); + } + + size_t TSQLiteDB::RowsAffected() const noexcept { + return static_cast<size_t>(sqlite3_changes(H_.Get())); + } + + TSQLiteStatement::TSQLiteStatement(TSQLiteDB& db, const TString& s) + : S_(s) + { + if (!S_.empty() && S_[S_.size() - 1] != ';') { + S_ += ';'; + } + + sqlite3_stmt* st = nullptr; + const char* tail = nullptr; + const int rc = sqlite3_prepare_v2(db.Handle(), S_.data(), S_.size() + 1, &st, &tail); + + H_.Reset(st); + + if (rc != SQLITE_OK) { + ythrow TSQLiteError(db.Handle()) << "can not prepare " << S_.Quote(); + } + } + + void TSQLiteStatement::Execute() { + while (Step()) { + } + + Reset(); + } + + TSQLiteStatement& TSQLiteStatement::Bind(size_t idx, i64 val) { + sqlite3_bind_int64(Handle(), idx, val); + return *this; + } + + TSQLiteStatement& TSQLiteStatement::Bind(size_t idx, int val) { + sqlite3_bind_int(Handle(), idx, val); + return *this; + } + + TSQLiteStatement& TSQLiteStatement::Bind(size_t idx) { + sqlite3_bind_null(Handle(), idx); + return *this; + } + + TSQLiteStatement& TSQLiteStatement::Bind(size_t idx, double val) { + sqlite3_bind_double(Handle(), idx, val); + return *this; + } + + void TSQLiteStatement::BindText(size_t idx, const char* text, size_t len, TFreeFunc func) { + sqlite3_bind_text(Handle(), idx, text, len, func); + } + + TSQLiteStatement& TSQLiteStatement::Bind(size_t idx, TStringBuf str) { + BindText(idx, str.data(), str.size(), SQLITE_STATIC); + return *this; + } + + TSQLiteStatement& TSQLiteStatement::BindBlob(size_t idx, TStringBuf blob) { + sqlite3_bind_blob(Handle(), idx, blob.data(), blob.size(), SQLITE_STATIC); + return *this; + } + + size_t TSQLiteStatement::BoundNamePosition(TStringBuf name) const noexcept { + return sqlite3_bind_parameter_index(Handle(), name.data()); + } + + size_t TSQLiteStatement::BoundParameterCount() const noexcept { + return sqlite3_bind_parameter_count(Handle()); + } + + const char* TSQLiteStatement::BoundParameterName(size_t idx) const noexcept { + return sqlite3_bind_parameter_name(Handle(), idx); + } + + sqlite3_stmt* TSQLiteStatement::Handle() const noexcept { + return H_.Get(); + } + + bool TSQLiteStatement::Step() { + const int rc = sqlite3_step(Handle()); + + switch (rc) { + case SQLITE_ROW: + return true; + + case SQLITE_DONE: + return false; + + default: + break; + } + + char* stmt = rc == SQLITE_CONSTRAINT ? sqlite3_expanded_sql(Handle()) : nullptr; + Y_DEFER { + if (stmt != nullptr) { + sqlite3_free(reinterpret_cast<void*>(stmt)); + stmt = nullptr; + } + }; + if (stmt != nullptr) { + ythrow TSQLiteError(rc) << "step failed: " << stmt; + } else { + ythrow TSQLiteError(rc) << "step failed"; + } + } + + i64 TSQLiteStatement::ColumnInt64(size_t idx) { + return sqlite3_column_int64(Handle(), idx); + } + + double TSQLiteStatement::ColumnDouble(size_t idx) { + return sqlite3_column_double(Handle(), idx); + } + + TStringBuf TSQLiteStatement::ColumnText(size_t idx) { + return reinterpret_cast<const char*>(sqlite3_column_text(Handle(), idx)); + } + + TStringBuf TSQLiteStatement::ColumnBlob(size_t idx) { + const void* blob = sqlite3_column_blob(Handle(), idx); + size_t size = sqlite3_column_bytes(Handle(), idx); + return TStringBuf(static_cast<const char*>(blob), size); + } + + void TSQLiteStatement::ColumnAccept(size_t idx, ISQLiteColumnVisitor& visitor) { + const auto columnType = sqlite3_column_type(Handle(), idx); + switch (columnType) { + case SQLITE_INTEGER: + visitor.OnColumnInt64(ColumnInt64(idx)); + break; + case SQLITE_FLOAT: + visitor.OnColumnDouble(ColumnDouble(idx)); + break; + case SQLITE_TEXT: + visitor.OnColumnText(ColumnText(idx)); + break; + case SQLITE_BLOB: + visitor.OnColumnBlob(ColumnBlob(idx)); + break; + case SQLITE_NULL: + visitor.OnColumnNull(); + break; + } + } + + size_t TSQLiteStatement::ColumnCount() const noexcept { + return static_cast<size_t>(sqlite3_column_count(Handle())); + } + + TStringBuf TSQLiteStatement::ColumnName(size_t idx) const noexcept { + return sqlite3_column_name(Handle(), idx); + } + + void TSQLiteStatement::Reset() { + const int rc = sqlite3_reset(Handle()); + + if (rc != SQLITE_OK) { + ythrow TSQLiteError(rc) << "reset failed"; + } + } + + void TSQLiteStatement::ResetHard() { + (void)sqlite3_reset(Handle()); + } + + void TSQLiteStatement::ClearBindings() noexcept { + // No error is documented. + // sqlite3.c's code always returns SQLITE_OK. + (void)sqlite3_clear_bindings(Handle()); + } + + TSQLiteTransaction::TSQLiteTransaction(TSQLiteDB& db) + : Db(&db) + { + Execute("BEGIN TRANSACTION"); + } + + TSQLiteTransaction::~TSQLiteTransaction() { + if (Db) { + Rollback(); + } + } + + void TSQLiteTransaction::Commit() { + Execute("COMMIT TRANSACTION"); + Db = nullptr; + } + + void TSQLiteTransaction::Rollback() { + Execute("ROLLBACK TRANSACTION"); + Db = nullptr; + } + + void TSQLiteTransaction::Execute(const TString& query) { + Y_ENSURE(Db, "Transaction is already ended"); + TSQLiteStatement st(*Db, query); + st.Execute(); + } + + TSimpleDB::TSimpleDB(const TString& path) + : TSQLiteDB(path) + , Start_(*this, "begin transaction") + , End_(*this, "end transaction") + { + } + + void TSimpleDB::Execute(const TString& statement) { + TSQLiteStatement(*this, statement).Execute(); + } + + void TSimpleDB::Acquire() { + Start_.Execute(); + } + + void TSimpleDB::Release() { + End_.Execute(); + } + +} diff --git a/library/cpp/sqlite3/sqlite.h b/library/cpp/sqlite3/sqlite.h new file mode 100644 index 0000000000..8b35e2606a --- /dev/null +++ b/library/cpp/sqlite3/sqlite.h @@ -0,0 +1,136 @@ +#pragma once + +#include <util/generic/yexception.h> +#include <util/generic/ptr.h> + +#include <contrib/libs/sqlite3/sqlite3.h> + +namespace NSQLite { + class TSQLiteError: public yexception { + public: + TSQLiteError(sqlite3* hndl); + TSQLiteError(int rc); + + int GetErrorCode() const { + return ErrorCode; + } + + private: + int ErrorCode; + }; + + template <class T, int (*Func)(T*)> + struct TCFree { + static void Destroy(T* t) { + Func(t); + } + }; + + class TSQLiteDB { + public: + TSQLiteDB(const TString& path, int flags); + TSQLiteDB(const TString& path); + + sqlite3* Handle() const noexcept; + size_t RowsAffected() const noexcept; + + private: + THolder<sqlite3, TCFree<sqlite3, sqlite3_close>> H_; + }; + + class ISQLiteColumnVisitor { + public: + virtual ~ISQLiteColumnVisitor() = default; + + virtual void OnColumnInt64(i64 value) = 0; + virtual void OnColumnDouble(double value) = 0; + virtual void OnColumnText(TStringBuf value) = 0; + virtual void OnColumnBlob(TStringBuf value) = 0; + virtual void OnColumnNull() = 0; + }; + + class TSQLiteStatement { + public: + TSQLiteStatement(TSQLiteDB& db, const TString& s); + + void Execute(); + TSQLiteStatement& Bind(size_t idx, i64 val); + TSQLiteStatement& Bind(size_t idx, int val); + TSQLiteStatement& Bind(size_t idx); + TSQLiteStatement& Bind(size_t idx, double val); + TSQLiteStatement& Bind(size_t idx, TStringBuf str); + TSQLiteStatement& BindBlob(size_t idx, TStringBuf blob); + template <typename Value> + TSQLiteStatement& Bind(TStringBuf name, Value val) { + size_t idx = BoundNamePosition(name); + Y_ASSERT(idx > 0); + return Bind(idx, val); + } + TSQLiteStatement& BindBlob(TStringBuf name, TStringBuf blob) { + size_t idx = BoundNamePosition(name); + Y_ASSERT(idx > 0); + return BindBlob(idx, blob); + } + TSQLiteStatement& Bind(TStringBuf name) { + size_t idx = BoundNamePosition(name); + Y_ASSERT(idx > 0); + return Bind(idx); + } + size_t BoundNamePosition(TStringBuf name) const noexcept; + size_t BoundParameterCount() const noexcept; + const char* BoundParameterName(size_t idx) const noexcept; + + sqlite3_stmt* Handle() const noexcept; + bool Step(); + i64 ColumnInt64(size_t idx); + double ColumnDouble(size_t idx); + TStringBuf ColumnText(size_t idx); + TStringBuf ColumnBlob(size_t idx); + void ColumnAccept(size_t idx, ISQLiteColumnVisitor& visitor); + size_t ColumnCount() const noexcept; + TStringBuf ColumnName(size_t idx) const noexcept; + void Reset(); + // Ignore last error on this statement + void ResetHard(); + void ClearBindings() noexcept; + + private: + typedef void (*TFreeFunc)(void*); + void BindText(size_t col, const char* text, size_t len, TFreeFunc func); + + private: + TString S_; + THolder<sqlite3_stmt, TCFree<sqlite3_stmt, sqlite3_finalize>> H_; + }; + + /** + * Forces user to commit transaction explicitly, to not get exception in destructor (with all consequences of it). + */ + class TSQLiteTransaction: private TNonCopyable { + private: + TSQLiteDB* Db; + + public: + TSQLiteTransaction(TSQLiteDB& db); + ~TSQLiteTransaction(); + + void Commit(); + void Rollback(); + + private: + void Execute(const TString& query); + }; + + class TSimpleDB: public TSQLiteDB { + public: + TSimpleDB(const TString& path); + + void Execute(const TString& statement); + void Acquire(); + void Release(); + + private: + TSQLiteStatement Start_; + TSQLiteStatement End_; + }; +} diff --git a/library/cpp/sqlite3/ya.make b/library/cpp/sqlite3/ya.make new file mode 100644 index 0000000000..15417e278d --- /dev/null +++ b/library/cpp/sqlite3/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + sqlite.cpp +) + +PEERDIR( + contrib/libs/sqlite3 +) + +END() + +RECURSE_FOR_TESTS(ut) diff --git a/library/cpp/streams/growing_file_input/growing_file_input.cpp b/library/cpp/streams/growing_file_input/growing_file_input.cpp new file mode 100644 index 0000000000..0bbfa5ade9 --- /dev/null +++ b/library/cpp/streams/growing_file_input/growing_file_input.cpp @@ -0,0 +1,40 @@ +#include "growing_file_input.h" + +#include <util/datetime/base.h> +#include <util/generic/yexception.h> + +TGrowingFileInput::TGrowingFileInput(const TString& path) + : File_(path, OpenExisting | RdOnly | Seq) +{ + if (!File_.IsOpen()) { + ythrow TIoException() << "file " << path << " not open"; + } + + File_.Seek(0, sEnd); +} + +TGrowingFileInput::TGrowingFileInput(const TFile& file) + : File_(file) +{ + if (!File_.IsOpen()) { + ythrow TIoException() << "file (" << file.GetName() << ") not open"; + } + + File_.Seek(0, sEnd); +} + +size_t TGrowingFileInput::DoRead(void* buf, size_t len) { + for (int sleepTime = 1;;) { + size_t rr = File_.Read(buf, len); + + if (rr != 0) { + return rr; + } + + NanoSleep((ui64)sleepTime * 1000000); + + if (sleepTime < 2000) { + sleepTime <<= 1; + } + } +} diff --git a/library/cpp/streams/growing_file_input/growing_file_input.h b/library/cpp/streams/growing_file_input/growing_file_input.h new file mode 100644 index 0000000000..9054a5f3da --- /dev/null +++ b/library/cpp/streams/growing_file_input/growing_file_input.h @@ -0,0 +1,23 @@ +#pragma once + +#include <util/stream/input.h> +#include <util/system/file.h> + +/** + * Growing file input stream. + * + * File descriptor offsets to the end of the file, when the object is created. + * + * Read function waites for reading at least one byte. + */ +class TGrowingFileInput: public IInputStream { +public: + TGrowingFileInput(const TFile& file); + TGrowingFileInput(const TString& path); + +private: + size_t DoRead(void* buf, size_t len) override; + +private: + TFile File_; +}; diff --git a/library/cpp/streams/growing_file_input/ya.make b/library/cpp/streams/growing_file_input/ya.make new file mode 100644 index 0000000000..69c56fea46 --- /dev/null +++ b/library/cpp/streams/growing_file_input/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + growing_file_input.cpp +) + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/library/cpp/string_utils/subst_buf/substbuf.cpp b/library/cpp/string_utils/subst_buf/substbuf.cpp new file mode 100644 index 0000000000..f23cb24b19 --- /dev/null +++ b/library/cpp/string_utils/subst_buf/substbuf.cpp @@ -0,0 +1 @@ +#include "substbuf.h" diff --git a/library/cpp/string_utils/subst_buf/substbuf.h b/library/cpp/string_utils/subst_buf/substbuf.h new file mode 100644 index 0000000000..357ee68ae3 --- /dev/null +++ b/library/cpp/string_utils/subst_buf/substbuf.h @@ -0,0 +1,63 @@ +#pragma once + +#include <util/generic/vector.h> +#include <util/generic/strbuf.h> +#include <util/string/subst.h> + +/// Заменяет в строке одни подстроки на другие. +template <class TBuf, class TPool> +size_t SubstGlobal(TBuf& s, const TBuf& from, const TBuf& to, TPool& pool) { + if (from.empty()) + return 0; + + TVector<size_t> offs; + for (size_t off = 0; (off = s.find(from, off)) != TBuf::npos; off += from.length()) + offs.push_back(off); + if (offs.empty()) + return 0; + + size_t dstSize = s.size() + ssize_t(offs.size()) * ssize_t(to.size() - from.size()); + const size_t charTypeSz = sizeof(typename TBuf::char_type); + typename TBuf::char_type* dst = (typename TBuf::char_type*)pool.Allocate((dstSize + 1) * charTypeSz); + dst[dstSize] = 0; + + typename TBuf::char_type* p = dst; + size_t lastSrc = 0; + for (auto off : offs) { + memcpy(p, s.data() + lastSrc, (off - lastSrc) * charTypeSz); + p += off - lastSrc; + lastSrc = off + from.size(); + memcpy(p, to.data(), to.size() * charTypeSz); + p += to.size(); + } + memcpy(p, s.data() + lastSrc, (s.size() - lastSrc) * charTypeSz); + p += s.size() - lastSrc; + Y_ASSERT(p - dst == (ssize_t)dstSize); + + s = TBuf(dst, dstSize); + return offs.size(); +} + +template <class TPool> +size_t SubstGlobal(TStringBuf& s, const TStringBuf& from, const TStringBuf& to, TPool& pool) { + return SubstGlobal<TStringBuf, TPool>(s, from, to, pool); +} + +/// Заменяет в строке одни подстроки на другие. +template <class TBuf, class TPool> +inline size_t SubstGlobal(TBuf& s, typename TBuf::char_type from, typename TBuf::char_type to, TPool& pool) { + size_t result = 0; + size_t off = s.find(from); + if (off == TBuf::npos) + return 0; + + s = TBuf(pool.Append(s), s.size()); + + for (typename TBuf::char_type* it = const_cast<typename TBuf::char_type*>(s.begin()) + off; it != s.end(); ++it) { + if (*it == from) { + *it = to; + ++result; + } + } + return result; +} diff --git a/library/cpp/string_utils/subst_buf/ya.make b/library/cpp/string_utils/subst_buf/ya.make new file mode 100644 index 0000000000..8b8793f5b3 --- /dev/null +++ b/library/cpp/string_utils/subst_buf/ya.make @@ -0,0 +1,7 @@ +LIBRARY() + +SRCS( + substbuf.cpp +) + +END() diff --git a/library/cpp/ucompress/README.md b/library/cpp/ucompress/README.md new file mode 100644 index 0000000000..5a6e9d8f42 --- /dev/null +++ b/library/cpp/ucompress/README.md @@ -0,0 +1 @@ +Compatible implementation of library/python/compress (also known as "uc" - uber compressor: tools/uc, ya tool uc). diff --git a/library/cpp/ucompress/common.h b/library/cpp/ucompress/common.h new file mode 100644 index 0000000000..d59cde9cf1 --- /dev/null +++ b/library/cpp/ucompress/common.h @@ -0,0 +1,8 @@ +#pragma once + + +namespace NUCompress { + // These limitations come from original implementation - library/python/compress + using TBlockLen = ui32; + constexpr TBlockLen MaxCompressedLen = 100000000; +} diff --git a/library/cpp/ucompress/reader.cpp b/library/cpp/ucompress/reader.cpp new file mode 100644 index 0000000000..45a8ca8da2 --- /dev/null +++ b/library/cpp/ucompress/reader.cpp @@ -0,0 +1,58 @@ +#include "reader.h" +#include "common.h" + +#include <library/cpp/blockcodecs/codecs.h> +#include <library/cpp/json/json_reader.h> + +#include <util/system/byteorder.h> + + +using namespace NUCompress; + +TDecodedInput::TDecodedInput(IInputStream* in) + : S_(in) +{ + Y_ENSURE_EX(S_, TBadArgumentException() << "Null output stream"); +} + +TDecodedInput::~TDecodedInput() = default; + +size_t TDecodedInput::DoUnboundedNext(const void** ptr) { + if (!C_) { + TBlockLen blockLen = 0; + S_->LoadOrFail(&blockLen, sizeof(blockLen)); + blockLen = LittleToHost(blockLen); + Y_ENSURE(blockLen <= MaxCompressedLen, "broken stream"); + + TString buf = TString::Uninitialized(blockLen); + S_->LoadOrFail(buf.Detach(), blockLen); + + NJson::TJsonValue hdr; + Y_ENSURE(NJson::ReadJsonTree(buf, &hdr), "cannot parse header, suspect old format"); + + auto& codecName = hdr["codec"].GetString(); + Y_ENSURE(codecName, "header does not have codec info"); + + // Throws TNotFound + C_ = NBlockCodecs::Codec(codecName); + Y_ASSERT(C_); + } + + TBlockLen blockLen = 0; + size_t actualRead = S_->Load(&blockLen, sizeof(blockLen)); + if (!actualRead) { + // End of stream + return 0; + } + Y_ENSURE(actualRead == sizeof(blockLen), "broken stream: cannot read block length"); + blockLen = LittleToHost(blockLen); + Y_ENSURE(blockLen <= MaxCompressedLen, "broken stream"); + + TBuffer block; + block.Resize(blockLen); + S_->LoadOrFail(block.Data(), blockLen); + + C_->Decode(block, D_); + *ptr = D_.Data(); + return D_.Size(); +} diff --git a/library/cpp/ucompress/reader.h b/library/cpp/ucompress/reader.h new file mode 100644 index 0000000000..5a5d1c9a89 --- /dev/null +++ b/library/cpp/ucompress/reader.h @@ -0,0 +1,25 @@ +#pragma once + +#include <util/generic/buffer.h> +#include <util/stream/walk.h> + + +namespace NBlockCodecs { + struct ICodec; +} + +namespace NUCompress { + class TDecodedInput: public IWalkInput { + public: + TDecodedInput(IInputStream* in); + ~TDecodedInput() override; + + private: + size_t DoUnboundedNext(const void** ptr) override; + + private: + IInputStream* const S_; + const NBlockCodecs::ICodec* C_ = nullptr; + TBuffer D_; + }; +} diff --git a/library/cpp/ucompress/writer.cpp b/library/cpp/ucompress/writer.cpp new file mode 100644 index 0000000000..40f8b12108 --- /dev/null +++ b/library/cpp/ucompress/writer.cpp @@ -0,0 +1,95 @@ +#include "writer.h" +#include "common.h" + +#include <library/cpp/blockcodecs/codecs.h> +#include <library/cpp/json/writer/json.h> + +#include <util/generic/scope.h> +#include <util/generic/yexception.h> +#include <util/system/byteorder.h> + + +using namespace NUCompress; + +TCodedOutput::TCodedOutput(IOutputStream* out, const NBlockCodecs::ICodec* c, size_t bufLen) + : C_(c) + , D_(bufLen) + , S_(out) +{ + Y_ENSURE_EX(C_, TBadArgumentException() << "Null codec"); + Y_ENSURE_EX(S_, TBadArgumentException() << "Null output stream"); + D_.Resize(bufLen); + Y_ENSURE_EX(C_->MaxCompressedLength(D_) <= MaxCompressedLen, TBadArgumentException() << "Too big buffer size: " << bufLen); + D_.Clear(); +} + +TCodedOutput::~TCodedOutput() { + try { + Finish(); + } catch (...) { + } +} + +void TCodedOutput::DoWrite(const void* buf, size_t len) { + Y_ENSURE(S_, "Stream finished already"); + const char* in = static_cast<const char*>(buf); + + while (len) { + const size_t avail = D_.Avail(); + if (len < avail) { + D_.Append(in, len); + return; + } + + D_.Append(in, avail); + Y_ASSERT(!D_.Avail()); + in += avail; + len -= avail; + + FlushImpl(); + } +} + +void TCodedOutput::FlushImpl() { + if (!HdrWritten) { + NJsonWriter::TBuf jBuf; + jBuf.BeginObject(); + jBuf.WriteKey("codec"); + jBuf.WriteString(C_->Name()); + jBuf.EndObject(); + + TString jStr = jBuf.Str() + '\n'; + const TBlockLen lenToSave = HostToLittle(jStr.length()); + S_->Write(&lenToSave, sizeof(lenToSave)); + S_->Write(jStr.Detach(), jStr.length()); + HdrWritten = true; + } + + O_.Reserve(C_->MaxCompressedLength(D_)); + const size_t oLen = C_->Compress(D_, O_.Data()); + Y_ASSERT(oLen <= MaxCompressedLen); + + const TBlockLen lenToSave = HostToLittle(oLen); + S_->Write(&lenToSave, sizeof(lenToSave)); + S_->Write(O_.Data(), oLen); + + D_.Clear(); + O_.Clear(); +} + +void TCodedOutput::DoFlush() { + if (S_ && D_) { + FlushImpl(); + } +} + +void TCodedOutput::DoFinish() { + if (S_) { + Y_DEFER { + S_ = nullptr; + }; + FlushImpl(); + // Write zero-length block as EOF marker. + FlushImpl(); + } +} diff --git a/library/cpp/ucompress/writer.h b/library/cpp/ucompress/writer.h new file mode 100644 index 0000000000..4d3ae71093 --- /dev/null +++ b/library/cpp/ucompress/writer.h @@ -0,0 +1,31 @@ +#pragma once + +#include <util/generic/buffer.h> +#include <util/stream/output.h> + + +namespace NBlockCodecs { + struct ICodec; +} + +namespace NUCompress { + class TCodedOutput: public IOutputStream { + public: + TCodedOutput(IOutputStream* out, const NBlockCodecs::ICodec* c, size_t bufLen = 16 << 20); + ~TCodedOutput() override; + + private: + void DoWrite(const void* buf, size_t len) override; + void DoFlush() override; + void DoFinish() override; + + void FlushImpl(); + + private: + const NBlockCodecs::ICodec* const C_; + TBuffer D_; + TBuffer O_; + IOutputStream* S_; + bool HdrWritten = false; + }; +} diff --git a/library/cpp/ucompress/ya.make b/library/cpp/ucompress/ya.make new file mode 100644 index 0000000000..6582dd9a41 --- /dev/null +++ b/library/cpp/ucompress/ya.make @@ -0,0 +1,18 @@ +LIBRARY() + +PEERDIR( + library/cpp/blockcodecs + library/cpp/json +) + +SRCS( + reader.cpp + writer.cpp +) + +END() + +RECURSE( + tests + ut +) diff --git a/library/cpp/zipatch/reader.cpp b/library/cpp/zipatch/reader.cpp new file mode 100644 index 0000000000..03ac365da1 --- /dev/null +++ b/library/cpp/zipatch/reader.cpp @@ -0,0 +1,173 @@ +#include "reader.h" + +#include <library/cpp/json/json_reader.h> +#include <library/cpp/json/json_value.h> + +#include <util/generic/hash.h> +#include <util/memory/tempbuf.h> + +#include <contrib/libs/libarchive/libarchive/archive.h> +#include <contrib/libs/libarchive/libarchive/archive_entry.h> + +using namespace NJson; + +namespace NZipatch { + +class TReader::TImpl { + + using TEntry = archive_entry; + +public: + TImpl() { + if ((Archive_ = archive_read_new()) == nullptr) { + ythrow yexception() << "can't create archive object"; + } + } + + TImpl(const TFsPath& path) + : TImpl() + { + archive_read_support_filter_all(Archive_); + archive_read_support_format_zip(Archive_); + + if (ARCHIVE_OK != archive_read_open_filename(Archive_, TString(path).c_str(), 10240)) { + ythrow yexception() << "can't open archive path = " << path; + } + + Read(); + } + + TImpl(const TStringBuf buf) + : TImpl() + { + archive_read_support_filter_all(Archive_); + archive_read_support_format_zip(Archive_); + + if (ARCHIVE_OK != archive_read_open_memory(Archive_, buf.data(), buf.size())) { + ythrow yexception() << "can't open in-memory archive"; + } + + Read(); + } + + ~TImpl() { + for (const auto& item : Files_) { + archive_entry_free(item.second.first); + } + if (Archive_) { + archive_read_free(Archive_); + } + } + + void Enumerate(TOnEvent cb) const { + for (const auto& item : Actions_) { + TEvent event; + + event.Action = GetTypeFromString(item["type"].GetStringSafe(TString())); + event.Path = item["path"].GetStringSafe(TString()); + event.Executable = item["executable"].GetBooleanSafe(false); + event.Symlink = false; + + if (event.Action == Copy || event.Action == Move) { + event.Source.Path = item["orig_path"].GetStringSafe(TString()); + event.Source.Revision = item["orig_revision"].GetUIntegerRobust(); + } + if (event.Action == StoreFile) { + auto fi = Files_.find(event.Path); + if (fi == Files_.end()) { + ythrow yexception() << "can't find file; path = " << event.Path; + } + + event.Data = fi->second.second; + event.Symlink = archive_entry_filetype(fi->second.first) == AE_IFLNK; + } + + if (event.Path) { + cb(event); + } + } + } + +private: + EAction GetTypeFromString(const TString& type) const { + if (type == "store_file") { + return StoreFile; + } + if (type == "mkdir") { + return MkDir; + } + if (type == "remove_file" || type == "remove_tree") { + return Remove; + } + if (type == "svn_copy") { + return Copy; + } + return Unknown; + } + + void Read() { + TEntry* current = nullptr; + + while (archive_read_next_header(Archive_, ¤t) == ARCHIVE_OK) { + const TStringBuf path(archive_entry_pathname(current)); + + if (path == "actions.json") { + TJsonValue value; + ReadJsonFastTree(GetData(current), &value, true); + + for (const auto& item : value.GetArraySafe()) { + Actions_.push_back(item); + } + } else if (AsciiHasPrefix(path, "files/")) { + TEntry* entry = archive_entry_clone(current); + + Files_.emplace(path.substr(6), std::make_pair(entry, GetData(current))); + } + } + + archive_read_close(Archive_); + } + + TString GetData(TEntry* current) const { + if (archive_entry_filetype(current) == AE_IFLNK) { + return archive_entry_symlink(current); + } + + if (const auto size = archive_entry_size(current)) { + TTempBuf data(size); + + if (archive_read_data(Archive_, data.Data(), size) != size) { + ythrow yexception() << "can't read entry"; + } + + return TString(data.Data(), size); + } + + return TString(); + } + +private: + struct archive* Archive_; + TVector<TJsonValue> Actions_; + THashMap<TString, std::pair<TEntry*, TString>> Files_; +}; + +TReader::TReader(const TFsPath& path) + : Impl_(new TImpl(path)) +{ +} + +TReader::TReader(const TStringBuf buf) + : Impl_(new TImpl(buf)) +{ +} + +TReader::~TReader() +{ } + +void TReader::Enumerate(TOnEvent cb) const { + Impl_->Enumerate(cb); +} + +} // namespace NZipatch + diff --git a/library/cpp/zipatch/reader.h b/library/cpp/zipatch/reader.h new file mode 100644 index 0000000000..a94bc79b71 --- /dev/null +++ b/library/cpp/zipatch/reader.h @@ -0,0 +1,48 @@ +#pragma once + +#include <util/folder/path.h> +#include <util/generic/ptr.h> + +namespace NZipatch { + +class TReader { +public: + enum EAction { + Unknown = 0, + Copy, + MkDir, + Move, + Remove, + StoreFile, + }; + + struct TSource { + TString Path; + ui64 Revision; + }; + + struct TEvent { + EAction Action; + TString Path; + TStringBuf Data; + TSource Source; + bool Executable; + bool Symlink; + }; + + using TOnEvent = std::function<void(const TEvent&)>; + +public: + TReader(const TFsPath& path); + TReader(const TStringBuf buf); + ~TReader(); + + void Enumerate(TOnEvent cb) const; + +private: + class TImpl; + THolder<TImpl> Impl_; +}; + +} // namespace NZipatch + diff --git a/library/cpp/zipatch/writer.cpp b/library/cpp/zipatch/writer.cpp new file mode 100644 index 0000000000..a9ca451b01 --- /dev/null +++ b/library/cpp/zipatch/writer.cpp @@ -0,0 +1,232 @@ +#include "writer.h" + +#include <library/cpp/json/json_value.h> +#include <library/cpp/json/json_writer.h> + +#include <util/string/join.h> + +#include <contrib/libs/libarchive/libarchive/archive.h> +#include <contrib/libs/libarchive/libarchive/archive_entry.h> + +using namespace NJson; + +namespace NZipatch { + +class TWriter::TImpl { +public: + TImpl(const TFsPath& path) + : Actions_(new TJsonValue(JSON_ARRAY)) + , Meta_(new TJsonValue(JSON_MAP)) + , Revprops_(new TJsonValue(JSON_MAP)) + , Archive_(nullptr) + { + Archive_ = archive_write_new(); + if (!Archive_) { + ythrow yexception() << "can't create archive object"; + } + archive_write_set_format_zip(Archive_); + archive_write_zip_set_compression_deflate(Archive_); + + if (ARCHIVE_OK != archive_write_open_filename(Archive_, TString(path).c_str())) { + ythrow yexception() << "can't open archive path = " << path; + } + } + + ~TImpl() { + if (Actions_ || Meta_ || Revprops_) { + Finish(); + } + if (Archive_) { + archive_write_free(Archive_); + } + } + + void Finish() { + if (Actions_) { + if (Archive_) { + WriteEntry("actions.json", WriteJson(Actions_.Get(), true, false)); + } + + Actions_.Destroy(); + } + + if (Meta_) { + if (Archive_) { + WriteEntry("meta.json", WriteJson(Meta_.Get(), true)); + } + + Meta_.Destroy(); + } + + if (Revprops_) { + if (Archive_) { + WriteEntry("revprops.json", WriteJson(Revprops_.Get(), true)); + } + + Revprops_.Destroy(); + } + + if (Archive_) { + archive_write_close(Archive_); + } + } + + void Copy(const TString& path, const TOrigin& origin) { + Y_ASSERT(origin.Path); + Y_ASSERT(origin.Revision); + + if (Actions_) { + TJsonValue item; + item["type"] = "svn_copy"; + item["path"] = path; + item["orig_path"] = origin.Path; + item["orig_revision"] = origin.Revision; + Actions_->AppendValue(item); + } + } + + void MkDir(const TString& path) { + if (Actions_) { + TJsonValue item; + item["type"] = "mkdir"; + item["path"] = path; + Actions_->AppendValue(item); + } + } + + void RemoveFile(const TString& path) { + if (Actions_) { + TJsonValue item; + item["type"] = "remove_file"; + item["path"] = path; + Actions_->AppendValue(item); + } + } + + void RemoveTree(const TString& path) { + if (Actions_) { + TJsonValue item; + item["type"] = "remove_tree"; + item["path"] = path; + Actions_->AppendValue(item); + } + } + + void StoreFile( + const TString& path, + const TString& data, + const bool execute, + const bool symlink, + const TMaybe<bool> binaryHint, + const TMaybe<bool> encrypted) + { + if (Actions_) { + const TString file = Join("/", "files", path); + TJsonValue item; + item["type"] = "store_file"; + item["executable"] = execute; + item["path"] = path; + item["file"] = file; + if (binaryHint.Defined()) { + item["binary_hint"] = *binaryHint; + } + if (encrypted.Defined()) { + item["encrypted"] = *encrypted; + } + Actions_->AppendValue(item); + WriteEntry(file, data, symlink); + } + } + + void SetBaseSvnRevision(ui64 revision) { + if (Meta_) { + (*Meta_)["base_svn_revision"] = revision; + } + } + + void AddRevprop(const TString& prop, const TString& value) { + if (Revprops_) { + (*Revprops_)[prop] = value; + } + } + +private: + void WriteEntry( + const TString& path, + const TString& data, + const bool symlink = false) + { + struct archive_entry* const entry = archive_entry_new(); + // Write header. + archive_entry_set_pathname(entry, path.c_str()); + archive_entry_set_size(entry, data.size()); + archive_entry_set_filetype(entry, symlink ? AE_IFLNK : AE_IFREG); + archive_entry_set_perm(entry, 0644); + if (symlink) { + archive_entry_set_symlink(entry, data.c_str()); + } + archive_write_header(Archive_, entry); + // Write data. + // If entry is symlink then entry size become zero. + if (archive_entry_size(entry) > 0) { + archive_write_data(Archive_, data.data(), data.size()); + } + archive_entry_free(entry); + } + +private: + THolder<NJson::TJsonValue> Actions_; + THolder<NJson::TJsonValue> Meta_; + THolder<NJson::TJsonValue> Revprops_; + struct archive* Archive_; +}; + +TWriter::TWriter(const TFsPath& path) + : Impl_(new TImpl(path)) +{ +} + +TWriter::~TWriter() +{ } + +void TWriter::Finish() { + Impl_->Finish(); +} + +void TWriter::SetBaseSvnRevision(ui64 revision) { + Impl_->SetBaseSvnRevision(revision); +} + +void TWriter::AddRevprop(const TString& prop, const TString& value) { + Impl_->AddRevprop(prop, value); +} + +void TWriter::Copy(const TString& path, const TOrigin& origin) { + Impl_->Copy(path, origin); +} + +void TWriter::MkDir(const TString& path) { + Impl_->MkDir(path); +} + +void TWriter::RemoveFile(const TString& path) { + Impl_->RemoveFile(path); +} + +void TWriter::RemoveTree(const TString& path) { + Impl_->RemoveTree(path); +} + +void TWriter::StoreFile( + const TString& path, + const TString& data, + const bool execute, + const bool symlink, + const TMaybe<bool> binaryHint, + const TMaybe<bool> encrypted) +{ + Impl_->StoreFile(path, data, execute, symlink, binaryHint, encrypted); +} + +} // namespace NZipatch + diff --git a/library/cpp/zipatch/writer.h b/library/cpp/zipatch/writer.h new file mode 100644 index 0000000000..75cbe49777 --- /dev/null +++ b/library/cpp/zipatch/writer.h @@ -0,0 +1,51 @@ +#pragma once + +#include <util/folder/path.h> +#include <util/generic/ptr.h> +#include <util/generic/maybe.h> + +namespace NZipatch { + +class TWriter { +public: + struct TOrigin { + TString Path; + ui64 Revision; + + inline TOrigin(const TString& path, const ui64 revision) + : Path(path) + , Revision(revision) + { } + }; + + TWriter(const TFsPath& path); + ~TWriter(); + + void Finish(); + + void SetBaseSvnRevision(ui64 revision); + + void AddRevprop(const TString& prop, const TString& value); + + void Copy(const TString& path, const TOrigin& origin); + + void MkDir(const TString& path); + + void RemoveFile(const TString& path); + + void RemoveTree(const TString& path); + + void StoreFile(const TString& path, + const TString& data, + const bool execute, + const bool symlink, + const TMaybe<bool> binaryHint = Nothing(), + const TMaybe<bool> encrypted = Nothing()); + +private: + class TImpl; + THolder<TImpl> Impl_; +}; + +} // namespace NZipatch + diff --git a/library/cpp/zipatch/ya.make b/library/cpp/zipatch/ya.make new file mode 100644 index 0000000000..f8fd6006b2 --- /dev/null +++ b/library/cpp/zipatch/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + reader.cpp + writer.cpp +) + +PEERDIR( + contrib/libs/libarchive + library/cpp/json +) + +GENERATE_ENUM_SERIALIZATION(reader.h) + +END() + |