diff options
author | vvvv <vvvv@yandex-team.com> | 2024-11-01 15:41:40 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.com> | 2024-11-01 15:55:52 +0300 |
commit | 3325f745e67f7f442790822b5c9c5e9996708be7 (patch) | |
tree | f7318d68bbe8990092715436444b05297ce35777 /yql/essentials | |
parent | 6dce3f1c71786f2694b73b1a5155efc58f4557dd (diff) | |
download | ydb-3325f745e67f7f442790822b5c9c5e9996708be7.tar.gz |
Moved yql/utils YQL-19206
Также была выделена жирная зависимость из yql/utils в yql/utils/network, в результате library/cpp/getopt была добавлена явно в те проекты, которые ее ранее наследовали, а не указывали явно
commit_hash:36aa4c41f807b4cbbf70a3ed7ac0a1a5079bb75d
Diffstat (limited to 'yql/essentials')
118 files changed, 6877 insertions, 9 deletions
diff --git a/yql/essentials/public/issue/ya.make b/yql/essentials/public/issue/ya.make index 0179b23bbb..79f4d3e539 100644 --- a/yql/essentials/public/issue/ya.make +++ b/yql/essentials/public/issue/ya.make @@ -14,7 +14,7 @@ PEERDIR( library/cpp/resource contrib/ydb/public/api/protos yql/essentials/public/issue/protos - contrib/ydb/library/yql/utils + yql/essentials/utils ) GENERATE_ENUM_SERIALIZATION(yql_warning.h) diff --git a/yql/essentials/public/issue/yql_issue.cpp b/yql/essentials/public/issue/yql_issue.cpp index 3fc8e3767c..50589e9976 100644 --- a/yql/essentials/public/issue/yql_issue.cpp +++ b/yql/essentials/public/issue/yql_issue.cpp @@ -1,7 +1,7 @@ #include "yql_issue.h" #include "yql_issue_id.h" -#include <contrib/ydb/library/yql/utils/utf8.h> +#include <yql/essentials/utils/utf8.h> #include <library/cpp/colorizer/output.h> @@ -307,4 +307,4 @@ void Out<NYql::TIssue>(IOutputStream& out, const NYql::TIssue& error) { template <> void Out<NYql::TIssues>(IOutputStream& out, const NYql::TIssues& error) { error.PrintTo(out); -}
\ No newline at end of file +} diff --git a/yql/essentials/public/result_format/ya.make b/yql/essentials/public/result_format/ya.make index 82a206be86..3181d30365 100644 --- a/yql/essentials/public/result_format/ya.make +++ b/yql/essentials/public/result_format/ya.make @@ -13,7 +13,7 @@ PEERDIR( library/cpp/yson/node library/cpp/string_utils/base64 yql/essentials/public/issue - contrib/ydb/library/yql/utils + yql/essentials/utils ) END() diff --git a/yql/essentials/public/result_format/yql_codec_results.cpp b/yql/essentials/public/result_format/yql_codec_results.cpp index d375131b3d..911993c0c4 100644 --- a/yql/essentials/public/result_format/yql_codec_results.cpp +++ b/yql/essentials/public/result_format/yql_codec_results.cpp @@ -2,7 +2,7 @@ #include <library/cpp/string_utils/base64/base64.h> -#include <contrib/ydb/library/yql/utils/utf8.h> +#include <yql/essentials/utils/utf8.h> namespace NYql { namespace NResult { diff --git a/yql/essentials/public/result_format/yql_restricted_yson.cpp b/yql/essentials/public/result_format/yql_restricted_yson.cpp index da89a3c436..405411aebf 100644 --- a/yql/essentials/public/result_format/yql_restricted_yson.cpp +++ b/yql/essentials/public/result_format/yql_restricted_yson.cpp @@ -1,7 +1,7 @@ #include "yql_restricted_yson.h" -#include <contrib/ydb/library/yql/utils/parse_double.h> -#include <contrib/ydb/library/yql/utils/yql_panic.h> +#include <yql/essentials/utils/parse_double.h> +#include <yql/essentials/utils/yql_panic.h> #include <library/cpp/yson/detail.h> #include <library/cpp/yson/parser.h> diff --git a/yql/essentials/public/result_format/yql_result_format_data.cpp b/yql/essentials/public/result_format/yql_result_format_data.cpp index 5e1e89b74d..a4aeb2545d 100644 --- a/yql/essentials/public/result_format/yql_result_format_data.cpp +++ b/yql/essentials/public/result_format/yql_result_format_data.cpp @@ -3,8 +3,8 @@ #include "yql_result_format_impl.h" #include "yql_restricted_yson.h" -#include 
<contrib/ydb/library/yql/utils/parse_double.h> -#include <contrib/ydb/library/yql/utils/utf8.h> +#include <yql/essentials/utils/parse_double.h> +#include <yql/essentials/utils/utf8.h> #include <library/cpp/yson/node/node_builder.h> #include <library/cpp/string_utils/base64/base64.h> diff --git a/yql/essentials/utils/backtrace/backtrace.cpp b/yql/essentials/utils/backtrace/backtrace.cpp new file mode 100644 index 0000000000..938ac90501 --- /dev/null +++ b/yql/essentials/utils/backtrace/backtrace.cpp @@ -0,0 +1,215 @@ +#include "backtrace.h" + +#include "backtrace_lib.h" +#include "symbolizer.h" + +#include <library/cpp/deprecated/atomic/atomic.h> +#include <library/cpp/malloc/api/malloc.h> + +#include <util/generic/string.h> +#include <util/generic/xrange.h> +#include <util/generic/yexception.h> +#include <util/stream/format.h> +#include <util/stream/output.h> +#include <util/system/backtrace.h> +#include <util/system/type_name.h> +#include <util/system/execpath.h> +#include <util/system/platform.h> +#include <util/system/mlock.h> + +#ifdef _linux_ +#include <signal.h> +#endif + +#include <functional> +#include <vector> +#include <sstream> +#include <iostream> + +#ifndef _win_ + +bool SetSignalHandler(int signo, void (*handler)(int)) { + struct sigaction sa; + memset(&sa, 0, sizeof(sa)); + sa.sa_flags = SA_RESETHAND; + sa.sa_handler = handler; + sigfillset(&sa.sa_mask); + return sigaction(signo, &sa, nullptr) != -1; +} + +namespace { +#if defined(_linux_) && defined(_x86_64_) + bool SetSignalAction(int signo, void (*handler)(int, siginfo_t*, void*)) { + struct sigaction sa; + memset(&sa, 0, sizeof(sa)); + sa.sa_flags = SA_RESETHAND | SA_SIGINFO; + sa.sa_sigaction = (decltype(sa.sa_sigaction))handler; + sigfillset(&sa.sa_mask); + return sigaction(signo, &sa, nullptr) != -1; + } +#endif +} // namespace +#endif // _win_ + +TAtomic BacktraceStarted = 0; + +void SetFatalSignalHandler(void (*handler)(int)) { + Y_UNUSED(handler); +#ifndef _win_ + for (int signo: {SIGSEGV, 
SIGILL, SIGABRT, SIGFPE}) { + if (!SetSignalHandler(signo, handler)) { + ythrow TSystemError() << "Cannot set handler for signal " << strsignal(signo); + } + } +#endif +} + +#if defined(_linux_) && defined(_x86_64_) +void SetFatalSignalAction(void (*sigaction)(int, siginfo_t*, void*)) +{ + for (int signo: {SIGSEGV, SIGILL, SIGABRT, SIGFPE}) { + if (!SetSignalAction(signo, sigaction)) { + ythrow TSystemError() << "Cannot set sigaction for signal " << strsignal(signo); + } + } +} +#endif + +namespace { + std::vector<std::function<void(int)>> Before, After; + bool KikimrSymbolize = false; + NYql::NBacktrace::TCollectedFrame Frames[NYql::NBacktrace::Limit]; + + void CallCallbacks(decltype(Before)& where, int signum) { + for (const auto &fn: where) { + if (fn) { + fn(signum); + } + } + } + + void PrintFrames(IOutputStream* out, const NYql::NBacktrace::TCollectedFrame* frames, size_t cnt); + + void DoBacktrace(IOutputStream* out, void* data) { + auto cnt = NYql::NBacktrace::CollectFrames(Frames, data); + PrintFrames(out, Frames, cnt); + } + + void DoBacktrace(IOutputStream* out, void** stack, size_t cnt) { + Y_UNUSED(NYql::NBacktrace::CollectFrames(Frames, stack, cnt)); + PrintFrames(out, Frames, cnt); + } + + + void SignalHandler(int signum) { + CallCallbacks(Before, signum); + + if (!NMalloc::IsAllocatorCorrupted) { + if (!AtomicTryLock(&BacktraceStarted)) { + return; + } + + UnlockAllMemory(); + DoBacktrace(&Cerr, nullptr); + } + + CallCallbacks(After, signum); + raise(signum); + } + +#if defined(_linux_) && defined(_x86_64_) + void SignalAction(int signum, siginfo_t*, void* context) { + Y_UNUSED(SignalHandler); + CallCallbacks(Before, signum); + + if (!NMalloc::IsAllocatorCorrupted) { + if (!AtomicTryLock(&BacktraceStarted)) { + return; + } + + UnlockAllMemory(); + DoBacktrace(&Cerr, context); + } + + CallCallbacks(After, signum); + raise(signum); + } +#endif +} + +namespace NYql { + namespace NBacktrace { + THashMap<TString, TString> Mapping; + + void 
SetModulesMapping(const THashMap<TString, TString>& mapping) { + Mapping = mapping; + } + + void AddBeforeFatalCallback(const std::function<void(int)>& before) { + Before.push_back(before); + } + + void AddAfterFatalCallback(const std::function<void(int)>& after) { + After.push_back(after); + } + + void RegisterKikimrFatalActions() { +#if defined(_linux_) && defined(_x86_64_) + SetFatalSignalAction(SignalAction); +#else + SetFatalSignalHandler(SignalHandler); +#endif + } + + void EnableKikimrSymbolize() { + KikimrSymbolize = true; + } + + void KikimrBackTrace() { + FormatBackTrace(&Cerr); + } + + void KikimrBackTraceFormatImpl(IOutputStream* out) { + KikimrSymbolize = true; + UnlockAllMemory(); + DoBacktrace(out, nullptr); + } + + void KikimrBacktraceFormatImpl(IOutputStream* out, void* const* stack, size_t stackSize) { + KikimrSymbolize = true; + DoBacktrace(out, (void**)stack, stackSize); + } + + } +} + +void EnableKikimrBacktraceFormat() { + SetFormatBackTraceFn(NYql::NBacktrace::KikimrBacktraceFormatImpl); +} + +namespace { + NYql::NBacktrace::TStackFrame SFrames[NYql::NBacktrace::Limit]; + void PrintFrames(IOutputStream* out, const NYql::NBacktrace::TCollectedFrame* frames, size_t count) { + auto& outp = *out; + Y_UNUSED(SFrames); +#if defined(_linux_) && defined(_x86_64_) + if (KikimrSymbolize) { + for (size_t i = 0; i < count; ++i) { + SFrames[i] = NYql::NBacktrace::TStackFrame{frames[i].File, frames[i].Address}; + } + NYql::NBacktrace::Symbolize(SFrames, count, out); + return; + } +#endif + outp << "StackFrames: " << count << "\n"; + for (size_t i = 0; i < count; ++i) { + auto& frame = frames[i]; + auto fileName = frame.File; + if (!strcmp(fileName, "/proc/self/exe")) { + fileName = "EXE"; + } + auto it = NYql::NBacktrace::Mapping.find(fileName); + outp << "StackFrame: " << (it == NYql::NBacktrace::Mapping.end() ? fileName : it->second) << " " << frame.Address << " 0\n"; + } + } +}
\ No newline at end of file
diff --git a/yql/essentials/utils/backtrace/backtrace.h b/yql/essentials/utils/backtrace/backtrace.h
new file mode 100644
index 0000000000..cd843d8cb4
--- /dev/null
+++ b/yql/essentials/utils/backtrace/backtrace.h
@@ -0,0 +1,40 @@
+#pragma once
+
+#include <util/system/types.h>
+#include <util/generic/string.h>
+#include <util/generic/hash.h>
+
+#include <functional>
+
+class IOutputStream;
+
+// Installs `handler` for signal `signo`; returns false when sigaction() fails.
+bool SetSignalHandler(int signo, void (*handler)(int));
+
+// Makes NYql::NBacktrace::KikimrBacktraceFormatImpl the global backtrace formatter.
+void EnableKikimrBacktraceFormat();
+
+namespace NYql::NBacktrace {
+
+// Capacity of the static frame buffers used while collecting and symbolizing.
+constexpr int Limit = 400;
+
+// Fatal-signal handling: handler installation and user callbacks invoked around it.
+void RegisterKikimrFatalActions();
+void AddBeforeFatalCallback(const std::function<void(int)>& before);
+void AddAfterFatalCallback(const std::function<void(int)>& after);
+void EnableKikimrSymbolize();
+
+// Backtrace printers. Mind the spelling: "BackTrace" takes only the stream,
+// "Backtrace" takes an explicit stack.
+void KikimrBackTrace();
+void KikimrBackTraceFormatImpl(IOutputStream*);
+void KikimrBacktraceFormatImpl(IOutputStream* out, void* const* stack, size_t stackSize);
+
+// Replaces the module-name mapping applied when plain (non-symbolized) frames are printed.
+void SetModulesMapping(const THashMap<TString, TString>& mapping);
+
+// Re-symbolizes a textual "StackFrames:" / "StackFrame:" dump.
+TString Symbolize(const TString& input, const THashMap<TString, TString>& mapping);
+
+} // namespace NYql::NBacktrace
diff --git a/yql/essentials/utils/backtrace/backtrace_dummy.cpp b/yql/essentials/utils/backtrace/backtrace_dummy.cpp
new file mode 100644
index 0000000000..ec54e55dca
--- /dev/null
+++ b/yql/essentials/utils/backtrace/backtrace_dummy.cpp
@@ -0,0 +1,13 @@
+#include "backtrace_lib.h"
+
+#include <util/system/backtrace.h>
+
+namespace NYql::NBacktrace {
+
+// Fallback collector: the third argument is ignored and util's BackTrace()
+// fills `addresses`.
+size_t CollectBacktrace(void** addresses, size_t limit, void* /*unused*/) {
+    return BackTrace(addresses, limit);
+}
+
+} // namespace NYql::NBacktrace
\ No newline at end of file
diff --git a/yql/essentials/utils/backtrace/backtrace_lib.cpp b/yql/essentials/utils/backtrace/backtrace_lib.cpp
new file mode 100644
index 0000000000..c0c7b2c667
--- /dev/null
+++ b/yql/essentials/utils/backtrace/backtrace_lib.cpp
@@ -0,0 +1,96 @@
+#include "backtrace_lib.h"
+
+#include <util/generic/hash.h>
+#include <util/system/execpath.h>
+
+#include <algorithm>
+#include <cstring>
+#include <new>
+
+#if defined(_linux_) && defined(_x86_64_)
+#include <dlfcn.h>
+#include <link.h>
+#endif
+
+namespace {
+    // Capacity of the static buffer that receives raw return addresses.
+    const size_t Limit = 400;
+    void* Stack[Limit];
+
+    // A loaded shared object: its path and load base address.
+    struct TDllInfo {
+        const char* Path;
+        ui64 BaseAddress;
+    };
+
+    const size_t MaxDLLCnt = 100;
+    TDllInfo DLLs[MaxDLLCnt];
+    size_t DLLCount = 0;
+
+#if defined(_linux_) && defined(_x86_64_)
+    // dl_iterate_phdr() callback: appends every named object to the table passed in `data`.
+    // Objects that do not fit into the table are dropped.
+    int DlIterCallback(struct dl_phdr_info* info, size_t, void* data) {
+        const bool named = info->dlpi_name && *info->dlpi_name;
+        // `DLLCount < MaxDLLCnt` (not `DLLCount + 1 <`): the last slot is usable too.
+        if (named && DLLCount < MaxDLLCnt) {
+            auto* table = static_cast<TDllInfo*>(data);
+            table[DLLCount++] = TDllInfo{info->dlpi_name, static_cast<ui64>(info->dlpi_addr)};
+        }
+        return 0;
+    }
+#endif
+
+    // Orders entries by path; the table is sorted with it and searched with std::lower_bound.
+    bool comp(const TDllInfo& a, const TDllInfo& b) {
+        return strcmp(a.Path, b.Path) < 0;
+    }
+
+}
+
+namespace NYql {
+    namespace NBacktrace {
+        // Attributes `addr` to a file. The default is the executable path with the absolute
+        // address; if dladdr() reports a shared object present in DLLs, its path and the
+        // address relative to its load base are stored instead.
+        TCollectedFrame::TCollectedFrame(uintptr_t addr) {
+            File = GetPersistentExecPath().c_str();
+            Address = addr;
+#if defined(_linux_) && defined(_x86_64_)
+            Dl_info dlInfo;
+            memset(&dlInfo, 0, sizeof(dlInfo));
+            // dli_fname is checked explicitly: passing a null path to strcmp() is undefined.
+            if (dladdr(reinterpret_cast<void*>(addr), &dlInfo) && dlInfo.dli_fname) {
+                const TDllInfo key{dlInfo.dli_fname, 0};
+                const TDllInfo* first = DLLs;
+                const TDllInfo* last = DLLs + DLLCount;
+                const TDllInfo* it = std::lower_bound(first, last, key, comp);
+                if (it != last && !strcmp(it->Path, dlInfo.dli_fname)) {
+                    File = it->Path;
+                    Address -= it->BaseAddress;
+                }
+            }
+#endif
+        }
+
+        // Refreshes the shared-object table (Linux x86_64 only), collects raw addresses via
+        // CollectBacktrace() into the static buffer and converts them to frames.
+        size_t CollectFrames(TCollectedFrame* frames, void* data) {
+#if defined(_linux_) && defined(_x86_64_)
+            DLLCount = 0;
+            dl_iterate_phdr(DlIterCallback, DLLs);
+#endif
+            std::stable_sort(DLLs, DLLs + DLLCount, comp);
+            size_t cnt = CollectBacktrace(Stack, Limit, data);
+            return CollectFrames(frames, Stack, cnt);
+        }
+
+        // Constructs `cnt` frames in place from raw addresses; `frames` must have room for `cnt`.
+        size_t CollectFrames(TCollectedFrame* frames, void** stack, size_t cnt) {
+            for (size_t i = 0; i < cnt; ++i) {
+                new (frames + i) TCollectedFrame(reinterpret_cast<uintptr_t>(stack[i]));
+            }
+            return cnt;
+        }
+    }
+}
\ No newline at end of file
diff --git a/yql/essentials/utils/backtrace/backtrace_lib.h b/yql/essentials/utils/backtrace/backtrace_lib.h
new file mode 100644
index 0000000000..3404716da6
--- /dev/null
+++ b/yql/essentials/utils/backtrace/backtrace_lib.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+
+#include <cstddef>
+#include <cstdint>
+
+namespace NYql {
+    namespace NBacktrace {
+        // Fills `addresses` with up to `limit` return addresses and returns how many were written.
+        // `data` is an optional platform context; the Linux implementation treats null as "none".
+        size_t CollectBacktrace(void** addresses, size_t limit, void* data);
+
+        // One stack frame: the file it belongs to and an address within it.
+        struct TCollectedFrame {
+            TCollectedFrame(uintptr_t addr);
+            TCollectedFrame() = default;
+            // Initialised so that a default-constructed frame never holds indeterminate values.
+            const char* File = nullptr;
+            size_t Address = 0;
+        };
+
+        size_t CollectFrames(TCollectedFrame* frames, void* data);
+        size_t CollectFrames(TCollectedFrame* frames, void** stack, size_t cnt);
+    }
+}
\ No newline at end of file
diff --git a/yql/essentials/utils/backtrace/backtrace_linux.cpp b/yql/essentials/utils/backtrace/backtrace_linux.cpp
new file mode 100644
index 0000000000..c9a1bd5a22
--- /dev/null
+++ b/yql/essentials/utils/backtrace/backtrace_linux.cpp
@@ -0,0 +1,74 @@
+#include "backtrace_lib.h"
+
+#include <libunwind.h>
+#include <signal.h>
+
+#include <util/system/backtrace.h>
+
+namespace {
+    // Unwinds the stack described by the signal context `con` into `p` and returns the number
+    // of entries written (at most `len`). Entry 0 is the instruction pointer saved in `con`.
+    size_t BackTrace(void** p, size_t len, ucontext_t* con) {
+        // The first entry is stored unconditionally below, so an empty buffer must be
+        // rejected up front.
+        if (!len) {
+            return 0;
+        }
+
+        unw_context_t context;
+        unw_cursor_t cursor;
+        if (unw_getcontext(&context)) {
+            return 0;
+        }
+
+        if (unw_init_local(&cursor, &context)) {
+            return 0;
+        }
+
+        // Point the cursor at the register state saved in the signal context.
+        const sigcontext* signal_mcontext = reinterpret_cast<const sigcontext*>(&con->uc_mcontext);
+        unw_set_reg(&cursor, UNW_X86_64_RSI, signal_mcontext->rsi);
+        unw_set_reg(&cursor, UNW_X86_64_RDI, signal_mcontext->rdi);
+        unw_set_reg(&cursor, UNW_X86_64_RBP, signal_mcontext->rbp);
+        unw_set_reg(&cursor, UNW_X86_64_RAX, signal_mcontext->rax);
+        unw_set_reg(&cursor, UNW_X86_64_RBX, signal_mcontext->rbx);
+        unw_set_reg(&cursor, UNW_X86_64_RCX, signal_mcontext->rcx);
+        unw_set_reg(&cursor, UNW_X86_64_R8, signal_mcontext->r8);
+        unw_set_reg(&cursor, UNW_X86_64_R9, signal_mcontext->r9);
+        unw_set_reg(&cursor, UNW_X86_64_R10, signal_mcontext->r10);
+        unw_set_reg(&cursor, UNW_X86_64_R11, signal_mcontext->r11);
+        unw_set_reg(&cursor, UNW_X86_64_R12, signal_mcontext->r12);
+        unw_set_reg(&cursor, UNW_X86_64_R13, signal_mcontext->r13);
+        unw_set_reg(&cursor, UNW_X86_64_R14, signal_mcontext->r14);
+        unw_set_reg(&cursor, UNW_X86_64_R15, signal_mcontext->r15);
+        unw_set_reg(&cursor, UNW_X86_64_RSP, signal_mcontext->rsp);
+
+        unw_set_reg(&cursor, UNW_REG_SP, signal_mcontext->rsp);
+        unw_set_reg(&cursor, UNW_REG_IP, signal_mcontext->rip);
+
+        size_t pos = 0;
+        p[pos++] = reinterpret_cast<void*>(signal_mcontext->rip);
+        while (pos < len && unw_step(&cursor) > 0) {
+            unw_word_t ip = 0;
+            unw_get_reg(&cursor, UNW_REG_IP, &ip);
+            if (unw_is_signal_frame(&cursor)) {
+                continue;
+            }
+            p[pos++] = reinterpret_cast<void*>(ip);
+        }
+        return pos;
+    }
+}
+
+namespace NYql {
+    namespace NBacktrace {
+        // Without a context the generic util BackTrace() is used; otherwise `data` is treated
+        // as the ucontext_t handed to a signal handler.
+        size_t CollectBacktrace(void** addresses, size_t limit, void* data) {
+            if (!data) {
+                return BackTrace(addresses, limit);
+            }
+            return BackTrace(addresses, limit, reinterpret_cast<ucontext_t*>(data));
+        }
+    }
+}
\ No newline at end of file
diff --git a/yql/essentials/utils/backtrace/backtrace_ut.cpp b/yql/essentials/utils/backtrace/backtrace_ut.cpp
new file mode 100644
index 0000000000..822060ed32
--- /dev/null
+++ b/yql/essentials/utils/backtrace/backtrace_ut.cpp
@@ -0,0 +1,28 @@
+#include "backtrace.h"
+#include <util/generic/vector.h>
+#include <util/generic/string.h>
+#include <library/cpp/testing/unittest/registar.h>
+namespace {
+    Y_NO_INLINE void TestTrace394() {
+        TStringStream ss;
+        NYql::NBacktrace::KikimrBackTraceFormatImpl(&ss);
+#if !defined(_hardening_enabled_) && !defined(_win_)
+        UNIT_ASSERT_STRING_CONTAINS(ss.Str(), "(anonymous namespace)::TestTrace394");
+#endif
+    }
+    Y_NO_INLINE void TestTrace39114() {
+        TStringStream ss;
+        NYql::NBacktrace::KikimrBackTraceFormatImpl(&ss);
+#if !defined(_hardening_enabled_) && !defined(_win_)
+        UNIT_ASSERT_STRING_CONTAINS(ss.Str(), "(anonymous namespace)::TestTrace39114");
+#endif
+    }
+}
+
+Y_UNIT_TEST_SUITE(TEST_BACKTRACE_AND_SYMBOLIZE) {
+    Y_UNIT_TEST(TEST_NO_KIKIMR) {
+        NYql::NBacktrace::EnableKikimrSymbolize();
+        TestTrace394();
+        TestTrace39114();
+    }
+}
diff --git a/yql/essentials/utils/backtrace/symbolize.cpp b/yql/essentials/utils/backtrace/symbolize.cpp
new file mode 100644
index 0000000000..360ff408c7
--- /dev/null
+++ b/yql/essentials/utils/backtrace/symbolize.cpp
@@ -0,0 +1,86 @@
+#include "backtrace.h"
+
+#include "symbolizer.h"
+
+#include <util/string/split.h>
+#include <util/stream/str.h>
+
+#include <algorithm>
+
+namespace NYql {
+
+    namespace NBacktrace {
+        // Re-symbolizes a textual dump. "StackFrames: N" announces the frame count,
+        // "StackFrame: <module> <address> <offset>" describes one frame and every other line is
+        // copied to the output as is. Module names are translated through `mapping` when present.
+        // On platforms other than Linux x86_64 the input is returned unchanged.
+        TString Symbolize(const TString& input, const THashMap<TString, TString>& mapping) {
+#if defined(__linux__) && defined(__x86_64__)
+            TString output;
+            TStringOutput out(output);
+
+            i64 stackSize = -1;
+            // Names and addresses are gathered first; TStackFrame holds raw pointers into the
+            // names, so the frames are only built once `usedFilenames` has stopped growing.
+            TVector<TString> usedFilenames;
+            TVector<ui64> addresses;
+            for (TStringBuf line: StringSplitter(input).SplitByString("\n")) {
+                if (line.StartsWith("StackFrames:")) {
+                    TVector<TString> parts;
+                    Split(TString(line), " ", parts);
+                    i64 announced = 0;
+                    if (parts.size() > 1 && TryFromString<i64>(parts[1], announced)) {
+                        stackSize = announced;
+                        // The count comes from the input: never reserve for a negative or
+                        // absurdly large value.
+                        if (announced > 0) {
+                            const size_t hint = static_cast<size_t>(std::min<i64>(announced, Limit));
+                            usedFilenames.reserve(hint);
+                            addresses.reserve(hint);
+                        }
+                    }
+                } else if (line.StartsWith("StackFrame:")) {
+                    TVector<TString> parts;
+                    Split(TString(line), " ", parts);
+                    ui64 address = 0;
+                    ui64 offset = 0;
+                    // Malformed frame lines (too few fields or non-numeric values) are skipped.
+                    if (parts.size() > 3
+                        && TryFromString<ui64>(parts[2], address)
+                        && TryFromString<ui64>(parts[3], offset))
+                    {
+                        TString modulePath = parts[1];
+                        auto it = mapping.find(modulePath);
+                        if (it != mapping.end()) {
+                            modulePath = it->second;
+                        }
+                        usedFilenames.emplace_back(std::move(modulePath));
+                        addresses.push_back(address - offset);
+                    }
+                } else {
+                    out << line << "\n";
+                }
+            }
+
+            if (stackSize == 0) {
+                out << "Empty stack trace\n";
+            }
+
+            // The symbolizer works on static arrays of `Limit` entries, so extra frames are dropped.
+            const size_t frameCount = std::min<size_t>(usedFilenames.size(), Limit);
+            TVector<TStackFrame> frames;
+            frames.reserve(frameCount);
+            for (size_t i = 0; i < frameCount; ++i) {
+                frames.push_back(TStackFrame{usedFilenames[i].c_str(), static_cast<size_t>(addresses[i])});
+            }
+            Symbolize(frames.data(), frames.size(), &out);
+            return output;
+#else
+            Y_UNUSED(mapping);
+            return input;
+#endif
+        }
+
+    } /* namespace NBacktrace */
+
+} /* namespace NYql */
diff --git a/yql/essentials/utils/backtrace/symbolizer.h b/yql/essentials/utils/backtrace/symbolizer.h
new file mode 100644
index 0000000000..0d32ba25d3
--- /dev/null
+++ b/yql/essentials/utils/backtrace/symbolizer.h
@@ -0,0 +1,15 @@
+#pragma once
+#include "backtrace.h"
+
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+
+namespace NYql {
+    namespace NBacktrace {
+        struct TStackFrame {
+            const char* File;
+            size_t Address;
+        };
+        void Symbolize(const TStackFrame* frames, size_t count, IOutputStream* out);
+    }
+}
\ No newline at end of file
diff --git a/yql/essentials/utils/backtrace/symbolizer_linux.cpp b/yql/essentials/utils/backtrace/symbolizer_linux.cpp
new file mode 100644
index 0000000000..222bc5df2e
--- /dev/null
+++ b/yql/essentials/utils/backtrace/symbolizer_linux.cpp
@@ -0,0 +1,176 @@
+#include "symbolizer.h"
+
+#include <contrib/libs/backtrace/backtrace.h>
+
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+#include <util/system/type_name.h>
+#include <util/system/execpath.h>
+#include <util/string/builder.h>
+
+#include <mutex>
+#include <numeric>
+#include <algorithm>
+#include <cstring>
+
+#include <util/stream/mem.h>
+
+#ifdef __GNUC__
+    #include <cxxabi.h>
+#endif
+
+namespace {
+// Size of one per-frame output slot.
+const size_t MaxStrLen = 512;
+const size_t MaxDemangleLen = 1024 * 1024;
+// Static scratch buffer handed to __cxa_demangle().
+char Buff[MaxDemangleLen];
+
+// Memory stream that truncates instead of failing: on overflow the data is cut to the
+// remaining space and the last three bytes written are replaced with "...".
+class TNoThrowingMemoryOutput : public TMemoryOutput {
+public:
+    TNoThrowingMemoryOutput(void* c, size_t l) : TMemoryOutput(c, l) {}
+
+    // Writes "..." over the three bytes before the write position; assumes at least three
+    // bytes precede it.
+    void Truncate() {
+        *(Buf_ - 1) = '.';
+        *(Buf_ - 2) = '.';
+        *(Buf_ - 3) = '.';
+    }
+
+    void DoWrite(const void* buf, size_t len) override {
+        bool truncated = Buf_ + len > End_;
+        if (truncated) {
+            len = std::min(len, static_cast<size_t>(End_ - Buf_));
+        }
+        memcpy(Buf_, buf, len);
+        Buf_ += len;
+        if (truncated) {
+            Truncate();
+        }
+    }
+
+    void DoWriteC(char c) override {
+        if (Buf_ == End_) {
+            Truncate();
+        } else {
+            *Buf_++ = c;
+        }
+    }
+};
+
+// libbacktrace error callback: the message goes into the caller's slot when one is given,
+// otherwise to Cerr.
+void HandleLibBacktraceError(void* data, const char* msg, int) {
+    if (!data) {
+        Cerr << msg;
+        return;
+    }
+    TNoThrowingMemoryOutput out(data, MaxStrLen - 1);
+    out << msg;
+}
+
+// Returns the demangled form of `name`, or `name` itself when demangling is unavailable
+// or fails. The result may point into the static Buff.
+const char* Demangle(const char* name) {
+#ifndef __GNUC__
+    return name;
+#else
+    int status;
+    size_t len = MaxDemangleLen - 1;
+    const char* res = __cxxabiv1::__cxa_demangle(name, Buff, &len, &status);
+
+    if (!res) {
+        return name;
+    }
+    return res;
+#endif
+}
+
+// libbacktrace per-frame callback: formats "<function> at <file>:<line>:0" into the slot.
+int HandleLibBacktraceFrame(void* data, uintptr_t, const char* filename, int lineno, const char* function) {
+    TNoThrowingMemoryOutput out(data, MaxStrLen - 1);
+    const char* fileName = filename ? filename : "???";
+    const char* functionName = function ? Demangle(function) : "???";
+    out << functionName << " at " << fileName << ":" << lineno << ":0";
+    return 0;
+}
+}
+
+namespace NYql {
+    namespace NBacktrace {
+        namespace {
+            // Guards the static buffers below.
+            std::mutex Mutex;
+            char* Result[Limit];
+            size_t Order[Limit];
+            char TmpBuffer[MaxStrLen * Limit]{};
+            auto CreateState(const char* filename) {
+                return backtrace_create_state(
+                    filename,
+                    0,
+                    HandleLibBacktraceError,
+                    nullptr
+                );
+            }
+        }
+
+        // Writes one line per frame to `out`, in the order of `frames`: the symbolized
+        // location, or a "File `...` not found" note when no state could be created for
+        // the frame's file. At most `Limit` frames are processed.
+        void Symbolize(const TStackFrame* frames, size_t count, IOutputStream* out) {
+            if (!count) {
+                return;
+            }
+            // The static arrays hold `Limit` entries; anything beyond would overflow them.
+            count = std::min(count, static_cast<size_t>(Limit));
+
+            // Taken before the shared buffers are touched (it used to be acquired only after
+            // they had been reset, which let concurrent callers corrupt each other's output).
+            const std::lock_guard lock{Mutex};
+
+            memset(TmpBuffer, 0, sizeof(TmpBuffer));
+            Result[0] = TmpBuffer;
+            for (size_t i = 1; i < Limit; ++i) {
+                Result[i] = Result[i - 1] + MaxStrLen;
+            }
+
+            // Process frames grouped by file so that one libbacktrace state serves a whole group.
+            std::iota(Order, Order + count, 0u);
+            std::sort(Order, Order + count, [&frames](auto a, auto b) { return strcmp(frames[a].File, frames[b].File) < 0; });
+
+            struct backtrace_state* state = nullptr;
+            for (size_t i = 0; i < count; ++i) {
+                // Names are compared by content, matching the sort above; comparing pointers
+                // created a new state for every frame whose name lived in a separate string.
+                if (!i || strcmp(frames[Order[i - 1]].File, frames[Order[i]].File) != 0) {
+                    state = CreateState(frames[Order[i]].File);
+                }
+
+                if (!state) {
+                    Result[Order[i]] = nullptr; // File not found
+                    continue;
+                }
+
+                int status = backtrace_pcinfo(
+                    state,
+                    static_cast<uintptr_t>(frames[Order[i]].Address) - 1, // last byte of the call instruction
+                    HandleLibBacktraceFrame,
+                    HandleLibBacktraceError,
+                    reinterpret_cast<void*>(Result[Order[i]]));
+                if (0 != status) {
+                    break;
+                }
+            }
+            for (size_t i = 0; i < count; ++i) {
+                if (Result[i]) {
+                    *out << Result[i] << "\n";
+                } else {
+                    *out << "File `" << frames[i].File << "` not found\n";
+                }
+            }
+        }
+    }
+}
\ No newline at end of file diff --git a/yql/essentials/utils/backtrace/ut/ya.make b/yql/essentials/utils/backtrace/ut/ya.make new file mode 100644 index 0000000000..c54c56275a --- /dev/null +++ b/yql/essentials/utils/backtrace/ut/ya.make @@ -0,0 +1,10 @@ +UNITTEST_FOR(yql/essentials/utils/backtrace) + + +IF (OS_LINUX AND ARCH_X86_64) + SRCS( + backtrace_ut.cpp + ) +ENDIF() + +END() diff --git a/yql/essentials/utils/backtrace/ya.make b/yql/essentials/utils/backtrace/ya.make new file mode 100644 index 0000000000..30c2dd1cc1 --- /dev/null +++ b/yql/essentials/utils/backtrace/ya.make @@ -0,0 +1,34 @@ +LIBRARY() + +SRCS( + backtrace.cpp + backtrace_lib.cpp + symbolize.cpp +) + +PEERDIR( + library/cpp/deprecated/atomic +) + + +IF (OS_LINUX AND ARCH_X86_64) + SRCS( + backtrace_linux.cpp + symbolizer_linux.cpp + ) + + PEERDIR( + contrib/libs/backtrace + contrib/libs/libunwind + ) + ADDINCL(contrib/libs/libunwind/include) + +ELSE() + SRCS( + backtrace_dummy.cpp + ) +ENDIF() + +END() + +RECURSE_FOR_TESTS(ut)
\ No newline at end of file
diff --git a/yql/essentials/utils/cast.h b/yql/essentials/utils/cast.h
new file mode 100644
index 0000000000..dac8508092
--- /dev/null
+++ b/yql/essentials/utils/cast.h
@@ -0,0 +1,15 @@
+#pragma once
+#include "yql_panic.h"
+
+namespace NYql {
+
+template<class T, class F>
+[[nodiscard]]
+inline T EnsureDynamicCast(F from) {
+    YQL_ENSURE(from, "source should not be null");
+    T result = dynamic_cast<T>(from);
+    YQL_ENSURE(result, "dynamic_cast failed");
+    return result;
+}
+
+} // namespace NYql
diff --git a/yql/essentials/utils/debug_info.cpp b/yql/essentials/utils/debug_info.cpp
new file mode 100644
index 0000000000..ff56166695
--- /dev/null
+++ b/yql/essentials/utils/debug_info.cpp
@@ -0,0 +1,61 @@
+#include "debug_info.h"
+
+#include <util/system/thread.h>
+#include <util/system/tls.h>
+#include <util/stream/file.h>
+#include <util/generic/string.h>
+
+#include <stdio.h>
+#include <string.h>
+
+
+namespace NYql {
+
+static const size_t OPERATION_ID_MAX_LENGTH = 24;
+// Long ids are cut to their last THREAD_NAME_MAX_LENGTH - 1 characters for the thread name
+// (presumably leaving room for a terminating NUL in a 16-byte name buffer).
+static const size_t THREAD_NAME_MAX_LENGTH = 16;
+
+
+struct TDebugInfo {
+    char OperationId[OPERATION_ID_MAX_LENGTH + 1];
+};
+
+Y_POD_THREAD(TDebugInfo) TlsDebugInfo;
+
+
+// Remembers the operation id in thread-local storage and names the current thread after it.
+// Ids that do not fit into a thread name are represented by their tail.
+void SetCurrentOperationId(const char* operationId) {
+    // strlcpy() takes the full destination size (terminator included) and returns the source
+    // length. Passing OPERATION_ID_MAX_LENGTH here used to cut ids one character short.
+    size_t len = strlcpy(
+        (&TlsDebugInfo)->OperationId,
+        operationId,
+        sizeof(TDebugInfo::OperationId));
+
+    const char* threadName = nullptr;
+    // `>=` rather than `>`: an id of exactly THREAD_NAME_MAX_LENGTH characters is already
+    // one longer than the tail kept for longer ids.
+    if (len >= THREAD_NAME_MAX_LENGTH) {
+        threadName = operationId + (len - THREAD_NAME_MAX_LENGTH + 1);
+    } else {
+        threadName = operationId;
+    }
+    TThread::SetCurrentThreadName(threadName);
+}
+
+// Reads the thread count of the current process from /proc/self/stat; returns -2 when the
+// field cannot be parsed.
+long GetRunnigThreadsCount() {
+    TString procStat = TFileInput("/proc/self/stat").ReadAll();
+    long num_threads = -2; // Number of threads in this process (since Linux 2.6)
+
+    int n = sscanf(procStat.data(),
+        "%*d %*s %*c %*d %*d %*d %*d %*d %*u %*u %*u %*u %*u %*u %*u %*d %*d %*d %*d %ld",
+        &num_threads);
+
+    return n == 1 ? num_threads : -2;
+}
+
+} // namespace NYql
diff --git a/yql/essentials/utils/debug_info.h b/yql/essentials/utils/debug_info.h
new file mode 100644
index 0000000000..3e2a55140b
--- /dev/null
+++ b/yql/essentials/utils/debug_info.h
@@ -0,0 +1,10 @@
+#pragma once
+
+
+namespace NYql {
+
+void SetCurrentOperationId(const char* operationId);
+
+long GetRunnigThreadsCount();
+
+} // namespace NYql
diff --git a/yql/essentials/utils/exceptions.cpp b/yql/essentials/utils/exceptions.cpp
new file mode 100644
index 0000000000..8a6d09fc9d
--- /dev/null
+++ b/yql/essentials/utils/exceptions.cpp
@@ -0,0 +1,37 @@
+#include "exceptions.h"
+
+#include <util/string/builder.h>
+
+namespace NYql {
+
+TCodeLineException::TCodeLineException(ui32 code)
+    : SourceLocation("", 0)
+    , Code(code)
+{}
+
+TCodeLineException::TCodeLineException(const TSourceLocation& sl, const TCodeLineException& t)
+    : yexception(t)
+    , SourceLocation(sl)
+    , Code(t.Code)
+{}
+
+const char* TCodeLineException::GetRawMessage() const {
+    return yexception::what();
+}
+
+const char* TCodeLineException::what() const noexcept {
+    try {
+        if (!Message) {
+            Message = TStringBuilder{} << SourceLocation << TStringBuf(": ") << yexception::what();
+        }
+        return Message.c_str();
+    } catch(...) {
+        return "Unexpected exception in TCodeLineException::what()";
+    }
+}
+
+TCodeLineException operator+(const TSourceLocation& sl, TCodeLineException&& t) {
+    return TCodeLineException(sl, t);
+}
+
+} // namespace NYql
\ No newline at end of file
diff --git a/yql/essentials/utils/exceptions.h b/yql/essentials/utils/exceptions.h
new file mode 100644
index 0000000000..8df5307da2
--- /dev/null
+++ b/yql/essentials/utils/exceptions.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include <util/generic/yexception.h>
+
+namespace NYql {
+
+// yexception that carries a numeric code and keeps the source location (file and line)
+// apart from the message text: what() returns "<location>: <message>", GetRawMessage()
+// returns the message alone.
+struct TCodeLineException : yexception {
+    TSourceLocation SourceLocation;
+    mutable TString Message; // built lazily inside the const what(), hence mutable
+    ui32 Code;
+
+    TCodeLineException(ui32 code);
+    TCodeLineException(const TSourceLocation& sl, const TCodeLineException& t);
+
+    const char* what() const noexcept override;
+    const char* GetRawMessage() const;
+};
+
+// Produces a copy of `t` tagged with the source location `sl` (presumably what `ythrow` relies on).
+TCodeLineException operator+(const TSourceLocation& sl, TCodeLineException&& t);
+
+// Throws TCodeLineException(CODE) with the streamed message unless CONDITION holds.
+#define YQL_ENSURE_CODELINE(CONDITION, CODE, ...) \
+    do { \
+        if (Y_UNLIKELY(!(CONDITION))) { \
+            ythrow TCodeLineException(CODE) << __VA_ARGS__; \
+        } \
+    } while (0)
+
+} // namespace NYql
\ No newline at end of file diff --git a/yql/essentials/utils/failure_injector/failure_injector.cpp b/yql/essentials/utils/failure_injector/failure_injector.cpp new file mode 100644 index 0000000000..4f4f9c1eb6 --- /dev/null +++ b/yql/essentials/utils/failure_injector/failure_injector.cpp @@ -0,0 +1,64 @@ +#include "failure_injector.h" + +#include <yql/essentials/utils/log/log.h> +#include <yql/essentials/utils/yql_panic.h> + +#include <util/generic/singleton.h> + +namespace NYql { + +void TFailureInjector::Activate() { + Singleton<TFailureInjector>()->ActivateImpl(); +} + +void TFailureInjector::Set(std::string_view name, ui64 skip, ui64 countOfFails) { + Singleton<TFailureInjector>()->SetImpl(name, skip, countOfFails); +} + +void TFailureInjector::Reach(std::string_view name, std::function<void()> action) { + Singleton<TFailureInjector>()->ReachImpl(name, action); +} + +void TFailureInjector::ActivateImpl() { + Enabled_.store(true); + YQL_LOG(DEBUG) << "TFailureInjector::Activate"; +} + +THashMap<TString, TFailureInjector::TFailureSpec> TFailureInjector::GetCurrentState() { + return Singleton<TFailureInjector>()->GetCurrentStateImpl(); +} + +THashMap<TString, TFailureInjector::TFailureSpec> TFailureInjector::GetCurrentStateImpl() { + THashMap<TString, TFailureInjector::TFailureSpec> copy; + with_lock(Lock) { + copy = FailureSpecs; + } + return copy; +} + +void TFailureInjector::ReachImpl(std::string_view name, std::function<void()> action) { + if (!Enabled_.load()) { + return; + } + with_lock(Lock) { + if (auto failureSpec = FailureSpecs.FindPtr(name)) { + YQL_LOG(DEBUG) << "TFailureInjector::Reach: " << name << ", Skip=" << failureSpec->Skip << ", Fails=" << failureSpec->CountOfFails; + if (failureSpec->Skip > 0) { + --failureSpec->Skip; + } else if (failureSpec->CountOfFails > 0) { + YQL_LOG(DEBUG) << "TFailureInjector::OnReach: " << name; + --failureSpec->CountOfFails; + action(); + } + } + } +} + +void TFailureInjector::SetImpl(std::string_view name, ui64 
skip, ui64 countOfFails) { + with_lock(Lock) { + YQL_ENSURE(countOfFails > 0, "failure " << name << ", 'countOfFails' must be positive"); + FailureSpecs[TString{name}] = TFailureSpec{skip, countOfFails}; + } +} + +} // NYql diff --git a/yql/essentials/utils/failure_injector/failure_injector.h b/yql/essentials/utils/failure_injector/failure_injector.h new file mode 100644 index 0000000000..6db79fc8c0 --- /dev/null +++ b/yql/essentials/utils/failure_injector/failure_injector.h @@ -0,0 +1,36 @@ +#pragma once + +#include <util/generic/hash.h> +#include <util/generic/string.h> +#include <util/system/mutex.h> + +#include <string_view> + +namespace NYql { + +class TFailureInjector { +public: + struct TFailureSpec { + ui64 Skip; + ui64 CountOfFails; + }; + + static void Activate(); + + static void Set(std::string_view name, ui64 skip, ui64 countOfFails); + static void Reach(std::string_view name, std::function<void()> action); + static THashMap<TString, TFailureSpec> GetCurrentState(); + +private: + void ActivateImpl(); + + void SetImpl(std::string_view name, ui64 skip, ui64 countOfFails); + void ReachImpl(std::string_view name, std::function<void()> action); + THashMap<TString, TFailureSpec> GetCurrentStateImpl(); + + std::atomic<bool> Enabled_ = false; + THashMap<TString, TFailureSpec> FailureSpecs; + TMutex Lock; +}; + +} // NYql diff --git a/yql/essentials/utils/failure_injector/failure_injector_ut.cpp b/yql/essentials/utils/failure_injector/failure_injector_ut.cpp new file mode 100644 index 0000000000..4d94b5cce7 --- /dev/null +++ b/yql/essentials/utils/failure_injector/failure_injector_ut.cpp @@ -0,0 +1,87 @@ +#include "failure_injector.h" + +#include <yql/essentials/utils/log/log.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/datetime/base.h> + +#include <chrono> + +using namespace NYql; +using namespace NYql::NLog; +using namespace std::chrono; + +// do nothing +void OnReach(std::atomic<bool>& called) { + called.store(true); +} + +void 
SetUpLogger() { + TString logType = "cout"; + NLog::InitLogger(logType, false); + NLog::EComponentHelpers::ForEach([](NLog::EComponent component) { + NLog::YqlLogger().SetComponentLevel(component, ELevel::DEBUG); + }); +} + +Y_UNIT_TEST_SUITE(TFailureInjectorTests) { + Y_UNIT_TEST(BasicFailureTest) { + SetUpLogger(); + std::atomic<bool> called; + called.store(false); + auto behavior = [&called] { OnReach(called); }; + TFailureInjector::Reach("misc_failure", behavior); + UNIT_ASSERT_EQUAL(false, called.load()); + TFailureInjector::Activate(); + TFailureInjector::Set("misc_failure", 0, 1); + TFailureInjector::Reach("misc_failure", behavior); + UNIT_ASSERT_EQUAL(true, called.load()); + } + + Y_UNIT_TEST(CheckSkipTest) { + SetUpLogger(); + std::atomic<bool> called; + called.store(false); + auto behavior = [&called] { OnReach(called); }; + TFailureInjector::Activate(); + TFailureInjector::Set("misc_failure", 1, 1); + + TFailureInjector::Reach("misc_failure", behavior); + UNIT_ASSERT_EQUAL(false, called.load()); + TFailureInjector::Reach("misc_failure", behavior); + UNIT_ASSERT_EQUAL(true, called.load()); + } + + Y_UNIT_TEST(CheckFailCountTest) { + SetUpLogger(); + int called = 0; + auto behavior = [&called] { ++called; }; + TFailureInjector::Activate(); + TFailureInjector::Set("misc_failure", 1, 2); + + TFailureInjector::Reach("misc_failure", behavior); + UNIT_ASSERT_EQUAL(0, called); + TFailureInjector::Reach("misc_failure", behavior); + UNIT_ASSERT_EQUAL(1, called); + TFailureInjector::Reach("misc_failure", behavior); + UNIT_ASSERT_EQUAL(2, called); + TFailureInjector::Reach("misc_failure", behavior); + UNIT_ASSERT_EQUAL(2, called); + TFailureInjector::Reach("misc_failure", behavior); + UNIT_ASSERT_EQUAL(2, called); + } + + Y_UNIT_TEST(SlowDownTest) { + SetUpLogger(); + TFailureInjector::Activate(); + TFailureInjector::Set("misc_failure", 0, 1); + + auto start = system_clock::now(); + TFailureInjector::Reach("misc_failure", [] { ::Sleep(TDuration::Seconds(5)); }); + 
auto finish = system_clock::now(); + auto duration = duration_cast<std::chrono::seconds>(finish - start); + YQL_LOG(DEBUG) << "Duration :" << duration.count(); + UNIT_ASSERT_GE(duration.count(), 5); + } +} diff --git a/yql/essentials/utils/failure_injector/ut/ya.make b/yql/essentials/utils/failure_injector/ut/ya.make new file mode 100644 index 0000000000..579a466287 --- /dev/null +++ b/yql/essentials/utils/failure_injector/ut/ya.make @@ -0,0 +1,15 @@ +IF (OS_LINUX OR OS_DARWIN) + UNITTEST_FOR(yql/essentials/utils/failure_injector) + + SIZE(SMALL) + + SRCS( + failure_injector_ut.cpp + ) + + PEERDIR( + yql/essentials/utils/log + ) + + END() +ENDIF() diff --git a/yql/essentials/utils/failure_injector/ya.make b/yql/essentials/utils/failure_injector/ya.make new file mode 100644 index 0000000000..e10dfdaecb --- /dev/null +++ b/yql/essentials/utils/failure_injector/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + failure_injector.cpp +) + +PEERDIR( + yql/essentials/utils + yql/essentials/utils/log +) + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/yql/essentials/utils/fetch/fetch.cpp b/yql/essentials/utils/fetch/fetch.cpp new file mode 100644 index 0000000000..d35baeb097 --- /dev/null +++ b/yql/essentials/utils/fetch/fetch.cpp @@ -0,0 +1,182 @@ +#include "fetch.h" + +#include <yql/essentials/utils/log/log.h> + +#include <library/cpp/openssl/io/stream.h> +#include <library/cpp/http/misc/httpcodes.h> +#include <library/cpp/charset/ci_string.h> + +#include <util/network/socket.h> +#include <util/string/cast.h> +#include <util/generic/strbuf.h> + +namespace NYql { + +namespace { + +THttpURL ParseURL(const TStringBuf addr, NUri::TParseFlags features) { + THttpURL url; + THttpURL::TParsedState parsedState = url.Parse(addr, features, nullptr, 65536); + if (THttpURL::ParsedOK != parsedState) { + ythrow yexception() << "Bad URL: \"" << addr << "\", " << HttpURLParsedStateToString(parsedState); + } + return url; +} + +class TFetchResultImpl: public IFetchResult { +public: + 
TFetchResultImpl(const THttpURL& url, const THttpHeaders& additionalHeaders, TDuration timeout) { + TString host = url.Get(THttpURL::FieldHost); + TString path = url.PrintS(THttpURL::FlagPath | THttpURL::FlagQuery); + const char* p = url.Get(THttpURL::FieldPort); + ui16 port = 80; + bool https = false; + + if (url.Get(THttpURL::FieldScheme) == TStringBuf("https")) { + port = 443; + https = true; + } + + if (p) { + port = FromString<ui16>(p); + } + + TString req; + { + TStringOutput rqs(req); + TStringBuf userAgent = "User-Agent: Mozilla/5.0 (compatible; YQL/1.0)"; + + IOutputStream::TPart request[] = { + IOutputStream::TPart("GET ", 4), + IOutputStream::TPart(path.data(), path.size()), + IOutputStream::TPart(" HTTP/1.1", 9), + IOutputStream::TPart::CrLf(), + IOutputStream::TPart("Host: ", 6), + IOutputStream::TPart(host.data(), host.size()), + IOutputStream::TPart::CrLf(), + IOutputStream::TPart(userAgent.data(), userAgent.size()), + IOutputStream::TPart::CrLf(), + }; + rqs.Write(request, Y_ARRAY_SIZE(request)); + if (!additionalHeaders.Empty()) { + additionalHeaders.OutTo(&rqs); + } + rqs << "\r\n"; + } + + Socket.Reset(new TSocket(TNetworkAddress(host, port), timeout)); + SocketInput.Reset(new TSocketInput(*Socket)); + SocketOutput.Reset(new TSocketOutput(*Socket)); + + Socket->SetSocketTimeout(timeout.Seconds(), timeout.MilliSeconds() % 1000); + + if (https) { + Ssl.Reset(new TOpenSslClientIO(SocketInput.Get(), SocketOutput.Get())); + } + + { + THttpOutput ho(Ssl ? (IOutputStream*)Ssl.Get() : (IOutputStream*)SocketOutput.Get()); + (ho << req).Finish(); + } + HttpInput.Reset(new THttpInput(Ssl ? 
(IInputStream*)Ssl.Get() : (IInputStream*)SocketInput.Get())); + } + + THttpInput& GetStream() override { + return *HttpInput; + } + + unsigned GetRetCode() override { + return ParseHttpRetCode(HttpInput->FirstLine()); + } + + THttpURL GetRedirectURL(const THttpURL& baseUrl) override { + for (auto i = HttpInput->Headers().Begin(); i != HttpInput->Headers().End(); ++i) { + if (0 == TCiString::compare(i->Name(), TStringBuf("location"))) { + THttpURL target = ParseURL(i->Value(), THttpURL::FeaturesAll | NUri::TFeature::FeatureConvertHostIDN); + if (!target.IsValidAbs()) { + target.Merge(baseUrl); + } + return target; + } + } + ythrow yexception() << "Unknown redirect location from " << baseUrl.PrintS(); + } + + static TFetchResultPtr Fetch(const THttpURL& url, const THttpHeaders& additionalHeaders, const TDuration& timeout) { + return new TFetchResultImpl(url, additionalHeaders, timeout); + } + +private: + THolder<TSocket> Socket; + THolder<TSocketInput> SocketInput; + THolder<TSocketOutput> SocketOutput; + THolder<TOpenSslClientIO> Ssl; + THolder<THttpInput> HttpInput; +}; + +inline bool IsRedirectCode(unsigned code) { + switch (code) { + case HTTP_MOVED_PERMANENTLY: + case HTTP_FOUND: + case HTTP_SEE_OTHER: + case HTTP_TEMPORARY_REDIRECT: + return true; + } + return false; +} + +inline bool IsRetryCode(unsigned code) { + switch (code) { + case HTTP_REQUEST_TIME_OUT: + case HTTP_AUTHENTICATION_TIMEOUT: + case HTTP_TOO_MANY_REQUESTS: + case HTTP_GATEWAY_TIME_OUT: + case HTTP_SERVICE_UNAVAILABLE: + return true; + } + return false; +} + +} // unnamed + +THttpURL ParseURL(const TStringBuf addr) { + return ParseURL(addr, THttpURL::FeaturesAll | NUri::TFeature::FeatureConvertHostIDN | NUri::TFeature::FeatureNoRelPath); +} + +TFetchResultPtr Fetch(const THttpURL& url, const THttpHeaders& additionalHeaders, const TDuration& timeout, size_t retries, size_t redirects) { + THttpURL currentUrl = url; + for (size_t fetchNum = 0; fetchNum < redirects; ++fetchNum) { + unsigned 
responseCode = 0; + TFetchResultPtr fr; + size_t fetchTry = 0; + do { + fr = TFetchResultImpl::Fetch(currentUrl, additionalHeaders, timeout); + responseCode = fr->GetRetCode(); + } while (IsRetryCode(responseCode) && ++fetchTry < retries); + + if (responseCode >= 200 && responseCode < 300) { + return fr; + } + + if (responseCode == HTTP_NOT_MODIFIED) { + return fr; + } + + if (IsRedirectCode(responseCode)) { + currentUrl = fr->GetRedirectURL(currentUrl); + YQL_LOG(INFO) << "Got redirect to " << currentUrl.PrintS(); + continue; + } + + TString errorBody; + try { + errorBody = fr->GetStream().ReadAll(); + } catch (...) { + } + + ythrow yexception() << "Failed to fetch url '" << currentUrl.PrintS() << "' with code " << responseCode << ", body: " << errorBody; + } + ythrow yexception() << "Failed to fetch url '" << currentUrl.PrintS() << "': too many redirects"; +} + +} // NYql diff --git a/yql/essentials/utils/fetch/fetch.h b/yql/essentials/utils/fetch/fetch.h new file mode 100644 index 0000000000..d9e1c3c1a5 --- /dev/null +++ b/yql/essentials/utils/fetch/fetch.h @@ -0,0 +1,24 @@ +#pragma once + +#include <library/cpp/uri/http_url.h> +#include <library/cpp/http/io/headers.h> +#include <library/cpp/http/io/stream.h> + +#include <util/datetime/base.h> +#include <util/generic/string.h> +#include <util/generic/ptr.h> + +namespace NYql { + +struct IFetchResult: public TThrRefBase { + virtual THttpInput& GetStream() = 0; + virtual unsigned GetRetCode() = 0; + virtual THttpURL GetRedirectURL(const THttpURL& baseUrl) = 0; +}; + +using TFetchResultPtr = TIntrusivePtr<IFetchResult>; + +THttpURL ParseURL(const TStringBuf addr); +TFetchResultPtr Fetch(const THttpURL& url, const THttpHeaders& additionalHeaders = {}, const TDuration& timeout = TDuration::Max(), size_t retries = 3, size_t redirects = 10); + +} // NYql diff --git a/yql/essentials/utils/fetch/ya.make b/yql/essentials/utils/fetch/ya.make new file mode 100644 index 0000000000..e95b4fde60 --- /dev/null +++ 
b/yql/essentials/utils/fetch/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + fetch.cpp +) + +PEERDIR( + library/cpp/charset + library/cpp/http/io + library/cpp/http/misc + library/cpp/openssl/io + library/cpp/uri + yql/essentials/utils/log +) + +END() diff --git a/yql/essentials/utils/fp_bits.h b/yql/essentials/utils/fp_bits.h new file mode 100644 index 0000000000..ffce6e161b --- /dev/null +++ b/yql/essentials/utils/fp_bits.h @@ -0,0 +1,122 @@ +#pragma once +#include <util/system/defaults.h> + +#include <cmath> +#include <limits> + +namespace NYql { +/* + +double + +visual c++ + +fff8000000000000 0.0/0.0 +7ff8000000000001 snan +7ff8000000000000 qnan + +gcc/clang + +7ff8000000000000 0.0/0.0 +7ff4000000000000 snan +7ff8000000000000 qnan + +float + +visual c++ + +ffc00000 0.0f/0.0f +7fc00001 snan +7fc00000 qnan + +gcc/clang + +7fc00000 0.0f/0.0f +7fa00000 snan +7fc00000 qnan + +*/ + +template <typename T> +struct TFpTraits { + static constexpr bool Supported = false; +}; + +template <> +struct TFpTraits<float> { + static constexpr bool Supported = std::numeric_limits<float>::is_iec559; + using TIntegral = ui32; + static constexpr TIntegral SignMask = (1u << 31); + static constexpr TIntegral Mantissa = 23; + static constexpr TIntegral Exp = 8; + static constexpr TIntegral MaxMantissa = (1u << Mantissa) - 1; + static constexpr TIntegral MaxExp = (1u << Exp) - 1; + static constexpr TIntegral QNan = 0x7fc00000u; +}; + +template <> +struct TFpTraits<double> { + static constexpr bool Supported = std::numeric_limits<double>::is_iec559; + using TIntegral = ui64; + static constexpr TIntegral SignMask = (1ull << 63); + static constexpr TIntegral Mantissa = 52; + static constexpr TIntegral Exp = 11; + static constexpr TIntegral MaxMantissa = (1ull << Mantissa) - 1; + static constexpr TIntegral MaxExp = (1ull << Exp) - 1; + static constexpr TIntegral QNan = 0x7ff8000000000000ull; +}; + +template <typename T, bool> +struct TCanonizeFpBitsImpl { +}; + +template <typename T> +struct 
TCanonizeFpBitsImpl<T, true> { + static void Do(void* buffer) { + using TTraits = TFpTraits<T>; + using TIntegral = typename TTraits::TIntegral; + const TIntegral value = *(TIntegral*)(buffer); + if (value == TTraits::SignMask) { + *(TIntegral*)buffer = 0; + return; + } + + const TIntegral exp = (value >> TTraits::Mantissa) & TTraits::MaxExp; + // inf or nan + if (exp == TTraits::MaxExp) { + if (value & TTraits::MaxMantissa) { + // quiet nan + *(TIntegral*)buffer = TTraits::QNan; + } + } + } +}; + +template <typename T> +struct TCanonizeFpBitsImpl<T, false> { + static void Do(void* buffer) { + using TNumTraits = std::numeric_limits<T>; + const T value = *(T*)buffer; + switch (std::fpclassify(value)) { + case FP_NAN: + static_assert(TNumTraits::has_quiet_NaN, "no QNAN"); + *(T*)buffer = TNumTraits::quiet_NaN(); + break; + case FP_ZERO: + *(T*)buffer = T(0); + break; + } + } +}; + +/* + Canonize floating point number bits. + Converts minus zero to zero and all NaNs to quiet NaN. + @param buffer[in/out] - aligned buffer with floating point number +*/ +template <typename T> +void CanonizeFpBits(void* buffer) { + return TCanonizeFpBitsImpl<T, TFpTraits<T>::Supported>::Do(buffer); +} + +} diff --git a/yql/essentials/utils/fp_bits_ut.cpp b/yql/essentials/utils/fp_bits_ut.cpp new file mode 100644 index 0000000000..d6d94b56f4 --- /dev/null +++ b/yql/essentials/utils/fp_bits_ut.cpp @@ -0,0 +1,105 @@ +#include "fp_bits.h" +#include <library/cpp/testing/unittest/registar.h> + +#include <util/system/valgrind.h> + +namespace NYql { + +namespace { +template <typename T> +void CanonizeFpBitsTest() { + enum EValues { + Zero, + MZero, + One, + Half, + Two, + PMin, + PMax, + DNorm, + PInf, + NInf, + QNan, + SNan, + INan, + Max + }; + + T values[EValues::Max]; + T newValues[EValues::Max]; + int valuesClass[EValues::Max]; + values[Zero] = T(0); + values[MZero] = -values[Zero]; + values[One] = T(1); + values[Half] = values[One] / 2; + values[Two] = values[One] * 2; + values[PMin] = 
std::numeric_limits<T>::min(); + values[DNorm] = values[PMin] / 2; + values[PMax] = std::numeric_limits<T>::max(); + values[PInf] = std::numeric_limits<T>::infinity(); + values[NInf] = -std::numeric_limits<T>::infinity(); + values[QNan] = std::numeric_limits<T>::quiet_NaN(); + values[SNan] = std::numeric_limits<T>::signaling_NaN(); + values[INan] = std::numeric_limits<T>::infinity() / std::numeric_limits<T>::infinity(); + + for (int v = Zero; v < Max; ++v) { + int cls; + if (v <= MZero) { + cls = FP_ZERO; + } else if (v < DNorm) { + cls = FP_NORMAL; + } else if (v == DNorm) { + cls = FP_SUBNORMAL; + } else if (v < QNan) { + cls = FP_INFINITE; + } else { + cls = FP_NAN; + } + + valuesClass[v] = cls; + } + + for (int v = Zero; v < Max; ++v) { + UNIT_ASSERT_VALUES_EQUAL(std::fpclassify(values[v]), valuesClass[v]); + } + + for (int v = Zero; v < Max; ++v) { + newValues[v] = values[v]; + CanonizeFpBits<T>(&newValues[v]); + } + + for (int v = Zero; v < Max; ++v) { + UNIT_ASSERT_VALUES_EQUAL(std::fpclassify(newValues[v]), valuesClass[v]); + } + + for (int v = Zero; v < Max; ++v) { + int originalV = v; + if (v == MZero) { + originalV = Zero; + } else if (v >= QNan) { + originalV = QNan; + } + + UNIT_ASSERT(std::memcmp((const void*)&newValues[v], (const void*)&values[originalV], std::min(size_t(10), sizeof(T))) == 0); + } +} +} + +Y_UNIT_TEST_SUITE(TFpBits) { + Y_UNIT_TEST(CanonizeFloat) { + CanonizeFpBitsTest<float>(); + } + + Y_UNIT_TEST(CanonizeDouble) { + CanonizeFpBitsTest<double>(); + } + + Y_UNIT_TEST(CanonizeLongDouble) { + if (NValgrind::ValgrindIsOn()) { + return; // TODO KIKIMR-3431 + } + CanonizeFpBitsTest<long double>(); + } +} + +} diff --git a/yql/essentials/utils/future_action.cpp b/yql/essentials/utils/future_action.cpp new file mode 100644 index 0000000000..26deb8de7e --- /dev/null +++ b/yql/essentials/utils/future_action.cpp @@ -0,0 +1 @@ +#include "future_action.h" diff --git a/yql/essentials/utils/future_action.h b/yql/essentials/utils/future_action.h 
new file mode 100644 index 0000000000..d4a8dd69aa --- /dev/null +++ b/yql/essentials/utils/future_action.h @@ -0,0 +1,59 @@ +#pragma once + +#include <library/cpp/threading/future/future.h> + +namespace NYql { +/* +Let's call 'action' attached functor to future which should be called in the main thread. +This trick allows us to execute attached actions not in the future's execution thread but rather in the main thread +and preserve single-threaded logic. +*/ + +/* +Make ready future with constant action inside. +*/ +template <typename T> +NThreading::TFuture<std::function<T()>> MakeFutureWithConstantAction(const T& value) { + return NThreading::MakeFuture<std::function<T()>>([value]() { + return value; + }); +} + +template <typename T, typename A> +auto AddActionToFuture(NThreading::TFuture<T> f, const A& action) { + using V = decltype(action(std::declval<T>())); + + return f.Apply([action](NThreading::TFuture<T> f) { + std::function<V()> r = [f, action]() { + return action(f.GetValue()); + }; + + return r; + }); +} + +/* +Apply continuation with constant action +*/ +template <typename T, typename V> +NThreading::TFuture<std::function<V()>> AddConstantActionToFuture(NThreading::TFuture<T> f, const V& value) { + return AddActionToFuture(f, [value](const T&) { return value; }); +} + +/* +Transform action result by applying mapper +*/ +template <typename R, typename TMapper, typename ...Args> +auto MapFutureAction(NThreading::TFuture<std::function<R(Args&&...)>> f, const TMapper& mapper) { + using V = decltype(mapper(std::declval<R>())); + + return f.Apply([mapper](NThreading::TFuture<std::function<R(Args&&...)>> f) { + std::function<V(Args&&...)> r = [f, mapper](Args&& ...args) { + return mapper(f.GetValue()(std::forward<Args>(args)...)); + }; + + return r; + }); +} + +} diff --git a/yql/essentials/utils/hash.cpp b/yql/essentials/utils/hash.cpp new file mode 100644 index 0000000000..b0bc284d4f --- /dev/null +++ b/yql/essentials/utils/hash.cpp @@ -0,0 +1,21 @@ 
+#include "hash.h" +#include <util/system/getpid.h> +#include <util/system/env.h> + +namespace NYql { + +#ifndef NDEBUG +size_t VaryingHash(size_t src) { + struct TPid { + size_t Value; + + TPid() + : Value(GetEnv("YQL_MUTATE_HASHCODE") ? IntHash(GetPID()) : 0) + {} + }; + + return Singleton<TPid>()->Value ^ src; +} +#endif + +} diff --git a/yql/essentials/utils/hash.h b/yql/essentials/utils/hash.h new file mode 100644 index 0000000000..45cd95b777 --- /dev/null +++ b/yql/essentials/utils/hash.h @@ -0,0 +1,87 @@ +#pragma once +#include <unordered_set> +#include <unordered_map> +#include <util/generic/hash.h> +#include <util/generic/hash_multi_map.h> +#include <util/generic/hash_set.h> + +namespace NYql { + +#ifndef NDEBUG +size_t VaryingHash(size_t src); +#else +inline size_t VaryingHash(size_t src) { + return src; +} +#endif + +template <typename T, typename THasher = ::THash<T>> +struct TVaryingHash { + THasher Underlying; + + TVaryingHash() = default; + TVaryingHash(const TVaryingHash&) = default; + TVaryingHash(const THasher& underlying) + : Underlying(underlying) + {} + + TVaryingHash& operator=(const TVaryingHash& other) = default; + + size_t operator()(const T& elem) const { + return VaryingHash(Underlying(elem)); + } +}; + +template <class TKey, + class TValue, + class THasher = std::hash<TKey>, + class TEqual = std::equal_to<TKey>, + class TAlloc = std::allocator<std::pair<const TKey, TValue>>> +using TVaryingUnorderedMap = std::unordered_map<TKey, TValue, TVaryingHash<TKey, THasher>, TEqual, TAlloc>; + +template <class TKey, + class TValue, + class THasher = std::hash<TKey>, + class TEqual = std::equal_to<TKey>, + class TAlloc = std::allocator<std::pair<const TKey, TValue>>> +using TVaryingUnorderedMultiMap = std::unordered_multimap<TKey, TValue, TVaryingHash<TKey, THasher>, TEqual, TAlloc>; + +template <class TKey, + class THasher = std::hash<TKey>, + class TEqual = std::equal_to<TKey>, + class TAlloc = std::allocator<TKey>> +using TVaryingUnorderedSet = 
std::unordered_set<TKey, TVaryingHash<TKey, THasher>, TEqual, TAlloc>; + +template <class TKey, + class THasher = std::hash<TKey>, + class TEqual = std::equal_to<TKey>, + class TAlloc = std::allocator<TKey>> +using TVaryingUnorderedMultiSet = std::unordered_multiset<TKey, TVaryingHash<TKey, THasher>, TEqual, TAlloc>; + +template <class TKey, + class TValue, + class THasher = THash<TKey>, + class TEqual = TEqualTo<TKey>, + class TAlloc = std::allocator<std::pair<const TKey, TValue>>> +using TVaryingHashMap = THashMap<TKey, TValue, TVaryingHash<TKey, THasher>, TEqual, TAlloc>; + +template <class TKey, + class TValue, + class THasher = THash<TKey>, + class TEqual = TEqualTo<TKey>, + class TAlloc = std::allocator<std::pair<const TKey, TValue>>> +using TVaryingHashMultiMap = THashMultiMap<TKey, TValue, TVaryingHash<TKey, THasher>, TEqual, TAlloc>; + +template <class TKey, + class THasher = THash<TKey>, + class TEqual = TEqualTo<TKey>, + class TAlloc = std::allocator<TKey>> +using TVaryingHashSet = THashSet<TKey, TVaryingHash<TKey, THasher>, TEqual, TAlloc>; + +template <class TKey, + class THasher = THash<TKey>, + class TEqual = TEqualTo<TKey>, + class TAlloc = std::allocator<TKey>> +using TVaryingHashMultiSet = THashMultiSet<TKey, TVaryingHash<TKey, THasher>, TEqual, TAlloc>; + +} // namespace NYql diff --git a/yql/essentials/utils/limiting_allocator.cpp b/yql/essentials/utils/limiting_allocator.cpp new file mode 100644 index 0000000000..0ff84f9037 --- /dev/null +++ b/yql/essentials/utils/limiting_allocator.cpp @@ -0,0 +1,35 @@ +#include "limiting_allocator.h" + +#include <util/memory/pool.h> +#include <util/generic/yexception.h> + +namespace { +class TLimitingAllocator : public IAllocator { +public: + TLimitingAllocator(size_t limit, IAllocator* allocator) : Alloc_(allocator), Limit_(limit) {}; + TBlock Allocate(size_t len) override final { + if (Allocated_ + len > Limit_) { + throw std::runtime_error("Out of memory"); + } + Allocated_ += len; + return 
Alloc_->Allocate(len); + } + + void Release(const TBlock& block) override final { + Y_ENSURE(Allocated_ >= block.Len); + Allocated_ -= block.Len; + Alloc_->Release(block); + } + +private: + IAllocator* Alloc_; + size_t Allocated_ = 0; + const size_t Limit_; +}; +} + +namespace NYql { +std::unique_ptr<IAllocator> MakeLimitingAllocator(size_t limit, IAllocator* underlying) { + return std::make_unique<TLimitingAllocator>(limit, underlying); +} +} diff --git a/yql/essentials/utils/limiting_allocator.h b/yql/essentials/utils/limiting_allocator.h new file mode 100644 index 0000000000..7d94aa7f2a --- /dev/null +++ b/yql/essentials/utils/limiting_allocator.h @@ -0,0 +1,8 @@ +#pragma once + +#include <util/memory/pool.h> +#include <memory> + +namespace NYql { +std::unique_ptr<IAllocator> MakeLimitingAllocator(size_t limit, IAllocator* underlying); +} diff --git a/yql/essentials/utils/log/context.cpp b/yql/essentials/utils/log/context.cpp new file mode 100644 index 0000000000..c8ad460445 --- /dev/null +++ b/yql/essentials/utils/log/context.cpp @@ -0,0 +1,89 @@ +#include "context.h" +#include "log.h" + +#include <util/thread/singleton.h> + + +namespace NYql { +namespace NLog { +namespace { + +struct TThrowedLogContext { + TString LocationWithLogContext; // separated with ': ' +}; + +} // namspace + +void OutputLogCtx(IOutputStream* out, bool withBraces, bool skipSessionId) { + const NImpl::TLogContextListItem* ctxList = NImpl::GetLogContextList(); + + if (ctxList->HasNext()) { + if (withBraces) { + (*out) << '{'; + } + + // skip header stub element + NImpl::TLogContextListItem* ctxItem = ctxList->Next; + + bool isFirst = true; + while (ctxItem != ctxList) { + for (const TString& name: *ctxItem) { + if (!skipSessionId && !name.empty()) { + if (!isFirst) { + (*out) << '/'; + } + (*out) << name; + isFirst = false; + } + skipSessionId = false; + } + ctxItem = ctxItem->Next; + } + + if (withBraces) { + (*out) << TStringBuf("} "); + } + } +} + +NImpl::TLogContextListItem* 
NImpl::GetLogContextList() { + return FastTlsSingleton<NImpl::TLogContextListItem>(); +} + +std::pair<TString, TString> CurrentLogContextPath() { + TString sessionId; + const NImpl::TLogContextListItem* possibleRootLogCtx = NImpl::GetLogContextList()->Next; + if (auto rootLogCtx = dynamic_cast<const NImpl::TLogContextSessionItem*>(possibleRootLogCtx)) { + if (rootLogCtx->HasSessionId()) { + sessionId = (*rootLogCtx->begin()); + } + } + + TStringStream ss; + OutputLogCtx(&ss, false, !sessionId.empty()); + return std::make_pair(sessionId, ss.Str()); +} + +TString ThrowedLogContextPath() { + TThrowedLogContext* tlc = FastTlsSingleton<TThrowedLogContext>(); + return std::move(tlc->LocationWithLogContext); +} + + +TAutoPtr<TLogElement> TContextPreprocessor::Preprocess( + TAutoPtr<TLogElement> element) +{ + OutputLogCtx(element.Get(), true); + return element; +} + +void TYqlLogContextLocation::SetThrowedLogContextPath() const { + TStringStream ss; + ss << Location_ << TStringBuf(": "); + OutputLogCtx(&ss, true); + TThrowedLogContext* tlc = FastTlsSingleton<TThrowedLogContext>(); + tlc->LocationWithLogContext = ss.Str(); +} + +} // namespace NLog +} // namespace NYql diff --git a/yql/essentials/utils/log/context.h b/yql/essentials/utils/log/context.h new file mode 100644 index 0000000000..75f2e28364 --- /dev/null +++ b/yql/essentials/utils/log/context.h @@ -0,0 +1,259 @@ +#pragma once + +#include <util/system/defaults.h> +#include <util/generic/strbuf.h> +#include <util/stream/output.h> +#include <util/system/src_location.h> +#include <util/system/yassert.h> +#include <array> + +// continues existing contexts chain + +#define YQL_LOG_CTX_SCOPE(...) \ + auto Y_CAT(c, __LINE__) = ::NYql::NLog::MakeCtx(__VA_ARGS__); \ + Y_UNUSED(Y_CAT(c, __LINE__)) + +#define YQL_LOG_CTX_BLOCK(...) 
\ + if (auto Y_GENERATE_UNIQUE_ID(c) = ::NYql::NLog::MakeCtx(__VA_ARGS__)) { \ + goto Y_CAT(YQL_LOG_CTX_LABEL, __LINE__); \ + } else Y_CAT(YQL_LOG_CTX_LABEL, __LINE__): + + +// starts new contexts chain, after leaving current scope restores +// previous contexts chain + +#define YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId, ...) \ + auto Y_CAT(c, __LINE__) = ::NYql::NLog::MakeRootCtx(sessionId, ##__VA_ARGS__); \ + Y_UNUSED(Y_CAT(c, __LINE__)) + +#define YQL_LOG_CTX_ROOT_SCOPE(...) \ + auto Y_CAT(c, __LINE__) = ::NYql::NLog::MakeRootCtx("", __VA_ARGS__); \ + Y_UNUSED(Y_CAT(c, __LINE__)) + +#define YQL_LOG_CTX_ROOT_BLOCK(...) \ + if (auto Y_GENERATE_UNIQUE_ID(c) = ::NYql::NLog::MakeRootCtx(__VA_ARGS__)) { \ + goto Y_CAT(YQL_LOG_CTX_LABEL, __LINE__); \ + } else Y_CAT(YQL_LOG_CTX_LABEL, __LINE__): + + +// adds current contexts path to exception message before throwing it + +#define YQL_LOG_CTX_THROW throw ::NYql::NLog::TYqlLogContextLocation(__LOCATION__) + + +class TLogElement; + +namespace NYql { +namespace NLog { +namespace NImpl { + +/** + * @brief Represents item of logging context list. 
+ */ +class TLogContextListItem { +public: + TLogContextListItem* Next; + TLogContextListItem* Prev; + size_t NamesCount; + + explicit TLogContextListItem(size_t namesCount = 0, size_t headerSize = 0) + : Next(this) + , Prev(this) + , NamesCount(namesCount) + , HeaderSize_(headerSize) + { + // initialize HeaderSize_ if child didn't + if (headerSize == 0) { + HeaderSize_ = sizeof(*this); + } + } + + virtual ~TLogContextListItem() { + } + + const TString* begin() const { + auto* ptr = reinterpret_cast<const ui8*>(this); + return reinterpret_cast<const TString*>(ptr + HeaderSize_); + } + + const TString* end() const { + return begin() + NamesCount; + } + + bool HasNext() const { + return Next != this; + } + + void LinkBefore(TLogContextListItem* item) { + Y_DEBUG_ABORT_UNLESS(!HasNext()); + Next = item; + Prev = item->Prev; + Prev->Next = this; + Next->Prev = this; + } + + void Unlink() { + if (!HasNext()) return; + + Prev->Next = Next; + Next->Prev = Prev; + Next = Prev = this; + } + +private: + // Additional memory before Names_ used in child class + size_t HeaderSize_; +}; + +/** + * @brief Returns pointer to thread local log context list. + */ +TLogContextListItem* GetLogContextList(); + +/** + * @brief Context element with stored SessionId. +*/ +class TLogContextSessionItem : public TLogContextListItem { +public: + TLogContextSessionItem(size_t size, bool hasSessionId_) + : TLogContextListItem(size, sizeof(*this)) { + HasSessionId_ = hasSessionId_; + } + + bool HasSessionId() const { + return HasSessionId_; + } + +private: + bool HasSessionId_; +}; + +} // namspace NImpl + +/** + * @brief YQL logger context element. Each element can contains several names. + */ +template <size_t Size> +class TLogContext: public NImpl::TLogContextListItem { +public: + template <typename... TArgs> + TLogContext(TArgs... args) + : TLogContextListItem(Size) + , Names_{{ TString{std::forward<TArgs>(args)}... 
}} + { + LinkBefore(NImpl::GetLogContextList()); + } + + ~TLogContext() { + Unlink(); + } + + explicit inline operator bool() const noexcept { + return true; + } + +private: + std::array<TString, Size> Names_; +}; + +/** + * @brief Special Root context elements which replaces previous log context + * list head by itself and restores previous one on destruction. + */ +template <size_t Size> +class TRootLogContext: public NImpl::TLogContextSessionItem { +public: + template <typename... TArgs> + TRootLogContext(const TString& sessionId, TArgs... args) + : TLogContextSessionItem(Size, !sessionId.empty()) + , Names_{{ sessionId, TString{std::forward<TArgs>(args)}... }} + { + NImpl::TLogContextListItem* ctxList = NImpl::GetLogContextList(); + PrevLogContextHead_.Prev = ctxList->Prev; + PrevLogContextHead_.Next = ctxList->Next; + ctxList->Next = ctxList->Prev = ctxList; + LinkBefore(ctxList); + } + + ~TRootLogContext() { + Unlink(); + NImpl::TLogContextListItem* ctxList = NImpl::GetLogContextList(); + ctxList->Prev = PrevLogContextHead_.Prev; + ctxList->Next = PrevLogContextHead_.Next; + } + + explicit inline operator bool() const noexcept { + return true; + } + +private: + std::array<TString, Size> Names_; + NImpl::TLogContextListItem PrevLogContextHead_; +}; + +/** + * @brief Helper function to construct TLogContext from variable + * arguments list. + */ +template <typename... TArgs> +inline auto MakeCtx(TArgs&&... args) -> TLogContext<sizeof...(args)> { + return TLogContext<sizeof...(args)>(std::forward<TArgs>(args)...); +} + +template <typename... TArgs> +inline auto MakeRootCtx(const TString& sessionId, TArgs&&... 
args) -> TRootLogContext<sizeof...(args) + 1> { + return TRootLogContext<sizeof...(args) + 1>(sessionId, std::forward<TArgs>(args)...); +} + +inline auto MakeRootCtx(const std::pair<TString, TString>& ctx) -> TRootLogContext<2> { + return TRootLogContext<2>(ctx.first, ctx.second); +} + +/** + * @brief Returns pair with sessionId and + * current logger contexts path as string. Each element + * is separated with '/'. + */ +std::pair<TString, TString> CurrentLogContextPath(); + +/** + * @brief If last throwing exception was performed with YQL_LOG_CTX_THROW + * macro this function returns location and context of that throw point. + */ +TString ThrowedLogContextPath(); + +/** + * @brief Adds context preffix before logging message. + */ +struct TContextPreprocessor { + static TAutoPtr<TLogElement> Preprocess(TAutoPtr<TLogElement> element); +}; + +/** + * @brief Outputs current logger context into stream + */ +void OutputLogCtx(IOutputStream* out, bool withBraces, bool skipSessionId = false); + +/** + * @brief Outputs current logger context into exception message. 
+ */ +class TYqlLogContextLocation { +public: + TYqlLogContextLocation(const TSourceLocation& location) + : Location_(location.File, location.Line) + { + } + + void SetThrowedLogContextPath() const; + + template <class T> + inline T&& operator+(T&& t) { + SetThrowedLogContextPath(); + return std::forward<T>(t); + } + +private: + TSourceLocation Location_; +}; + +} // namespace NLog +} // namespace NYql diff --git a/yql/essentials/utils/log/log.cpp b/yql/essentials/utils/log/log.cpp new file mode 100644 index 0000000000..4a20ae504c --- /dev/null +++ b/yql/essentials/utils/log/log.cpp @@ -0,0 +1,368 @@ +#include "log.h" + +#include <yql/essentials/utils/log/proto/logger_config.pb.h> + +#include <library/cpp/logger/stream.h> +#include <library/cpp/logger/system.h> +#include <library/cpp/logger/composite.h> +#include <util/datetime/systime.h> +#include <util/generic/strbuf.h> +#include <util/stream/format.h> +#include <util/system/getpid.h> +#include <util/system/mutex.h> +#include <util/system/progname.h> +#include <util/system/thread.i> + +#include <stdio.h> +#include <time.h> + +static TMutex g_InitLoggerMutex; +static int g_LoggerInitialized = 0; + +namespace { + +class TLimitedLogBackend final : public TLogBackend { +public: + TLimitedLogBackend(TAutoPtr<TLogBackend> b, TAtomic& flag, ui64 limit) noexcept + : Backend(b) + , Flag(flag) + , Limit(limit) + { + } + + ~TLimitedLogBackend() final { + } + + void ReopenLog() final { + Backend->ReopenLog(); + } + + void WriteData(const TLogRecord& rec) final { + const auto remaining = AtomicGet(Limit); + const bool final = remaining > 0 && AtomicSub(Limit, rec.Len) <= 0; + if (remaining > 0 || rec.Priority <= TLOG_WARNING) { + Backend->WriteData(rec); + } + if (final) { + AtomicSet(Flag, 1); + } + } + +private: + THolder<TLogBackend> Backend; + TAtomic& Flag; + TAtomic Limit; +}; + +// Conversions between NYql::NProto::TLoggingConfig enums and NYql::NLog enums + +NYql::NLog::ELevel 
ConvertLevel(NYql::NProto::TLoggingConfig::ELevel level) { + using namespace NYql::NProto; + using namespace NYql::NLog; + switch (level) { + case TLoggingConfig::FATAL: return ELevel::FATAL; + case TLoggingConfig::ERROR: return ELevel::ERROR; + case TLoggingConfig::WARN: return ELevel::WARN; + case TLoggingConfig::INFO: return ELevel::INFO; + case TLoggingConfig::DEBUG: return ELevel::DEBUG; + case TLoggingConfig::TRACE: return ELevel::TRACE; + } + + ythrow yexception() << "unknown log level: " + << TLoggingConfig::ELevel_Name(level); +} + +NYql::NLog::EComponent ConvertComponent(NYql::NProto::TLoggingConfig::EComponent c) { + using namespace NYql::NProto; + using namespace NYql::NLog; + switch (c) { + case TLoggingConfig::DEFAULT: return EComponent::Default; + case TLoggingConfig::CORE: return EComponent::Core; + case TLoggingConfig::CORE_EVAL: return EComponent::CoreEval; + case TLoggingConfig::CORE_PEEPHOLE: return EComponent::CorePeepHole; + case TLoggingConfig::CORE_EXECUTION: return EComponent::CoreExecution; + case TLoggingConfig::SQL: return EComponent::Sql; + case TLoggingConfig::PROVIDER_COMMON: return EComponent::ProviderCommon; + case TLoggingConfig::PROVIDER_CONFIG: return EComponent::ProviderConfig; + case TLoggingConfig::PROVIDER_RESULT: return EComponent::ProviderResult; + case TLoggingConfig::PROVIDER_YT: return EComponent::ProviderYt; + case TLoggingConfig::PROVIDER_KIKIMR: return EComponent::ProviderKikimr; + case TLoggingConfig::PROVIDER_KQP: return EComponent::ProviderKqp; + case TLoggingConfig::PROVIDER_RTMR: return EComponent::ProviderRtmr; + case TLoggingConfig::PERFORMANCE: return EComponent::Perf; + case TLoggingConfig::NET: return EComponent::Net; + case TLoggingConfig::PROVIDER_STAT: return EComponent::ProviderStat; + case TLoggingConfig::PROVIDER_SOLOMON: return EComponent::ProviderSolomon; + case TLoggingConfig::PROVIDER_DQ: return EComponent::ProviderDq; + case TLoggingConfig::PROVIDER_CLICKHOUSE: return 
EComponent::ProviderClickHouse; + case TLoggingConfig::PROVIDER_YDB: return EComponent::ProviderYdb; + case TLoggingConfig::PROVIDER_PQ: return EComponent::ProviderPq; + case TLoggingConfig::PROVIDER_S3: return EComponent::ProviderS3; + case TLoggingConfig::CORE_DQ: return EComponent::CoreDq; + case TLoggingConfig::HTTP_GATEWAY: return EComponent::HttpGateway; + case TLoggingConfig::PROVIDER_GENERIC: return EComponent::ProviderGeneric; + case TLoggingConfig::PROVIDER_PG: return EComponent::ProviderPg; + } + + ythrow yexception() << "unknown log component: " + << TLoggingConfig::EComponent_Name(c); +} + +TString ConvertDestinationType(NYql::NProto::TLoggingConfig::ELogTo c) { + switch (c) { + case NYql::NProto::TLoggingConfig::STDOUT: return "cout"; + case NYql::NProto::TLoggingConfig::STDERR: return "cerr"; + case NYql::NProto::TLoggingConfig::CONSOLE: return "console"; + default : { + ythrow yexception() << "unsupported ELogTo destination in Convert"; + } + } + + ythrow yexception() << "unknown ELogTo destination"; +} + +NYql::NProto::TLoggingConfig::TLogDestination CreateLogDestination(const TString& c) { + NYql::NProto::TLoggingConfig::TLogDestination destination; + if (c == "cout") { + destination.SetType(NYql::NProto::TLoggingConfig::STDOUT); + } else if (c == "cerr") { + destination.SetType(NYql::NProto::TLoggingConfig::STDERR); + } else if (c == "console") { + destination.SetType(NYql::NProto::TLoggingConfig::CONSOLE); + } else { + destination.SetType(NYql::NProto::TLoggingConfig::FILE); + destination.SetTarget(c); + } + return destination; +} + +} // namspace + +namespace NYql { +namespace NLog { + +void WriteLocalTime(IOutputStream* out) { + struct timeval now; + gettimeofday(&now, nullptr); + + struct tm tm; + time_t seconds = static_cast<time_t>(now.tv_sec); + localtime_r(&seconds, &tm); + + char buf[sizeof("2016-01-02 03:04:05.006")]; + int n = strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S.", &tm); + snprintf(buf + n, sizeof(buf) - n, "%03" PRIu32, 
/**
 * Creates a log element for one message and writes the standard prefix into it.
 *
 * If the truncation flag (WriteTruncMsg_) is raised, the element is created
 * with FATAL level and a "Log is truncated by limit" line with its own prefix
 * is emitted first; the regular prefix for the requested message follows.
 *
 * @param component component written into the prefix
 * @param level     level of the message being logged
 * @param file      source file of the log statement
 * @param line      source line of the log statement
 * @return the prepared element; ownership passes to the caller
 */
TAutoPtr<TLogElement> TYqlLog::CreateLogElement(
        EComponent component, ELevel level,
        TStringBuf file, int line) const
{
    // Consume the flag: true only for the caller that observes it raised.
    // NOTE(review): relies on the AtomicCas(ptr, exchange, compare) argument
    // order, i.e. "if value == 1 then store 0" - confirm against the atomic header.
    const bool writeMsg = AtomicCas(&WriteTruncMsg_, 0, 1);
    auto element = MakeHolder<TYqlLogElement>(this, writeMsg ? ELevel::FATAL : level);
    if (writeMsg) {
        // The notice is attributed to this file/line, not to the caller's location.
        WriteLogPrefix(element.Get(), EComponent::Default, ELevel::FATAL, __FILE__, __LINE__);
        *element << "Log is truncated by limit\n";
        // Presumably switches the element back to the priority of the real
        // message for what follows - TODO confirm TLogElement semantics.
        *element << ELevelHelpers::ToLogPriority(level);
    }

    WriteLogPrefix(element.Get(), component, level, file, line);
    return element.Release();
}
std::vector<THolder<TLogBackend>> backends; + + // Set stderr log destination if none was described in config + if (config.LogDestSize() == 0) { + backends.emplace_back(CreateLogBackend("cerr", LOG_MAX_PRIORITY, false)); + } + + for (const auto& logDest : config.GetLogDest()) { + // Generate the backend we need and temporary store it + switch (logDest.GetType()) { + case NProto::TLoggingConfig::STDERR: + case NProto::TLoggingConfig::STDOUT: + case NProto::TLoggingConfig::CONSOLE: { + if (!startAsDaemon) { + backends.emplace_back(CreateLogBackend(ConvertDestinationType(logDest.GetType()), LOG_MAX_PRIORITY, false)); + } + break; + } + case NProto::TLoggingConfig::FILE: { + backends.emplace_back(CreateLogBackend(logDest.GetTarget(), LOG_MAX_PRIORITY, false)); + break; + } + case NProto::TLoggingConfig::SYSLOG: { + backends.emplace_back(MakeHolder<TSysLogBackend>(GetProgramName().data(), TSysLogBackend::TSYSLOG_LOCAL1)); + break; + } + default: { + break; + } + } + } + + // Combine created backends and set them for logger + auto& logger = TLoggerOperator<TYqlLog>::Log(); + if (backends.size() == 1) { + logger.ResetBackend(std::move(backends[0])); + } else if (backends.size() > 1) { + THolder<TCompositeLogBackend> compositeBackend = MakeHolder<TCompositeLogBackend>(); + for (auto& backend : backends) { + compositeBackend->AddLogBackend(std::move(backend)); + } + logger.ResetBackend(std::move(compositeBackend)); + } + } +} + +void InitLogger(TAutoPtr<TLogBackend> backend) { + with_lock(g_InitLoggerMutex) { + ++g_LoggerInitialized; + if (g_LoggerInitialized > 1) { + return; + } + + TComponentLevels levels; + levels.fill(ELevel::INFO); + TLoggerOperator<TYqlLog>::Set(new TYqlLog(backend, levels)); + } +} + +void InitLogger(IOutputStream* out) { + InitLogger(new TStreamLogBackend(out)); +} + +void CleanupLogger() { + with_lock(g_InitLoggerMutex) { + --g_LoggerInitialized; + if (g_LoggerInitialized > 0) { + return; + } + + TLoggerOperator<TYqlLog>::Set(new TYqlLog()); + } +} 
+ +void ReopenLog() { + with_lock(g_InitLoggerMutex) { + TLoggerOperator<TYqlLog>::Log().ReopenLog(); + } +} + +} // namespace NLog +} // namespace NYql + +/** + * creates default YQL logger writing to /dev/null + */ +template <> +NYql::NLog::TYqlLog* CreateDefaultLogger<NYql::NLog::TYqlLog>() { + NYql::NLog::TComponentLevels levels; + levels.fill(NYql::NLog::ELevel::INFO); + return new NYql::NLog::TYqlLog("null", levels); +} diff --git a/yql/essentials/utils/log/log.h b/yql/essentials/utils/log/log.h new file mode 100644 index 0000000000..aecf8e39dc --- /dev/null +++ b/yql/essentials/utils/log/log.h @@ -0,0 +1,195 @@ +#pragma once + +#include "log_component.h" +#include "log_level.h" +#include "context.h" +#include "profile.h" + +#include <library/cpp/logger/global/common.h> + +#include <library/cpp/deprecated/atomic/atomic.h> +#include <util/stream/output.h> +#include <util/generic/strbuf.h> + +#include <array> + + +#define YQL_LOG_IMPL(logger, component, level, preprocessor, file, line) \ + logger.NeedToLog(component, level) && NPrivateGlobalLogger::TEatStream() | \ + (*preprocessor::Preprocess(logger.CreateLogElement(component, level, file, line))) + +#define YQL_LOG_IF_IMPL(logger, component, level, preprocessor, condition, file, line) \ + logger.NeedToLog(component, level) && (condition) && NPrivateGlobalLogger::TEatStream() | \ + (*preprocessor::Preprocess(logger.CreateLogElement(component, level, file, line))) + +// with component logger + +#define YQL_CLOG_PREP(level, component, preprocessor) YQL_LOG_IMPL(\ + ::NYql::NLog::YqlLogger(), \ + ::NYql::NLog::EComponent::component, \ + ::NYql::NLog::ELevel::level, \ + preprocessor, \ + __FILE__, __LINE__) + +#define YQL_CLOG(level, component) \ + YQL_CLOG_PREP(level, component, ::NYql::NLog::TContextPreprocessor) + +#define YQL_CLOG_ACTIVE(level, component) ::NYql::NLog::YqlLogger().NeedToLog( \ + ::NYql::NLog::EComponent::component, \ + ::NYql::NLog::ELevel::level) + +// with component/level values logger + 
// Logging statement for the global YQL logger where `component` and `level`
// are passed through to YQL_LOG_IMPL unchanged, i.e. they must already be
// complete expressions (no enum scope is prepended by this macro).
// `preprocessor` is a type providing a static Preprocess() applied to the
// created log element. The source location is that of the macro expansion.
#define YQL_CVLOG_PREP(level, component, preprocessor) YQL_LOG_IMPL(\
    ::NYql::NLog::YqlLogger(), \
    component, \
    level, \
    preprocessor, \
    __FILE__, __LINE__)
/**
 * @brief Component based logger frontend.
 *
 * Extends TLog with a separate level per EComponent and with a common
 * message prefix (time, level, process, thread, component, source location).
 */
class TYqlLog: public TLog {
public:
    /// Logger without backend arguments; process name/id are value-initialized.
    TYqlLog();
    /// @param logType log type string handed to TLog
    /// @param levels  initial level for every component, indexed by component value
    TYqlLog(const TString& logType, const TComponentLevels& levels);
    /// @param backend backend handed to TLog
    /// @param levels  initial level for every component, indexed by component value
    TYqlLog(TAutoPtr<TLogBackend> backend, const TComponentLevels& levels);

    // XXX: not thread-safe
    // Replaces the process name written in the prefix and re-reads the pid.
    void UpdateProcInfo(const TString& procName);

    /// Current level of the component (atomic read, converted via ELevelHelpers::FromInt).
    ELevel GetComponentLevel(EComponent component) const {
        return ELevelHelpers::FromInt(AtomicGet(ComponentLevels_[EComponentHelpers::ToInt(component)]));
    }

    /// Stores a new level for the component (atomic write).
    void SetComponentLevel(EComponent component, ELevel level) {
        AtomicSet(ComponentLevels_[EComponentHelpers::ToInt(component)], ELevelHelpers::ToInt(level));
    }

    /// True when `level` is numerically <= the level configured for the component.
    bool NeedToLog(EComponent component, ELevel level) const {
        return ELevelHelpers::Lte(level, GetComponentLevel(component));
    }

    /// Wraps the current backend into a size-limited one; `limit` is the budget.
    void SetMaxLogLimit(ui64 limit);

    /// Creates a log element with the prefix already written; the caller owns the result.
    TAutoPtr<TLogElement> CreateLogElement(EComponent component, ELevel level, TStringBuf file, int line) const;

    /// Writes "{datetime} {level} {procname}(pid=, tid=) [{component}] {file}:{line}: " to `out`.
    void WriteLogPrefix(IOutputStream* out, EComponent component, ELevel level, TStringBuf file, int line) const;

private:
    TString ProcName_; // process name used in the prefix
    pid_t ProcId_;     // process id used in the prefix
    // Per-component level stored as an integer, indexed by EComponentHelpers::ToInt().
    std::array<TAtomic, EComponentHelpers::ToInt(EComponent::MaxValue)> ComponentLevels_{0};
    // Raised by the limiting backend, consumed in CreateLogElement() (hence mutable).
    mutable TAtomic WriteTruncMsg_;
};
+ * + * @param backend - logger backend + */ +void InitLogger(TAutoPtr<TLogBackend> backend); + +/** + * @brief Initialize logger with concrete output stream. + * + * @param out - output stream + */ +void InitLogger(IOutputStream* out); + +void CleanupLogger(); + +void ReopenLog(); + +class YqlLoggerScope { +public: + YqlLoggerScope(const TString& log, bool startAsDaemon = false) { InitLogger(log, startAsDaemon); } + YqlLoggerScope(TAutoPtr<TLogBackend> backend) { InitLogger(backend); } + YqlLoggerScope(IOutputStream* out) { InitLogger(out); } + + ~YqlLoggerScope() { CleanupLogger(); } +}; + +} // namespace NLog +} // namespace NYql + +template <> +NYql::NLog::TYqlLog* CreateDefaultLogger<NYql::NLog::TYqlLog>(); diff --git a/yql/essentials/utils/log/log_component.h b/yql/essentials/utils/log/log_component.h new file mode 100644 index 0000000000..46e366cbd8 --- /dev/null +++ b/yql/essentials/utils/log/log_component.h @@ -0,0 +1,130 @@ +#pragma once + +#include <util/generic/strbuf.h> +#include <util/generic/yexception.h> + + +namespace NYql { +namespace NLog { + +// keep this enum in sync with simmilar enum from ydb/library/yql/utils/log/proto/logger_config.proto +enum class EComponent { + Default = 0, + Core, + CoreExecution, + Sql, + ProviderCommon, + ProviderConfig, + ProviderResult, + ProviderYt, + ProviderKikimr, + ProviderKqp, + ProviderRtmr, + Performance, Perf = Performance, + Net, + ProviderStat, + ProviderSolomon, + CoreEval, + CorePeepHole, + ProviderDq, + ProviderClickHouse, + ProviderYdb, + ProviderPq, + ProviderS3, + CoreDq, + HttpGateway, + ProviderGeneric, + ProviderPg, + // <--- put other log components here + MaxValue +}; + +struct EComponentHelpers { + static constexpr int ToInt(EComponent component) { + return static_cast<int>(component); + } + + static constexpr EComponent FromInt(int component) { + return (component >= ToInt(EComponent::Default) && + component < ToInt(EComponent::MaxValue)) + ? 
/**
 * Returns the name of the component as it is printed in the log prefix.
 *
 * Performance and Perf are the same enumerator value, so both map to "perf".
 *
 * @throws yexception for a value that has no case below (e.g. MaxValue)
 */
static TStringBuf ToString(EComponent component) {
    switch (component) {
    case EComponent::Default: return TStringBuf("default");
    case EComponent::Core: return TStringBuf("core");
    case EComponent::CoreEval: return TStringBuf("core eval");
    case EComponent::CorePeepHole: return TStringBuf("core peephole");
    case EComponent::CoreExecution: return TStringBuf("core exec");
    case EComponent::Sql: return TStringBuf("sql");
    case EComponent::ProviderCommon: return TStringBuf("common provider");
    case EComponent::ProviderConfig: return TStringBuf("CONFIG");
    case EComponent::ProviderResult: return TStringBuf("RESULT");
    case EComponent::ProviderYt: return TStringBuf("YT");
    case EComponent::ProviderKikimr: return TStringBuf("KIKIMR");
    case EComponent::ProviderKqp: return TStringBuf("KQP");
    case EComponent::ProviderRtmr: return TStringBuf("RTMR");
    case EComponent::Performance: return TStringBuf("perf");
    case EComponent::Net: return TStringBuf("net");
    case EComponent::ProviderStat: return TStringBuf("STATFACE");
    case EComponent::ProviderSolomon: return TStringBuf("SOLOMON");
    case EComponent::ProviderDq: return TStringBuf("DQ");
    case EComponent::ProviderClickHouse: return TStringBuf("CLICKHOUSE");
    case EComponent::ProviderYdb: return TStringBuf("YDB");
    case EComponent::ProviderPq: return TStringBuf("PQ");
    case EComponent::ProviderS3: return TStringBuf("S3");
    case EComponent::CoreDq: return TStringBuf("core dq");
    case EComponent::HttpGateway: return TStringBuf("http gw");
    case EComponent::ProviderGeneric: return TStringBuf("generic");
    case EComponent::ProviderPg: return TStringBuf("PG");
    default:
        // The numeric value is included so that a stray value can be identified.
        ythrow yexception() << "invalid log component value: "
                            << ToInt(component);
    }
}
TStringBuf("core eval")) return EComponent::CoreEval; + if (str == TStringBuf("core peephole")) return EComponent::CorePeepHole; + if (str == TStringBuf("core exec")) return EComponent::CoreExecution; + if (str == TStringBuf("sql")) return EComponent::Sql; + if (str == TStringBuf("common provider")) return EComponent::ProviderCommon; + if (str == TStringBuf("CONFIG")) return EComponent::ProviderConfig; + if (str == TStringBuf("RESULT")) return EComponent::ProviderResult; + if (str == TStringBuf("YT")) return EComponent::ProviderYt; + if (str == TStringBuf("KIKIMR")) return EComponent::ProviderKikimr; + if (str == TStringBuf("KQP")) return EComponent::ProviderKqp; + if (str == TStringBuf("RTMR")) return EComponent::ProviderRtmr; + if (str == TStringBuf("perf")) return EComponent::Performance; + if (str == TStringBuf("net")) return EComponent::Net; + if (str == TStringBuf("STATFACE")) return EComponent::ProviderStat; + if (str == TStringBuf("SOLOMON")) return EComponent::ProviderSolomon; + if (str == TStringBuf("DQ")) return EComponent::ProviderDq; + if (str == TStringBuf("CLICKHOUSE")) return EComponent::ProviderClickHouse; + if (str == TStringBuf("YDB")) return EComponent::ProviderYdb; + if (str == TStringBuf("PQ")) return EComponent::ProviderPq; + if (str == TStringBuf("S3")) return EComponent::ProviderS3; + if (str == TStringBuf("core dq")) return EComponent::CoreDq; + if (str == TStringBuf("http gw")) return EComponent::HttpGateway; + if (str == TStringBuf("generic")) return EComponent::ProviderGeneric; + if (str == TStringBuf("PG")) return EComponent::ProviderPg; + ythrow yexception() << "unknown log component: '" << str << '\''; + } + + template <typename TFunctor> + static void ForEach(TFunctor&& f) { + static const int minValue = ToInt(EComponent::Default); + static const int maxValue = ToInt(EComponent::MaxValue); + + for (int c = minValue; c < maxValue; c++) { + f(FromInt(c)); + } + } +}; + +} // namespace NLog +} // namespace NYql diff --git 
/**
 * Converts a raw priority value (one of the TLOG_* constants) to ELevel.
 *
 * Several priorities are folded together: EMERG/ALERT/CRIT become FATAL and
 * NOTICE/INFO become INFO. Any value not listed falls back to INFO.
 *
 * NOTE(review): ELevel declares NOTICE (= TLOG_NOTICE) and ToString() knows
 * it, yet this function never returns it, so FromInt(ToInt(ELevel::NOTICE))
 * yields INFO. Confirm whether that widening is intended before relying on
 * a round trip.
 */
static ELevel FromInt(int level) {
    switch (level) {
    case TLOG_EMERG:
    case TLOG_ALERT:
    case TLOG_CRIT: return ELevel::FATAL;

    case TLOG_ERR: return ELevel::ERROR;
    case TLOG_WARNING: return ELevel::WARN;

    case TLOG_NOTICE:
    case TLOG_INFO: return ELevel::INFO;

    case TLOG_DEBUG: return ELevel::DEBUG;
    case TLOG_RESOURCES: return ELevel::TRACE;

    default:
        return ELevel::INFO;
    }
}
== TStringBuf("FATAL")) return ELevel::FATAL; + if (str == TStringBuf("ERROR")) return ELevel::ERROR; + if (str == TStringBuf("WARN ")) return ELevel::WARN; + if (str == TStringBuf("NOTE ")) return ELevel::NOTICE; + if (str == TStringBuf("INFO ")) return ELevel::INFO; + if (str == TStringBuf("DEBUG")) return ELevel::DEBUG; + if (str == TStringBuf("TRACE")) return ELevel::TRACE; + ythrow yexception() << "unknown log level: " << str; + } + + template <typename TFunctor> + static void ForEach(TFunctor&& f) { + static const int minValue = ToInt(ELevel::FATAL); + static const int maxValue = ToInt(ELevel::TRACE); + + for (int l = minValue; l <= maxValue; l++) { + f(FromInt(l)); + } + } +}; + + +} // namspace NLog +} // namspace NYql diff --git a/yql/essentials/utils/log/log_ut.cpp b/yql/essentials/utils/log/log_ut.cpp new file mode 100644 index 0000000000..d77fbea761 --- /dev/null +++ b/yql/essentials/utils/log/log_ut.cpp @@ -0,0 +1,667 @@ +#include "log.h" +#include "context.h" +#include "profile.h" +#include <yql/essentials/utils/log/ut/log_parser.h> + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/logger/stream.h> + +#include <util/datetime/base.h> +#include <util/generic/yexception.h> +#include <util/system/getpid.h> +#include <util/string/split.h> +#include <util/string/cast.h> +#include <util/string/subst.h> + +#include <regex> + + +using namespace NYql; +using namespace NLog; + +Y_UNIT_TEST_SUITE(TLogTest) +{ + Y_UNIT_TEST(Format) { + TStringStream out; + YqlLoggerScope logger(&out); + YqlLogger().UpdateProcInfo("my_proc"); + + TString message = "some performance info"; + YQL_LOG(INFO) << message; + + TLogRow logRow = ParseLogRow(out.Str()); + + TDuration elapsed(logRow.Time - TInstant::Now()); + UNIT_ASSERT(elapsed < TDuration::MilliSeconds(5)); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_STRINGS_EQUAL(logRow.ProcName, "my_proc"); + UNIT_ASSERT_EQUAL(logRow.ProcId, GetPID()); + UNIT_ASSERT(logRow.ThreadId > 0); + 
UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL( + logRow.FileName, + TStringBuf(__FILE__).RNextTok(LOCSLASH_C)); + UNIT_ASSERT(logRow.LineNumber != 0); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, message); + } + + Y_UNIT_TEST(Levels) { + TStringStream out; + YqlLoggerScope logger(&out); // default log level INFO + + YQL_LOG(FATAL) << "fatal message"; + YQL_LOG(ERROR) << "error message"; + YQL_LOG(WARN) << "warning message"; + YQL_LOG(INFO) << "info message"; + YQL_LOG(DEBUG) << "debug message"; + YQL_LOG(TRACE) << "trace message"; + + TString fatalStr, errorStr, warnStr, infoStr, _; + Split(out.Str(), '\n', fatalStr, errorStr, warnStr, infoStr, _); + + { + TLogRow logRow = ParseLogRow(fatalStr); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::FATAL); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "fatal message"); + } + { + TLogRow logRow = ParseLogRow(errorStr); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::ERROR); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "error message"); + } + { + TLogRow logRow = ParseLogRow(warnStr); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "warning message"); + } + { + TLogRow logRow = ParseLogRow(infoStr); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "info message"); + } + } + + Y_UNIT_TEST(Components) { + TStringStream out; + YqlLoggerScope logger(&out); + + YQL_CLOG(INFO, Default) << "default message"; + YQL_CLOG(INFO, Core) << "core message"; + YQL_CLOG(INFO, Sql) << "sql message"; + YQL_CLOG(INFO, ProviderCommon) << "common message"; + YQL_CLOG(INFO, ProviderYt) << "yt message"; + YQL_CLOG(INFO, ProviderKikimr) << "kikimr message"; + YQL_CLOG(INFO, ProviderRtmr) << "rtmr 
message"; + YQL_CLOG(INFO, Performance) << "performance message"; + YQL_CLOG(INFO, Perf) << "perf message"; + + TString defaultStr, coreStr, sqlStr, commonStr, ytStr, + kikimrStr, rtmrStr, performanceStr, perfStr, _; + Split(out.Str(), '\n', defaultStr, coreStr, sqlStr, + commonStr, ytStr, + kikimrStr, rtmrStr, + performanceStr, perfStr, _); + + { + TLogRow logRow = ParseLogRow(defaultStr); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "default message"); + } + { + TLogRow logRow = ParseLogRow(coreStr); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Core); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "core message"); + } + { + TLogRow logRow = ParseLogRow(sqlStr); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Sql); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "sql message"); + } + { + TLogRow logRow = ParseLogRow(commonStr); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::ProviderCommon); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "common message"); + } + { + TLogRow logRow = ParseLogRow(ytStr); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::ProviderYt); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "yt message"); + } + { + TLogRow logRow = ParseLogRow(kikimrStr); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::ProviderKikimr); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "kikimr message"); + } + { + TLogRow logRow = ParseLogRow(rtmrStr); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::ProviderRtmr); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "rtmr message"); + } + { + TLogRow logRow = ParseLogRow(performanceStr); + UNIT_ASSERT_EQUAL(logRow.Level, 
ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Performance); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "performance message"); + } + { + TLogRow logRow = ParseLogRow(perfStr); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Performance); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "perf message"); + } + } + + Y_UNIT_TEST(Conditional) { + TStringStream out; + YqlLoggerScope logger(&out); + + YQL_LOG_IF(INFO, true) << "default info message"; + YQL_LOG_IF(INFO, false) << "must not be logged"; + + YQL_CLOG_IF(INFO, Perf, true) << "perf info message"; + YQL_CLOG_IF(INFO, Perf, false) << "perf info message"; + + TString defaultStr, perfStr, _; + Split(out.Str(), '\n', defaultStr, perfStr, _); + + { + TLogRow logRow = ParseLogRow(defaultStr); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "default info message"); + } + { + TLogRow logRow = ParseLogRow(perfStr); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "perf info message"); + } + } + + Y_UNIT_TEST(Contexts) { + TStringStream out; + YqlLoggerScope logger(&out); + + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, ""); + YQL_LOG(INFO) << "level0 - begin"; + { + YQL_LOG_CTX_SCOPE("ctx1"); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx1"); + YQL_LOG(INFO) << "level1 - begin"; + + YQL_LOG_CTX_BLOCK(TStringBuf("ctx2")) { + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx1/ctx2"); + YQL_LOG(WARN) << "level2"; + } + + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx1"); + YQL_LOG(INFO) << "level1 - end"; + } + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, ""); + 
YQL_LOG(INFO) << "level0 - end"; + + TString row1Str, row2Str, row3Str, row4Str, row5Str, _; + Split(out.Str(), '\n', row1Str, row2Str, row3Str, row4Str, row5Str, _); + + { + TLogRow logRow = ParseLogRow(row1Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "level0 - begin"); + } + { + TLogRow logRow = ParseLogRow(row2Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx1} level1 - begin"); + } + { + TLogRow logRow = ParseLogRow(row3Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx1/ctx2} level2"); + } + { + TLogRow logRow = ParseLogRow(row4Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx1} level1 - end"); + } + { + TLogRow logRow = ParseLogRow(row5Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "level0 - end"); + } + } + + Y_UNIT_TEST(UnknownSessionContexts) { + TStringStream out; + YqlLoggerScope logger(&out); + + { + YQL_LOG_CTX_ROOT_SCOPE("ctx"); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, ""); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx"); + YQL_LOG(INFO) << "level0 - begin"; + + { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(CurrentLogContextPath()); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, ""); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx"); + + YQL_LOG(INFO) << "level1 - begin"; + YQL_LOG_CTX_BLOCK("ctx1") { + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, ""); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx/ctx1"); + 
+ YQL_LOG(WARN) << "level2"; + } + + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, ""); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx"); + YQL_LOG(INFO) << "level1 - end"; + } + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, ""); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx"); + YQL_LOG(INFO) << "level0 - end"; + } + + TString row1Str, row2Str, row3Str, row4Str, row5Str, _; + Split(out.Str(), '\n', row1Str, row2Str, row3Str, row4Str, row5Str, _); + { + TLogRow logRow = ParseLogRow(row1Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx} level0 - begin"); + } + { + TLogRow logRow = ParseLogRow(row2Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx} level1 - begin"); + } + { + TLogRow logRow = ParseLogRow(row3Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx/ctx1} level2"); + } + { + TLogRow logRow = ParseLogRow(row4Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx} level1 - end"); + } + { + TLogRow logRow = ParseLogRow(row5Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx} level0 - end"); + } + } + + Y_UNIT_TEST(SessionContexts) { + TStringStream out; + YqlLoggerScope logger(&out); + + { + YQL_LOG_CTX_ROOT_SESSION_SCOPE("sessionId", "ctx"); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, "sessionId"); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx"); + YQL_LOG(INFO) << "level0 - begin"; + + { + 
YQL_LOG_CTX_ROOT_SESSION_SCOPE(CurrentLogContextPath()); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, "sessionId"); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx"); + + YQL_LOG(INFO) << "level1 - begin"; + YQL_LOG_CTX_BLOCK("ctx1") { + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, "sessionId"); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx/ctx1"); + + YQL_LOG(WARN) << "level2"; + } + + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, "sessionId"); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx"); + YQL_LOG(INFO) << "level1 - end"; + } + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, "sessionId"); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx"); + YQL_LOG(INFO) << "level0 - end"; + } + + TString row1Str, row2Str, row3Str, row4Str, row5Str, _; + Split(out.Str(), '\n', row1Str, row2Str, row3Str, row4Str, row5Str, _); + { + TLogRow logRow = ParseLogRow(row1Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{sessionId/ctx} level0 - begin"); + } + { + TLogRow logRow = ParseLogRow(row2Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{sessionId/ctx} level1 - begin"); + } + { + TLogRow logRow = ParseLogRow(row3Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{sessionId/ctx/ctx1} level2"); + } + { + TLogRow logRow = ParseLogRow(row4Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{sessionId/ctx} level1 - end"); + } + { + TLogRow logRow = ParseLogRow(row5Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + 
UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{sessionId/ctx} level0 - end"); + } + } + + Y_UNIT_TEST(ThrowWithContext) { + bool isThrown = false; + YQL_LOG_CTX_SCOPE("first"); + try { + YQL_LOG_CTX_SCOPE("second"); + YQL_LOG_CTX_THROW yexception() << "some message"; + } catch (const yexception& e) { + isThrown = true; + + UNIT_ASSERT_STRINGS_EQUAL(e.AsStrBuf(), "some message"); + + TString throwedLogCtx = ThrowedLogContextPath(); + TStringBuf file, line, context; + TStringBuf(throwedLogCtx).Split(".cpp:", file, line); + line.Split(':', line, context); + + TString expectedFile(__LOCATION__.File); + SubstGlobal(expectedFile, LOCSLASH_C, '/'); + UNIT_ASSERT_STRINGS_EQUAL(TString(file)+".cpp", expectedFile); + int lineNumber; + UNIT_ASSERT(TryFromString<int>(line, lineNumber)); + UNIT_ASSERT(lineNumber > 0); + UNIT_ASSERT_STRINGS_EQUAL(context, " {first/second} "); + + // second call without throw returns empty string + throwedLogCtx = ThrowedLogContextPath(); + UNIT_ASSERT(throwedLogCtx.empty()); + } + + UNIT_ASSERT_C(isThrown, "exception was not thrown"); + } + + Y_UNIT_TEST(ContextOverride) { + TStringStream out; + YqlLoggerScope logger(&out); + + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, ""); + { + YQL_LOG_CTX_SCOPE("ctx1"); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx1"); + YQL_LOG(INFO) << "level1 - begin"; + + YQL_LOG_CTX_BLOCK(TStringBuf("ctx2")) { + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx1/ctx2"); + YQL_LOG(WARN) << "level2 - begin"; + + { + YQL_LOG_CTX_ROOT_SCOPE("ctx3"); + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx3"); + YQL_LOG(ERROR) << "level3"; + } + + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx1/ctx2"); + YQL_LOG(WARN) << "level2 - end"; + } + + UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx1"); + YQL_LOG(INFO) << "level1 - end"; + } + 
UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, ""); + + TString row1Str, row2Str, row3Str, row4Str, row5Str, _; + Split(out.Str(), '\n', row1Str, row2Str, row3Str, row4Str, row5Str, _); + + { + TLogRow logRow = ParseLogRow(row1Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx1} level1 - begin"); + } + { + TLogRow logRow = ParseLogRow(row2Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx1/ctx2} level2 - begin"); + } + { + TLogRow logRow = ParseLogRow(row3Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::ERROR); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx3} level3"); + } + { + TLogRow logRow = ParseLogRow(row4Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx1/ctx2} level2 - end"); + } + { + TLogRow logRow = ParseLogRow(row5Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx1} level1 - end"); + } + } + + Y_UNIT_TEST(Profiling) { + TStringStream out; + YqlLoggerScope logger(&out); + + { + YQL_PROFILE_SCOPE(INFO, "scope1"); + } + + YQL_PROFILE_BLOCK(WARN, "block1") { + Sleep(TDuration::MilliSeconds(2)); + } + + YQL_PROFILE_BLOCK(ERROR, "block2") { + Sleep(TDuration::MilliSeconds(1200)); + } + + bool isExecuted = false; + YQL_PROFILE_BLOCK(TRACE, "block3") { // log will be filtered out + isExecuted = true; + } + UNIT_ASSERT(isExecuted); + + TString row1Str, row2Str, row3Str, _; + Split(out.Str(), '\n', row1Str, row2Str, row3Str, _); + + { + TLogRow logRow = ParseLogRow(row1Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + 
UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Performance); + std::regex re("Execution of \\[scope1\\] took [0-9\\.]+us"); + bool isMatch = std::regex_match(logRow.Message.c_str(), re); + UNIT_ASSERT_C(isMatch, "Unexpected message: " << logRow.Message); + } + { + TLogRow logRow = ParseLogRow(row2Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Performance); + std::regex re("Execution of \\[block1\\] took [0-9\\.]+ms"); + bool isMatch = std::regex_match(logRow.Message.c_str(), re); + UNIT_ASSERT_C(isMatch, "Unexpected message: " << logRow.Message); + } + { + TLogRow logRow = ParseLogRow(row3Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::ERROR); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Performance); + std::regex re("Execution of \\[block2\\] took [0-9\\.]+s"); + bool isMatch = std::regex_match(logRow.Message.c_str(), re); + UNIT_ASSERT_C(isMatch, "Unexpected message: " << logRow.Message); + } + } + + + int Func1(int a, char b) { + YQL_PROFILE_FUNC(INFO); + return a + b; + } + + int Func2(int a, char b) { + YQL_PROFILE_FUNCSIG(WARN); + return a + b; + } + + Y_UNIT_TEST(ProfilingFuncs) { + TStringStream out; + YqlLoggerScope logger(&out); + + Func1(1, 2); + Func2(1, 2); + + TString row1Str, row2Str, _; + Split(out.Str(), '\n', row1Str, row2Str, _); + + { + TLogRow logRow = ParseLogRow(row1Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Performance); + std::regex re("Execution of \\[Func1\\] took [0-9\\.]+us"); + bool isMatch = std::regex_match(logRow.Message.c_str(), re); + UNIT_ASSERT_C(isMatch, "Unexpected message: " << logRow.Message); + } + { + TLogRow logRow = ParseLogRow(row2Str); + UNIT_ASSERT_EQUAL(logRow.Level, 
ELevel::WARN); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Performance); +#ifdef _win_ + std::regex re("Execution of \\[int __cdecl NTestSuiteTLogTest::Func2\\(int, char\\)\\] took [0-9\\.]+us"); +#else + std::regex re("Execution of \\[int NTestSuiteTLogTest::Func2\\(int, char\\)\\] took [0-9\\.]+us"); +#endif + bool isMatch = std::regex_match(logRow.Message.c_str(), re); + UNIT_ASSERT_C(isMatch, "Unexpected message: " << logRow.Message); + } + } + + Y_UNIT_TEST(Limit1) { + size_t limit = 0; + { + TStringStream out; + YqlLoggerScope logger(&out); + YqlLogger().UpdateProcInfo("proc"); + YQL_CLOG(INFO, Core) << "message1"; + limit = out.Str().length() * 2 - 7; // Not more than 2 log lines + } + + TStringStream out; + YqlLoggerScope logger(&out); + YqlLogger().UpdateProcInfo("proc"); + YqlLogger().SetMaxLogLimit(limit); + + YQL_CLOG(INFO, Core) << "message1"; + YQL_CLOG(INFO, Core) << "message2"; + YQL_CLOG(INFO, Core) << "message3"; + + TString row1Str, row2Str, row3Str, _; + Split(out.Str(), '\n', row1Str, row2Str, row3Str, _); + + { + TLogRow logRow = ParseLogRow(row1Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Core); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "message1"); + } + { + TLogRow logRow = ParseLogRow(row2Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Core); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "message2"); + } + { + TLogRow logRow = ParseLogRow(row3Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::FATAL); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "Log is truncated by limit"); + } + } + + Y_UNIT_TEST(Limit2) { + size_t limit = 0; + { + TStringStream out; + YqlLoggerScope logger(&out); + YqlLogger().UpdateProcInfo("proc"); + YQL_CLOG(INFO, Core) << "message1"; + limit = out.Str().length() * 2 - 7; // Not more 
than 2 log lines + } + + TStringStream out; + YqlLoggerScope logger(&out); + YqlLogger().UpdateProcInfo("proc"); + YqlLogger().SetMaxLogLimit(limit); + + YQL_CLOG(INFO, Core) << "message1"; + YQL_CLOG(INFO, Core) << "message2"; + YQL_CLOG(WARN, Core) << "message3"; + + TString row1Str, row2Str, row3Str, row4Str, _; + Split(out.Str(), '\n', row1Str, row2Str, row3Str, row4Str, _); + + { + TLogRow logRow = ParseLogRow(row1Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Core); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "message1"); + } + { + TLogRow logRow = ParseLogRow(row2Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Core); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "message2"); + } + { + TLogRow logRow = ParseLogRow(row3Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::FATAL); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "Log is truncated by limit"); + } + { + TLogRow logRow = ParseLogRow(row4Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Core); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "message3"); + } + } +} diff --git a/yql/essentials/utils/log/profile.cpp b/yql/essentials/utils/log/profile.cpp new file mode 100644 index 0000000000..4dd412b8f6 --- /dev/null +++ b/yql/essentials/utils/log/profile.cpp @@ -0,0 +1,45 @@ +#include "profile.h" +#include "log.h" + +#include <util/stream/format.h> + + +#define YQL_PERF_LOG(level, file, line) YQL_LOG_IMPL( \ + ::NYql::NLog::YqlLogger(), ::NYql::NLog::EComponent::Perf, level, \ + ::NYql::NLog::TContextPreprocessor, file, line) + + +namespace NYql { +namespace NLog { + +TProfilingScope::~TProfilingScope() { + if (Name_ == nullptr) { + return; + } + + double elapsed = static_cast<double>(::MicroSeconds() - StartedAt_); + TStringBuf unit("us"); + if (elapsed > 1000000) { + elapsed /= 1000000; + 
unit = TStringBuf("s"); + } else if (elapsed > 1000) { + elapsed /= 1000; + unit = TStringBuf("ms"); + } + + auto doLog = [&]() { + YQL_PERF_LOG(Level_, File_, Line_) + << TStringBuf("Execution of [") << Name_ + << TStringBuf("] took ") << Prec(elapsed, 3) << unit; + }; + + if (!LogCtxPath_.first.empty() || !LogCtxPath_.second.empty()) { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(LogCtxPath_); + doLog(); + } else { + doLog(); + } +} + +} // namspace NLog +} // namspace NYql diff --git a/yql/essentials/utils/log/profile.h b/yql/essentials/utils/log/profile.h new file mode 100644 index 0000000000..dda7e03f9d --- /dev/null +++ b/yql/essentials/utils/log/profile.h @@ -0,0 +1,74 @@ +#pragma once + +#include "log_level.h" +#include "context.h" + +#include <util/system/datetime.h> + + +#define YQL_PROFILE_SCOPE(level, name) \ + ::NYql::NLog::TProfilingScope Y_GENERATE_UNIQUE_ID(ps)( \ + name, ::NYql::NLog::ELevel::level, __FILE__, __LINE__) + +#define YQL_PROFILE_BLOCK_IMPL(level, name) \ + ::NYql::NLog::TProfilingScope( \ + name, ::NYql::NLog::ELevel::level, __FILE__, __LINE__) + +#define YQL_PROFILE_SCOPE_VAL(level, name) \ + TAutoPtr<::NYql::NLog::TProfilingScope>(new ::NYql::NLog::TProfilingScope(\ + name, ::NYql::NLog::ELevel::level, __FILE__, __LINE__, \ + ::NYql::NLog::CurrentLogContextPath())) + +#define YQL_PROFILE_BIND_VAL(future, scopeVal) \ + future.Apply([scopeVal](const decltype(future)& f) { \ + return f.GetValue(); \ + }); + +#define YQL_PROFILE_BLOCK(level, name) \ + if (auto Y_GENERATE_UNIQUE_ID(t) = YQL_PROFILE_SCOPE_VAL(level, name)) { \ + goto Y_CAT(YQL_LOG_CTX_LABEL, __LINE__); \ + } else Y_CAT(YQL_LOG_CTX_LABEL, __LINE__): + +#define YQL_PROFILE_FUNC(level) YQL_PROFILE_SCOPE(level, __FUNCTION__) +#define YQL_PROFILE_FUNCSIG(level) YQL_PROFILE_SCOPE(level, Y_FUNC_SIGNATURE) + +#define YQL_PROFILE_FUNC_VAL(level) YQL_PROFILE_SCOPE_VAL(level, __FUNCTION__) +#define YQL_PROFILE_FUNCSIG_VAL(level) YQL_PROFILE_SCOPE_VAL(level, Y_FUNC_SIGNATURE) + + +namespace NYql 
{ +namespace NLog { + +/** + * @brief Adds elapsed execution time to log when goes outside of scope. + */ +class TProfilingScope { +public: + TProfilingScope(const char* name, ELevel level, const char* file, int line, + std::pair<TString, TString> logCtxPath = std::make_pair(TString(), TString())) + : Name_(name) + , Level_(level) + , File_(file) + , Line_(line) + , StartedAt_(::MicroSeconds()) + , LogCtxPath_(std::move(logCtxPath)) + { + } + + ~TProfilingScope(); + + explicit inline operator bool() const noexcept { + return true; + } + +private: + const char* Name_; + ELevel Level_; + const char* File_; + int Line_; + ui64 StartedAt_; + std::pair<TString, TString> LogCtxPath_; +}; + +} // namspace NLog +} // namspace NYql diff --git a/yql/essentials/utils/log/proto/logger_config.proto b/yql/essentials/utils/log/proto/logger_config.proto new file mode 100644 index 0000000000..29aebf4088 --- /dev/null +++ b/yql/essentials/utils/log/proto/logger_config.proto @@ -0,0 +1,65 @@ +package NYql.NProto; +option java_package = "ru.yandex.yql.proto"; + +message TLoggingConfig { + enum ELogTo { + STDERR = 1; + STDOUT = 2; + CONSOLE = 3; + FILE = 4; + SYSLOG = 5; + YQL_UA_LOGGER = 6; + } + + enum ELevel { + FATAL = 0; + ERROR = 1; + WARN = 2; + INFO = 3; + DEBUG = 4; + TRACE = 5; + } + + enum EComponent { + DEFAULT = 0; + CORE = 1; + CORE_EXECUTION = 2; + SQL = 3; + PROVIDER_COMMON = 4; + PROVIDER_CONFIG = 5; + PROVIDER_RESULT = 6; + PROVIDER_YT = 7; + PROVIDER_KIKIMR = 8; + PROVIDER_KQP = 9; + PROVIDER_RTMR = 10; + PERFORMANCE = 11; + NET = 12; + PROVIDER_STAT = 13; + PROVIDER_SOLOMON = 14; + CORE_EVAL = 15; + CORE_PEEPHOLE = 16; + PROVIDER_DQ = 17; + PROVIDER_CLICKHOUSE = 18; + PROVIDER_YDB = 19; + PROVIDER_PQ = 20; + PROVIDER_S3 = 21; + CORE_DQ = 22; + HTTP_GATEWAY = 23; + PROVIDER_GENERIC = 24; + PROVIDER_PG = 25; + } + + message TComponentLevel { + optional EComponent C = 1; + optional ELevel L = 2; + } + + message TLogDestination { + optional ELogTo Type = 1; + optional 
string Target = 2; + } + + repeated TLogDestination LogDest = 2; // If none is passed InitLogger sets default STDERR backend + repeated TComponentLevel Levels = 3; + optional ELevel AllComponentsLevel = 4 [default = INFO]; +} diff --git a/yql/essentials/utils/log/proto/ya.make b/yql/essentials/utils/log/proto/ya.make new file mode 100644 index 0000000000..f251cf71c5 --- /dev/null +++ b/yql/essentials/utils/log/proto/ya.make @@ -0,0 +1,11 @@ +PROTO_LIBRARY() + +SRCS( + logger_config.proto +) + +IF (NOT PY_PROTOS_FOR) + EXCLUDE_TAGS(GO_PROTO) +ENDIF() + +END() diff --git a/yql/essentials/utils/log/tls_backend.cpp b/yql/essentials/utils/log/tls_backend.cpp new file mode 100644 index 0000000000..a92f123c9b --- /dev/null +++ b/yql/essentials/utils/log/tls_backend.cpp @@ -0,0 +1,49 @@ +#include "tls_backend.h" + +#include <util/system/tls.h> + + +namespace NYql { +namespace NLog { +namespace { + +Y_POD_STATIC_THREAD(TLogBackend*) CurrentBackend; + +} // namspace + +TLogBackend* SetLogBackendForCurrentThread(TLogBackend* backend) { + TLogBackend* prev = *(&CurrentBackend); + *(&CurrentBackend) = backend; + return prev; +} + +void TTlsLogBackend::WriteData(const TLogRecord& rec) { + TLogBackend* backend = *(&CurrentBackend); + if (backend) { + backend->WriteData(rec); + } else { + DefaultBackend_->WriteData(rec); + } +} + +void TTlsLogBackend::ReopenLog() { + TLogBackend* backend = *(&CurrentBackend); + if (backend) { + backend->ReopenLog(); + } else { + DefaultBackend_->ReopenLog(); + } +} + +ELogPriority TTlsLogBackend::FiltrationLevel() const { + TLogBackend* backend = *(&CurrentBackend); + if (backend) { + return backend->FiltrationLevel(); + } else { + return DefaultBackend_->FiltrationLevel(); + } + return LOG_MAX_PRIORITY; +} + +} // namspace NLog +} // namspace NYql diff --git a/yql/essentials/utils/log/tls_backend.h b/yql/essentials/utils/log/tls_backend.h new file mode 100644 index 0000000000..802a73aae9 --- /dev/null +++ b/yql/essentials/utils/log/tls_backend.h 
@@ -0,0 +1,67 @@ +#pragma once + +#include <library/cpp/logger/backend.h> + +#include <util/generic/ptr.h> + +#include <utility> + + +namespace NYql { +namespace NLog { + +/** + * @brief Dispatches all invocations to default logger backend configured + * for current thread. Must be used in conjunction with + * SetLogBackendForCurrentThread() function or TScopedBackend class. + */ +class TTlsLogBackend: public TLogBackend { +public: + TTlsLogBackend(TAutoPtr<TLogBackend> defaultBackend) + : DefaultBackend_(defaultBackend) + { + Y_DEBUG_ABORT_UNLESS(DefaultBackend_, "default backend is not set"); + } + + void WriteData(const TLogRecord& rec) override; + void ReopenLog() override; + ELogPriority FiltrationLevel() const override; + +private: + TAutoPtr<TLogBackend> DefaultBackend_; +}; + +/** + * @brief Sets given backend as default for current thread. Must be used in + * conjunction with TTlsLogBackend. + * + * @param backend - pointer to logger backend + * @return previous default logger backend + */ +TLogBackend* SetLogBackendForCurrentThread(TLogBackend* backend); + +/** + * @brief Sets itself as default for current thread on instantiation + * and restores previous one on destruction. Must be used in + * conjunction with TTlsLogBackend. + */ +template <typename TBackend> +class TScopedBackend: public TBackend { +public: + template <typename... TArgs> + TScopedBackend(TArgs&&... args) + : TBackend(std::forward<TArgs>(args)...) 
+ , PrevBacked_(SetLogBackendForCurrentThread(this)) + { + } + + ~TScopedBackend() { + SetLogBackendForCurrentThread(PrevBacked_); + } + +private: + TLogBackend* PrevBacked_; +}; + +} // namspace NLog +} // namspace NYql diff --git a/yql/essentials/utils/log/tls_backend_ut.cpp b/yql/essentials/utils/log/tls_backend_ut.cpp new file mode 100644 index 0000000000..03e4684d78 --- /dev/null +++ b/yql/essentials/utils/log/tls_backend_ut.cpp @@ -0,0 +1,121 @@ +#include "tls_backend.h" +#include "log.h" +#include <yql/essentials/utils/log/ut/log_parser.h> + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/logger/stream.h> +#include <library/cpp/logger/null.h> + +#include <util/system/thread.h> +#include <util/string/split.h> + +#include <thread> +#include <chrono> + + +using namespace NYql; +using namespace NLog; + +class TRunnable { +public: + TRunnable(TStringBuf name, int count) + : Name_(name) + , Count_(count) + { + } + + void operator()() { + using namespace std::chrono_literals; + + YQL_LOG(INFO) << "this message will be missed"; + { + TScopedBackend<TStreamLogBackend> logBackend(&Logs_); + for (int i = 0; i < Count_; i++) { + YQL_LOG(INFO) << Name_; + std::this_thread::sleep_for(20ms); + } + } + YQL_LOG(INFO) << "this message will be missed"; + } + + const TString& GetLogs() const { + return Logs_.Str(); + } + +private: + TString Name_; + int Count_; + TStringStream Logs_; +}; + +Y_UNIT_TEST_SUITE(TTlsLogBackendTest) +{ + Y_UNIT_TEST(CaptureOutputs) { + YqlLoggerScope logger(new TTlsLogBackend(new TNullLogBackend)); + + YQL_LOG(INFO) << "this message will be missed"; + + TRunnable r1("t1", 3); + std::thread t1(std::ref(r1)); + + TRunnable r2("t2", 2); + std::thread t2(std::ref(r2)); + + t1.join(); + t2.join(); + +// Cout << "--[t1 logs]-----------------\n" << r1.GetLogs() << Endl; +// Cout << "--[t2 logs]-----------------\n" << r2.GetLogs() << Endl; + + { // t1 + TString row1Str, row2Str, row3Str, _; + Split(r1.GetLogs(), '\n', row1Str, 
row2Str, row3Str, _); + + ui64 threadId = 0; + { + TLogRow logRow = ParseLogRow(row1Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT(logRow.ThreadId > 0); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "t1"); + threadId = logRow.ThreadId; + } + { + TLogRow logRow = ParseLogRow(row2Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_EQUAL(logRow.ThreadId, threadId); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "t1"); + } + { + TLogRow logRow = ParseLogRow(row3Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_EQUAL(logRow.ThreadId, threadId); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "t1"); + } + } + + { // t2 + TString row1Str, row2Str, _; + Split(r2.GetLogs(), '\n', row1Str, row2Str, _); + + ui64 threadId = 0; + { + TLogRow logRow = ParseLogRow(row1Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT(logRow.ThreadId > 0); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "t2"); + threadId = logRow.ThreadId; + } + { + TLogRow logRow = ParseLogRow(row2Str); + UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO); + UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default); + UNIT_ASSERT_EQUAL(logRow.ThreadId, threadId); + UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "t2"); + } + } + } +} diff --git a/yql/essentials/utils/log/ut/log_parser.h b/yql/essentials/utils/log/ut/log_parser.h new file mode 100644 index 0000000000..87cf52ed8b --- /dev/null +++ b/yql/essentials/utils/log/ut/log_parser.h @@ -0,0 +1,62 @@ +#pragma once + +#include <yql/essentials/utils/log/log.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <util/datetime/base.h> + +#include <regex> + + +namespace NYql { +namespace NLog { + +struct TLogRow { + TInstant Time; + ELevel Level; + TString 
ProcName; + pid_t ProcId; + ui64 ThreadId; + EComponent Component; + TString FileName; + ui32 LineNumber; + TString Message; +}; + +static TLogRow ParseLogRow(const TString& str) { + static std::regex rowRe( + "^([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\\.[0-9]{3}) " // (1) time + "([A-Z ]{5}) " // (2) level + "([a-zA-Z0-9_\\.-]+)" // (3) process name + ".pid=([0-9]+)," // (4) process id + " tid=(0?x?[0-9a-fA-F]+). " // (5) thread id + ".([a-zA-Z0-9_\\. ]+). " // (6) component name + "([^:]+):" // (7) file name + "([0-9]+): " // (8) line number + "([^\n]*)\n?$" // (9) message + , std::regex_constants::extended); + + std::cmatch match; + bool isMatch = std::regex_match(str.c_str(), match, rowRe); + + UNIT_ASSERT_C(isMatch, "log row does not match format: '" << str << '\''); + UNIT_ASSERT_EQUAL_C(match.size(), 10, "expected 10 groups in log row: '" << str << '\''); + + TLogRow logRow; + logRow.Time = TInstant::ParseIso8601(match[1].str()) - TDuration::Hours(4); + logRow.Level = ELevelHelpers::FromString(match[2].str()); + logRow.ProcName = match[3].str(); + logRow.ProcId = FromString<pid_t>(match[4].str()); + logRow.ThreadId = match[5].str().substr(0, 2) == "0x" ? 
+ IntFromString<ui64, 16, TStringBuf>(match[5].str().substr(2)) : + IntFromString<ui64, 10, TStringBuf>(match[5].str()); + logRow.Component = EComponentHelpers::FromString(match[6].str()); + logRow.FileName = match[7].str(); + logRow.LineNumber = FromString<ui32>(match[8].str()); + logRow.Message = match[9].str(); + return logRow; +} + +} // namspace NLog +} // namspace NYql diff --git a/yql/essentials/utils/log/ut/ya.make b/yql/essentials/utils/log/ut/ya.make new file mode 100644 index 0000000000..15b6d07d08 --- /dev/null +++ b/yql/essentials/utils/log/ut/ya.make @@ -0,0 +1,8 @@ +UNITTEST_FOR(yql/essentials/utils/log) + +SRCS( + log_ut.cpp + tls_backend_ut.cpp +) + +END() diff --git a/yql/essentials/utils/log/ya.make b/yql/essentials/utils/log/ya.make new file mode 100644 index 0000000000..b4c97af67c --- /dev/null +++ b/yql/essentials/utils/log/ya.make @@ -0,0 +1,22 @@ +LIBRARY() + +SRCS( + context.cpp + log.cpp + profile.cpp + tls_backend.cpp +) + +PEERDIR( + contrib/libs/protobuf + library/cpp/logger + library/cpp/logger/global + library/cpp/deprecated/atomic + yql/essentials/utils/log/proto +) + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/yql/essentials/utils/md5_stream.cpp b/yql/essentials/utils/md5_stream.cpp new file mode 100644 index 0000000000..8a1c219164 --- /dev/null +++ b/yql/essentials/utils/md5_stream.cpp @@ -0,0 +1,20 @@ +#include "md5_stream.h" + +namespace NYql { + +TMd5OutputStream::TMd5OutputStream(IOutputStream& delegatee) + : Delegatee_(delegatee) +{ +} + +TString TMd5OutputStream::Finalize() { + char buf[33] = { 0 }; + return TString(Accumulator_.End(buf)); +} + +void TMd5OutputStream::DoWrite(const void* buf, size_t len) { + Delegatee_.Write(buf, len); + Accumulator_.Update(buf, len); +} + +} diff --git a/yql/essentials/utils/md5_stream.h b/yql/essentials/utils/md5_stream.h new file mode 100644 index 0000000000..ca8b1c4bb9 --- /dev/null +++ b/yql/essentials/utils/md5_stream.h @@ -0,0 +1,19 @@ +#pragma once + +#include 
<util/stream/output.h> +#include <library/cpp/digest/md5/md5.h> + +namespace NYql { +class TMd5OutputStream : public IOutputStream { +public: + explicit TMd5OutputStream(IOutputStream& delegatee); + TString Finalize(); + +private: + void DoWrite(const void* buf, size_t len) override; + +private: + IOutputStream& Delegatee_; + MD5 Accumulator_; +}; +} diff --git a/yql/essentials/utils/md5_stream_ut.cpp b/yql/essentials/utils/md5_stream_ut.cpp new file mode 100644 index 0000000000..1d04c632d4 --- /dev/null +++ b/yql/essentials/utils/md5_stream_ut.cpp @@ -0,0 +1,47 @@ +#include "md5_stream.h" +#include <util/stream/input.h> +#include <util/stream/str.h> +#include <library/cpp/testing/unittest/registar.h> + +using namespace NYql; + +namespace { +TString Consume(const TString& input) { + TStringInput s(input); + + TString output; + TStringOutput outputStream(output); + TMd5OutputStream md5Stream(outputStream); + + UNIT_ASSERT_VALUES_EQUAL(input.size(), TransferData(&s, &md5Stream)); + UNIT_ASSERT_VALUES_EQUAL(input, output); + return md5Stream.Finalize(); +} +} + +Y_UNIT_TEST_SUITE(TStreamMd5Tests) { + Y_UNIT_TEST(Empty) { + const auto md5 = Consume(""); + const TString emptyStringMd5 = "d41d8cd98f00b204e9800998ecf8427e"; + UNIT_ASSERT_VALUES_EQUAL(md5, emptyStringMd5); + } + + Y_UNIT_TEST(ShortText) { + const auto md5 = Consume("hello from Y!"); + const TString expectedMd5 = "abf59ed7b0daa71085e76e461a737cc2"; + UNIT_ASSERT_VALUES_EQUAL(md5, expectedMd5); + } + + Y_UNIT_TEST(BigText) { + // TransferData uses TempBuf of 64K + const TString s(1000000, 'A'); + const auto md5 = Consume(s.c_str()); + /* + $ for i in {1..1000000};do echo -n A >> 1M.txt;done + $ md5sum 1M.txt + 48fcdb8b87ce8ef779774199a856091d 1M.txt + */ + const TString expectedMd5 = "48fcdb8b87ce8ef779774199a856091d"; + UNIT_ASSERT_VALUES_EQUAL(md5, expectedMd5); + } +} diff --git a/yql/essentials/utils/method_index.cpp b/yql/essentials/utils/method_index.cpp new file mode 100644 index 
0000000000..030dbc3111 --- /dev/null +++ b/yql/essentials/utils/method_index.cpp @@ -0,0 +1,44 @@ +#include "method_index.h" +#include <util/generic/yexception.h> +#include <util/string/hex.h> + +namespace NYql { + +size_t GetMethodPtrIndex(uintptr_t ptr) { +#ifdef _win_ + size_t offset; + if (memcmp((void*)ptr, "\x48\x8B\x01\xFF", 4) == 0) { + if (*(ui8*)(ptr + 4) == 0x60) { + offset = *(ui8*)(ptr + 5); + } else if (*(ui8*)(ptr + 4) == 0xa0) { + offset = *(ui32*)(ptr + 5); + } else { + ythrow yexception() << "Unsupported code: " << HexEncode((char*)ptr + 4, 1); + } + } else if (memcmp((void*)ptr, "\x50\x48\x89\x0c\x24\x48\x8b\x0c\x24\x48\x8b\x01\x48\x8b", 14) == 0) { + if (*(ui8*)(ptr + 14) == 0x40) { + offset = *(ui8*)(ptr + 15); + } else if (*(ui8*)(ptr + 14) == 0x80) { + offset = *(ui32*)(ptr + 15); + } else { + ythrow yexception() << "Unsupported code: " << HexEncode((char*)ptr + 14, 1); + } + } else if (memcmp((void*)ptr, "\x48\x8b\x01\x48\x8b", 5) == 0) { + if (*(ui8*)(ptr + 5) == 0x40) { + offset = *(ui8*)(ptr + 6); + } else if (*(ui8*)(ptr + 5) == 0x80) { + offset = *(ui32*)(ptr + 6); + } else { + ythrow yexception() << "Unsupported code: " << HexEncode((char*)ptr + 5, 1); + } + } else { + ythrow yexception() << "Unsupported code: " << HexEncode((char*)ptr, 16); + } + + return offset / 8 + 1; +#else + return ptr >> 3; +#endif +} + +} diff --git a/yql/essentials/utils/method_index.h b/yql/essentials/utils/method_index.h new file mode 100644 index 0000000000..6539fd3a3d --- /dev/null +++ b/yql/essentials/utils/method_index.h @@ -0,0 +1,24 @@ +#pragma once +#include <util/system/platform.h> +#include <cstring> +#include <cstdint> + +namespace NYql { + +size_t GetMethodPtrIndex(uintptr_t ptr); + +template<typename Method> +inline size_t GetMethodIndex(Method method) { + uintptr_t ptr; + std::memcpy(&ptr, &method, sizeof(uintptr_t)); + return GetMethodPtrIndex(ptr); +} + +template<typename Method> +inline uintptr_t GetMethodPtr(Method method) { + uintptr_t ptr; 
+ std::memcpy(&ptr, &method, sizeof(uintptr_t)); + return ptr; +} + +}
\ No newline at end of file diff --git a/yql/essentials/utils/multi_resource_lock.cpp b/yql/essentials/utils/multi_resource_lock.cpp new file mode 100644 index 0000000000..84b095a207 --- /dev/null +++ b/yql/essentials/utils/multi_resource_lock.cpp @@ -0,0 +1,42 @@ +#include "multi_resource_lock.h" + +using namespace NYql; + +TMultiResourceLock::TResourceLock TMultiResourceLock::Acquire(TString resourceId) { + TLock::TPtr lock = ProvideResourceLock(resourceId); + + // resource-specific mutex should be locked outside of Guard_ lock + return { *this, std::move(lock), std::move(resourceId) }; +} + +TMultiResourceLock::~TMultiResourceLock() { + with_lock(Guard_) { + Y_ABORT_UNLESS(Locks_.empty(), "~TMultiResourceLock: we still have %lu unreleased locks", Locks_.size()); + } +} + +TMultiResourceLock::TLock::TPtr TMultiResourceLock::ProvideResourceLock(const TString& resourceId) { + with_lock(Guard_) { + auto it = Locks_.find(resourceId); + if (it == Locks_.end()) { + it = Locks_.emplace(resourceId, MakeIntrusive<TLock>()).first; + } + + // important: ref count will be incremented under lock + // in this case we have guarantee TryCleanup will not erase this resource just after exit from this method and before entering lock->Mutex_.Acquire() + return it->second; + } +} + +void TMultiResourceLock::TryCleanup(const TString& resourceId) { + with_lock(Guard_) { + auto it = Locks_.find(resourceId); + if (it == Locks_.end()) { + return; + } + + if (it->second->IsUnique()) { + Locks_.erase(it); + } + } +} diff --git a/yql/essentials/utils/multi_resource_lock.h b/yql/essentials/utils/multi_resource_lock.h new file mode 100644 index 0000000000..61804fa9a4 --- /dev/null +++ b/yql/essentials/utils/multi_resource_lock.h @@ -0,0 +1,81 @@ +#pragma once + +#include "yql_panic.h" + +#include <util/generic/map.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> +#include <util/system/mutex.h> + +namespace NYql { + +class TMultiResourceLock : private TNonCopyable { +private: 
+ struct TLock : public TThrRefBase { + typedef TIntrusivePtr<TLock> TPtr; + + bool IsUnique() const { + return RefCount() == 1; + } + + TMutex Mutex_; + }; + +public: + struct TResourceLock : private TNonCopyable { + TResourceLock(TMultiResourceLock& owner, TLock::TPtr lock, TString resourceId) + : Owner_(owner) + , Lock_(std::move(lock)) + , ResourceId_(std::move(resourceId)) + { + Y_ENSURE(Lock_); + Lock_->Mutex_.Acquire(); + } + + TResourceLock(TResourceLock&& other) + : Owner_(other.Owner_) + , Lock_(std::move(other.Lock_)) + , ResourceId_(std::move(other.ResourceId_)) + { + + } + + TResourceLock& operator=(TResourceLock&&) = delete; + + ~TResourceLock() { + if (!Lock_) { + return; + } + + Lock_->Mutex_.Release(); + // decrement ref count before TryCleanup + Lock_ = nullptr; + Owner_.TryCleanup(ResourceId_); + } + + private: + TMultiResourceLock& Owner_; + TLock::TPtr Lock_; + TString ResourceId_; + }; + + TResourceLock Acquire(TString resourceId); + + template <typename F> + auto RunWithLock(TString resourceId, const F& f) -> decltype(f()) { + auto lock = Acquire(std::move(resourceId)); + return f(); + } + + ~TMultiResourceLock(); + +private: + TLock::TPtr ProvideResourceLock(const TString& resourceId); + void TryCleanup(const TString& resourceId); + +private: + TMutex Guard_; + TMap<TString, TLock::TPtr> Locks_; +}; + +} diff --git a/yql/essentials/utils/multi_resource_lock_ut.cpp b/yql/essentials/utils/multi_resource_lock_ut.cpp new file mode 100644 index 0000000000..0af9cea3ff --- /dev/null +++ b/yql/essentials/utils/multi_resource_lock_ut.cpp @@ -0,0 +1,54 @@ +#include "multi_resource_lock.h" +#include <util/generic/xrange.h> +#include <library/cpp/threading/future/async.h> +#include <library/cpp/testing/unittest/registar.h> + +namespace NYql { +using namespace NThreading; + +Y_UNIT_TEST_SUITE(TMultiResourceLock) { + Y_UNIT_TEST(ManyResources) { + TMultiResourceLock multiLock; + const int workersCount = 3; + TVector<TVector<int>> workersData; + 
workersData.resize(workersCount); + + TAdaptiveThreadPool queue; + queue.Start(0); + + TVector<NThreading::TFuture<void>> workers; + workers.reserve(workersCount); + TManualEvent startEvent; + + for (int i = 0; i < workersCount; ++i) { + TString resourceId = ToString(i); + TVector<int>& data = workersData.at(i); + NThreading::TFuture<void> f = NThreading::Async([&, resourceId]() { + startEvent.Wait(); + + for (int j = 0; j < 1000; ++j) { + const auto& l = multiLock.Acquire(resourceId); + Y_UNUSED(l); + data.push_back(j); + } + }, queue); + + workers.push_back(std::move(f)); + } + + startEvent.Signal(); + + NThreading::TFuture<void> all = NThreading::WaitExceptionOrAll(workers); + all.GetValueSync(); + queue.Stop(); + + // analyze workersData: + auto range0_999 = xrange(0, 1000); + for (auto& w : workersData) { + UNIT_ASSERT_VALUES_EQUAL(w.size(), 1000); + UNIT_ASSERT(std::equal(range0_999.begin(), range0_999.end(), w.begin())); + } + } +} + +} diff --git a/yql/essentials/utils/network/bind_in_range.cpp b/yql/essentials/utils/network/bind_in_range.cpp new file mode 100644 index 0000000000..88cf641ce3 --- /dev/null +++ b/yql/essentials/utils/network/bind_in_range.cpp @@ -0,0 +1,27 @@ +#include "bind_in_range.h" + +#include <yql/essentials/utils/log/log.h> +#include <util/datetime/base.h> + +namespace NYql { + +TVector<NBus::TBindResult> BindInRange(TRangeWalker<int>& portWalker) { + const int cyclesLimit = 3; + const int rangeSize = portWalker.GetRangeSize(); + const auto cycleDelay = TDuration::Seconds(3); + + for (int cycle = 0; cycle < cyclesLimit; ++cycle) { + for (int i = 0; i < rangeSize; ++i) { + try { + return NBus::BindOnPort(portWalker.MoveToNext(), false).second; + } catch (const TSystemError&) { + YQL_LOG(DEBUG) << CurrentExceptionMessage(); + } + } + + Sleep(cycleDelay); + } + + ythrow yexception() << "Unable to bind within port range [" << portWalker.GetStart() << ", " << portWalker.GetFinish() << "]"; +} +} diff --git 
a/yql/essentials/utils/network/bind_in_range.h b/yql/essentials/utils/network/bind_in_range.h new file mode 100644 index 0000000000..5621529dd5 --- /dev/null +++ b/yql/essentials/utils/network/bind_in_range.h @@ -0,0 +1,8 @@ +#pragma once + +#include <yql/essentials/utils/range_walker.h> +#include <library/cpp/messagebus/network.h> + +namespace NYql { +TVector<NBus::TBindResult> BindInRange(TRangeWalker<int>& portWalker); +} diff --git a/yql/essentials/utils/network/ya.make b/yql/essentials/utils/network/ya.make new file mode 100644 index 0000000000..282e1502b3 --- /dev/null +++ b/yql/essentials/utils/network/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + bind_in_range.cpp + bind_in_range.h +) + +PEERDIR( + library/cpp/messagebus +) + +END() diff --git a/yql/essentials/utils/parse_double.cpp b/yql/essentials/utils/parse_double.cpp new file mode 100644 index 0000000000..90923160c5 --- /dev/null +++ b/yql/essentials/utils/parse_double.cpp @@ -0,0 +1,76 @@ +#include "parse_double.h" + +#include <util/string/ascii.h> +#include <util/string/cast.h> + +namespace NYql { + +namespace { +template <typename T> +bool GenericTryFloatFromString(TStringBuf buf, T& value) { + value = 0; + if (!buf.size()) { + return false; + } + + if (TryFromString(buf.data(), buf.size(), value)) { + return true; + } + + const char* ptr = buf.data(); + ui32 size = buf.size(); + char sign = '+'; + if (*ptr == '+' || *ptr == '-') { + sign = *ptr; + ++ptr; + --size; + } + + if (size != 3) { + return false; + } + + // NaN or Inf (ignoring case) + if (AsciiToUpper(ptr[0]) == 'N' && AsciiToUpper(ptr[1]) == 'A' && AsciiToUpper(ptr[2]) == 'N') { + value = std::numeric_limits<T>::quiet_NaN(); + } else if (AsciiToUpper(ptr[0]) == 'I' && AsciiToUpper(ptr[1]) == 'N' && AsciiToUpper(ptr[2]) == 'F') { + value = std::numeric_limits<T>::infinity(); + } else { + return false; + } + + if (sign == '-') { + value = -value; + } + + return true; +} +} + +float FloatFromString(TStringBuf buf) { + float result = 0; + if 
(!TryFloatFromString(buf, result)) { + throw yexception() << "unable to parse float from '" << buf << "'"; + } + + return result; +} + +double DoubleFromString(TStringBuf buf) { + double result = 0; + if (!TryDoubleFromString(buf, result)) { + throw yexception() << "unable to parse double from '" << buf << "'"; + } + + return result; +} + +bool TryFloatFromString(TStringBuf buf, float& value) { + return GenericTryFloatFromString(buf, value); +} + +bool TryDoubleFromString(TStringBuf buf, double& value) { + return GenericTryFloatFromString(buf, value); +} + +} diff --git a/yql/essentials/utils/parse_double.h b/yql/essentials/utils/parse_double.h new file mode 100644 index 0000000000..61d1d940c9 --- /dev/null +++ b/yql/essentials/utils/parse_double.h @@ -0,0 +1,18 @@ +#pragma once + +#include <util/generic/strbuf.h> + +namespace NYql { + +/* +These parse functions can understand nan, inf, -inf case-insensitively +They do not parse empty string to zero +*/ + +float FloatFromString(TStringBuf buf); +double DoubleFromString(TStringBuf buf); + +bool TryFloatFromString(TStringBuf buf, float& value); +bool TryDoubleFromString(TStringBuf buf, double& value); + +} diff --git a/yql/essentials/utils/parse_double_ut.cpp b/yql/essentials/utils/parse_double_ut.cpp new file mode 100644 index 0000000000..4aecf64f88 --- /dev/null +++ b/yql/essentials/utils/parse_double_ut.cpp @@ -0,0 +1,55 @@ +#include "parse_double.h" +#include <library/cpp/testing/unittest/registar.h> + +namespace NYql { + +Y_UNIT_TEST_SUITE(TParseDouble) { + + template <typename T, typename F> + void ParseAndCheck(TStringBuf buf, F f, T expected) { + T result = 0; + UNIT_ASSERT(f(buf, result)); + UNIT_ASSERT_DOUBLES_EQUAL(expected, result, 1e-6); + } + + Y_UNIT_TEST(ExactValues) { + ParseAndCheck(TStringBuf("nan"), TryFloatFromString, std::numeric_limits<float>::quiet_NaN()); + ParseAndCheck(TStringBuf("nAn"), TryDoubleFromString, std::numeric_limits<double>::quiet_NaN()); + + ParseAndCheck(TStringBuf("+nan"), 
TryFloatFromString, std::numeric_limits<float>::quiet_NaN()); + ParseAndCheck(TStringBuf("+NAN"), TryDoubleFromString, std::numeric_limits<double>::quiet_NaN()); + + ParseAndCheck(TStringBuf("-nan"), TryFloatFromString, std::numeric_limits<float>::quiet_NaN()); + ParseAndCheck(TStringBuf("-NaN"), TryDoubleFromString, std::numeric_limits<double>::quiet_NaN()); + + ParseAndCheck(TStringBuf("inf"), TryFloatFromString, std::numeric_limits<float>::infinity()); + ParseAndCheck(TStringBuf("iNf"), TryDoubleFromString, std::numeric_limits<double>::infinity()); + + ParseAndCheck(TStringBuf("+inf"), TryFloatFromString, std::numeric_limits<float>::infinity()); + ParseAndCheck(TStringBuf("+INF"), TryDoubleFromString, std::numeric_limits<double>::infinity()); + + ParseAndCheck(TStringBuf("-inf"), TryFloatFromString, -std::numeric_limits<float>::infinity()); + ParseAndCheck(TStringBuf("-InF"), TryDoubleFromString, -std::numeric_limits<double>::infinity()); + + ParseAndCheck<float>(TStringBuf("-12.3456"), TryFloatFromString, -12.3456); + ParseAndCheck(TStringBuf("-12.3456"), TryDoubleFromString, -12.3456); + + ParseAndCheck<float>(TStringBuf("1.23e-2"), TryFloatFromString, 0.0123); + ParseAndCheck(TStringBuf("1.23e-2"), TryDoubleFromString, 0.0123); + + UNIT_ASSERT_EQUAL(FloatFromString(TStringBuf("iNf")), std::numeric_limits<float>::infinity()); + UNIT_ASSERT_EQUAL(DoubleFromString(TStringBuf("iNf")), std::numeric_limits<float>::infinity()); + } + + Y_UNIT_TEST(Errors) { + UNIT_ASSERT_EXCEPTION_CONTAINS(FloatFromString(TStringBuf("")), std::exception, "unable to parse float from ''"); + UNIT_ASSERT_EXCEPTION_CONTAINS(DoubleFromString(TStringBuf("")), std::exception, "unable to parse double from ''"); + + UNIT_ASSERT_EXCEPTION_CONTAINS(FloatFromString(TStringBuf("info")), std::exception, "unable to parse float from 'info'"); + UNIT_ASSERT_EXCEPTION_CONTAINS(DoubleFromString(TStringBuf("-nana")), std::exception, "unable to parse double from '-nana'"); + + 
UNIT_ASSERT_EXCEPTION_CONTAINS(FloatFromString(TStringBuf(nullptr)), std::exception, "unable to parse float from ''"); + UNIT_ASSERT_EXCEPTION_CONTAINS(DoubleFromString(TStringBuf(nullptr)), std::exception, "unable to parse double from ''"); + } +} +} diff --git a/yql/essentials/utils/prefetch.h b/yql/essentials/utils/prefetch.h new file mode 100644 index 0000000000..50eea7df5c --- /dev/null +++ b/yql/essentials/utils/prefetch.h @@ -0,0 +1,13 @@ +#pragma once + +namespace NYql { + +inline void PrefetchForRead(const void* ptr) { + __builtin_prefetch(ptr, 0, 3); +} + +inline void PrefetchForWrite(void* ptr) { + __builtin_prefetch(ptr, 1, 3); +} + +} // namespace NYql diff --git a/yql/essentials/utils/proc_alive.cpp b/yql/essentials/utils/proc_alive.cpp new file mode 100644 index 0000000000..7efb2584bc --- /dev/null +++ b/yql/essentials/utils/proc_alive.cpp @@ -0,0 +1,37 @@ +#include "proc_alive.h" + +#include <util/system/platform.h> +#include <util/system/compat.h> +#include <util/system/error.h> +#include <util/system/winint.h> + +#include <errno.h> + + +namespace NYql { + +bool IsProcessAlive(TProcessId pid) { +#ifdef _unix_ + // If sending a null signal fails with the error ESRCH, then we know + // the process doesn’t exist. If the call fails with the error + // EPERM - the process exists, but we don’t have permission to send + // a signal to it. 
+ kill(pid, 0); + return errno != ESRCH; +#elif defined(_win_) + HANDLE process = OpenProcess(SYNCHRONIZE, FALSE, pid); + if (process == NULL) { + return false; + } + + DWORD ret = WaitForSingleObject(process, 0); + CloseHandle(process); + return ret == WAIT_TIMEOUT; +#else + Y_UNUSED(pid); + return false; +#endif +} + +} // NYql + diff --git a/yql/essentials/utils/proc_alive.h b/yql/essentials/utils/proc_alive.h new file mode 100644 index 0000000000..c4b798b4ca --- /dev/null +++ b/yql/essentials/utils/proc_alive.h @@ -0,0 +1,9 @@ +#pragma once + +#include <util/system/getpid.h> + +namespace NYql { + +bool IsProcessAlive(TProcessId pid); + +} diff --git a/yql/essentials/utils/rand_guid.cpp b/yql/essentials/utils/rand_guid.cpp new file mode 100644 index 0000000000..d89eefbd3b --- /dev/null +++ b/yql/essentials/utils/rand_guid.cpp @@ -0,0 +1,32 @@ +#include "rand_guid.h" + +#include <util/system/datetime.h> +#include <util/system/getpid.h> +#include <util/system/unaligned_mem.h> +#include <util/generic/guid.h> + +namespace NYql { + +TAtomic TRandGuid::Counter = 0; + +TRandGuid::TRandGuid() { + ResetSeed(); +} + +void TRandGuid::ResetSeed() { + new (&Rnd_) TMersenne<ui64>(GetCycleCount() + MicroSeconds() + GetPID()); +} + +TString TRandGuid::GenGuid() { + TGUID ret = {}; + WriteUnaligned<ui64>(ret.dw, GetRnd().GenRand()); + ret.dw[2] = (ui32)GetRnd().GenRand(); + ret.dw[3] = AtomicIncrement(Counter); + + return GetGuidAsString(ret); +} + +ui64 TRandGuid::GenNumber() { + return GetRnd().GenRand(); +} +} diff --git a/yql/essentials/utils/rand_guid.h b/yql/essentials/utils/rand_guid.h new file mode 100644 index 0000000000..30496bdd5f --- /dev/null +++ b/yql/essentials/utils/rand_guid.h @@ -0,0 +1,30 @@ +#pragma once + +#include <library/cpp/deprecated/atomic/atomic.h> +#include <util/random/mersenne.h> + +#include <type_traits> + +namespace NYql { +class TRandGuid { +public: + TRandGuid(); + TRandGuid(TRandGuid&&) = default; + TRandGuid& operator=(TRandGuid&&) = default; 
+ + void ResetSeed(); + + TString GenGuid(); + ui64 GenNumber(); + +private: + TMersenne<ui64>& GetRnd() { + return reinterpret_cast<TMersenne<ui64>&>(Rnd_); + } + +private: + std::aligned_storage<sizeof(TMersenne<ui64>) ,alignof(TMersenne<ui64>)>::type Rnd_; + + static TAtomic Counter; +}; +} diff --git a/yql/essentials/utils/range_walker.h b/yql/essentials/utils/range_walker.h new file mode 100644 index 0000000000..268fe36472 --- /dev/null +++ b/yql/essentials/utils/range_walker.h @@ -0,0 +1,47 @@ +#pragma once + +#include <util/generic/yexception.h> + +namespace NYql { + +template <typename T> +class TRangeWalker { +private: + const T Start_; + const T Finish_; + T Current_; + +public: + TRangeWalker(T start, T finish) + : Start_(start) + , Finish_(finish) + , Current_(start) + { + if (Start_ > Finish_) { + ythrow yexception() << "Invalid range for walker"; + } + } + + T GetStart() const { + return Start_; + } + + T GetFinish() const { + return Finish_; + } + + T GetRangeSize() const { + return Finish_ - Start_ + 1; + } + + T MoveToNext() { + T result = Current_++; + + if (Current_ > Finish_) { + Current_ = Start_; + } + + return result; + } +}; +} diff --git a/yql/essentials/utils/range_walker_ut.cpp b/yql/essentials/utils/range_walker_ut.cpp new file mode 100644 index 0000000000..d6a86cc804 --- /dev/null +++ b/yql/essentials/utils/range_walker_ut.cpp @@ -0,0 +1,35 @@ +#include "range_walker.h" + +#include <library/cpp/testing/unittest/registar.h> + +using namespace NYql; + +Y_UNIT_TEST_SUITE(TRangeWalkerTests) { + Y_UNIT_TEST(InvalidRange) { + UNIT_ASSERT_EXCEPTION_CONTAINS(TRangeWalker<int>(2, 1), yexception, "Invalid range for walker"); + } + + Y_UNIT_TEST(SingleValueRange) { + TRangeWalker<int> w(5, 5); + UNIT_ASSERT_EQUAL(5, w.GetStart()); + UNIT_ASSERT_EQUAL(5, w.GetFinish()); + UNIT_ASSERT_EQUAL(1, w.GetRangeSize()); + + for (int i = 0; i < 10; ++i) { + UNIT_ASSERT_EQUAL(5, w.MoveToNext()); + } + } + + Y_UNIT_TEST(ManyValuesRange) { + TRangeWalker<int> 
w(5, 7); + UNIT_ASSERT_EQUAL(5, w.GetStart()); + UNIT_ASSERT_EQUAL(7, w.GetFinish()); + UNIT_ASSERT_EQUAL(3, w.GetRangeSize()); + + for (int i = 0; i < 10; ++i) { + UNIT_ASSERT_EQUAL(5, w.MoveToNext()); + UNIT_ASSERT_EQUAL(6, w.MoveToNext()); + UNIT_ASSERT_EQUAL(7, w.MoveToNext()); + } + } +} diff --git a/yql/essentials/utils/resetable_setting.h b/yql/essentials/utils/resetable_setting.h new file mode 100644 index 0000000000..0112105a3b --- /dev/null +++ b/yql/essentials/utils/resetable_setting.h @@ -0,0 +1,67 @@ +#pragma once + +#include "yql_panic.h" + +#include <util/generic/maybe.h> +#include <util/generic/variant.h> + +namespace NYql { + +template <typename S, typename R> +class TResetableSettingBase { +protected: + using TSet = S; + using TReset = R; + +public: + void Set(const TSet& value) { + Value.ConstructInPlace(value); + } + + void Reset(const TReset& value) { + Value.ConstructInPlace(value); + } + + bool Defined() const { + return Value.Defined(); + } + + explicit operator bool() const { + return Defined(); + } + + bool IsSet() const { + YQL_ENSURE(Defined()); + return Value->index() == 0; + } + + const TSet& GetValueSet() const { + YQL_ENSURE(IsSet()); + return std::get<TSet>(*Value); + } + + const TReset& GetValueReset() const { + YQL_ENSURE(!IsSet()); + return std::get<TReset>(*Value); + } + +private: + TMaybe<std::variant<TSet, TReset>> Value; +}; + +template <typename S, typename R> +class TResetableSetting: public TResetableSettingBase<S, R> { +}; + +template <typename S> +class TResetableSetting<S, void>: public TResetableSettingBase<S, TNothing> { +private: + const TNothing& GetValueReset() const; + +public: + void Reset() { + TResetableSettingBase<S, TNothing>::Reset(Nothing()); + } +}; + +} diff --git a/yql/essentials/utils/retry.cpp b/yql/essentials/utils/retry.cpp new file mode 100644 index 0000000000..ac40d79b12 --- /dev/null +++ b/yql/essentials/utils/retry.cpp @@ -0,0 +1 @@ +#include "retry.h" diff --git a/yql/essentials/utils/retry.h 
// Invokes `a` until it succeeds, making at most `attempts` calls (and always at
// least one). Every failed call except the last one that throws
// TRetriableException is reported to `exceptionHandler(error, attemptNo, attempts)`,
// where attemptNo counts from 1. The last call runs unguarded, so whatever it
// throws reaches the caller; exceptions of other types are never intercepted.
template <typename TRetriableException, typename TAction, typename TExceptionHandler>
auto WithRetry(int attempts, TAction&& a, TExceptionHandler&& exceptionHandler) {
    for (int attemptNo = 1;; ++attemptNo) {
        const bool isLastAttempt = attemptNo >= attempts;
        if (isLastAttempt) {
            // Final try: no handler, the result or the exception goes straight out.
            return a();
        }

        try {
            return a();
        } catch (const TRetriableException& error) {
            exceptionHandler(error, attemptNo, attempts);
        }
    }
}
UNIT_ASSERT_VALUES_EQUAL(2, exceptions); + UNIT_ASSERT_VALUES_EQUAL(3, r); + UNIT_ASSERT_VALUES_EQUAL(3, counter); + } + + Y_UNIT_TEST(ManyRetries) { + int counter = 0; + int exceptions = 0; + UNIT_ASSERT_EXCEPTION_CONTAINS(WithRetry<TMyError>(3, [&]() { + throw TMyError() << "yyyy" << counter++; + }, [&](const auto& e, int attempt, int attemptCount) { + ++exceptions; + UNIT_ASSERT_STRING_CONTAINS(e.what(), "yyyy"); + UNIT_ASSERT_VALUES_EQUAL(attempt, counter); + UNIT_ASSERT_VALUES_EQUAL(attemptCount, 3); + }), TMyError, "yyyy2"); + + UNIT_ASSERT_VALUES_EQUAL(2, exceptions); + UNIT_ASSERT_VALUES_EQUAL(3, counter); + } +} diff --git a/yql/essentials/utils/rope/rope_over_buffer.cpp b/yql/essentials/utils/rope/rope_over_buffer.cpp new file mode 100644 index 0000000000..e8074372fd --- /dev/null +++ b/yql/essentials/utils/rope/rope_over_buffer.cpp @@ -0,0 +1,43 @@ +#include "rope_over_buffer.h" + +#include <yql/essentials/utils/yql_panic.h> + +namespace NYql { + +namespace { + +class TContigousChunkOverBuf : public IContiguousChunk { +public: + TContigousChunkOverBuf(const std::shared_ptr<const void>& owner, TContiguousSpan span) + : Owner_(owner) + , Span_(span) + { + } +private: + TContiguousSpan GetData() const override { + return Span_; + } + + TMutableContiguousSpan GetDataMut() override { + YQL_ENSURE(false, "Payload mutation is not supported"); + } + + size_t GetOccupiedMemorySize() const override { + return Span_.GetSize(); + } + + const std::shared_ptr<const void> Owner_; + const TContiguousSpan Span_; +}; + +} // namespace + + +TRope MakeReadOnlyRope(const std::shared_ptr<const void>& owner, const char* data, size_t size) { + if (!size) { + return TRope(); + } + return TRope(new TContigousChunkOverBuf(owner, {data, size})); +} + +} // namespace NYql diff --git a/yql/essentials/utils/rope/rope_over_buffer.h b/yql/essentials/utils/rope/rope_over_buffer.h new file mode 100644 index 0000000000..9d18e52263 --- /dev/null +++ 
b/yql/essentials/utils/rope/rope_over_buffer.h @@ -0,0 +1,11 @@ +#pragma once + +#include <contrib/ydb/library/actors/util/rope.h> + +#include <memory> + +namespace NYql { + +TRope MakeReadOnlyRope(const std::shared_ptr<const void>& owner, const char* data, size_t size); + +}
\ No newline at end of file diff --git a/yql/essentials/utils/rope/ya.make b/yql/essentials/utils/rope/ya.make new file mode 100644 index 0000000000..9b92a50c48 --- /dev/null +++ b/yql/essentials/utils/rope/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + rope_over_buffer.cpp +) + +PEERDIR( + contrib/ydb/library/actors/util + yql/essentials/utils +) + +END() + diff --git a/yql/essentials/utils/signals/signals.cpp b/yql/essentials/utils/signals/signals.cpp new file mode 100644 index 0000000000..9ebe667120 --- /dev/null +++ b/yql/essentials/utils/signals/signals.cpp @@ -0,0 +1,300 @@ +#include "signals.h" +#include "utils.h" + +#include <yql/essentials/utils/log/log.h> +#include <yql/essentials/utils/backtrace/backtrace.h> +#include <contrib/ydb/library/yql/providers/yt/lib/log/yt_logger.h> + +#include <util/stream/output.h> +#include <util/generic/yexception.h> +#include <util/datetime/base.h> +#include <util/network/socket.h> +#include <util/system/getpid.h> + +#ifdef _linux_ +# include <sys/prctl.h> +#endif + +#include <string.h> +#include <signal.h> +#include <errno.h> +#include <stdlib.h> + + +namespace NYql { + +volatile sig_atomic_t NeedTerminate = 0; +volatile sig_atomic_t NeedQuit = 0; +volatile sig_atomic_t NeedReconfigure = 0; +volatile sig_atomic_t NeedReopenLog = 0; +volatile sig_atomic_t NeedReapZombies = 0; +volatile sig_atomic_t NeedInterrupt = 0; + +volatile sig_atomic_t CatchInterrupt = 0; + +TPipe SignalPipeW; +TPipe SignalPipeR; + +namespace { + +void SignalHandler(int signo) +{ + switch (signo) { + case SIGTERM: + NeedTerminate = 1; + break; + + case SIGQUIT: + NeedQuit = 1; + break; + +#ifdef _unix_ + case SIGHUP: + NeedReconfigure = 1; + break; + + case SIGUSR1: + NeedReopenLog = 1; + break; + + case SIGCHLD: + NeedReapZombies = 1; + break; +#endif + + case SIGINT: + if (CatchInterrupt) { + NeedInterrupt = 1; + } else { + fprintf(stderr, "%s (pid=%d) captured SIGINT\n", + GetProcTitle(), getpid()); + signal(signo, SIG_DFL); + raise(signo); + } + 
break; + + default: + break; + } +} + +void SignalHandlerWithSelfPipe(int signo) +{ + SignalHandler(signo); + + int savedErrno = errno; + if (write(SignalPipeW.GetHandle(), "x", 1) == -1 && errno != EAGAIN) { + static TStringBuf msg("cannot write to signal pipe"); +#ifndef STDERR_FILENO +#define STDERR_FILENO 2 +#endif + write(STDERR_FILENO, msg.data(), msg.size()); + abort(); + } + errno = savedErrno; +} + + +#ifndef _unix_ +const char* strsignal(int signo) +{ + switch (signo) { + case SIGTERM: return "SIGTERM"; + case SIGINT: return "SIGINT"; + case SIGQUIT: return "SIGQUIT"; + default: + return "UNKNOWN"; + } +} +#endif + +namespace { + +class TEmergencyLogOutput: public IOutputStream { +public: + TEmergencyLogOutput() + : Current_(Buf_) + , End_(Y_ARRAY_END(Buf_)) + { + } + + ~TEmergencyLogOutput() { + } + +private: + inline size_t Avail() const noexcept { + return End_ - Current_; + } + + void DoFlush() override { + if (Current_ != Buf_) { + NYql::NLog::YqlLogger().Write(TLOG_EMERG, Buf_, Current_ - Buf_); + Current_ = Buf_; + } + } + + void DoWrite(const void* buf, size_t len) override { + len = Min(len, Avail()); + if (len) { + char* end = Current_ + len; + memcpy(Current_, buf, len); + Current_ = end; + } + } + +private: + char Buf_[1 << 20]; + char* Current_; + char* const End_; + +}; + +TEmergencyLogOutput EMERGENCY_LOG_OUT; + +} + +void LogBacktraceOnSignal(int signum) +{ + if (NYql::NLog::IsYqlLoggerInitialized()) { + EMERGENCY_LOG_OUT << strsignal(signum) << TStringBuf(" (pid=") << GetPID() << TStringBuf("): "); + NYql::NBacktrace::KikimrBackTraceFormatImpl(&EMERGENCY_LOG_OUT); + EMERGENCY_LOG_OUT.Flush(); + } + NYql::FlushYtDebugLog(); + /* Now reraise the signal. We reactivate the signal’s default handling, + which is to terminate the process. We could just call exit or abort, + but reraising the signal sets the return status from the process + correctly. 
*/ + raise(signum); +} + + +#ifdef _unix_ +int SetSignalHandler(int signo, void (*handler)(int)) +{ + struct sigaction sa; + sa.sa_flags = SA_RESTART; + sa.sa_handler = handler; + sigemptyset(&sa.sa_mask); + + return sigaction(signo, &sa, nullptr); +} + +#else +int SetSignalHandler(int signo, void (*handler)(int)) +{ + return (signal(signo, handler) == SIG_ERR) ? -1 : 0; +} + +#endif + +struct TSignalHandlerDesc +{ + int signo; + void (*handler)(int); +}; + +void SetSignalHandlers(const TSignalHandlerDesc* handlerDescs) +{ + sigset_t interestedSignals; + SigEmptySet(&interestedSignals); + + for (int i = 0; handlerDescs[i].signo != -1; i++) { + int signo = handlerDescs[i].signo; + SigAddSet(&interestedSignals, signo); + + if (SetSignalHandler(signo, handlerDescs[i].handler) == -1) { + ythrow TSystemError() << "Cannot set handler for signal " + << strsignal(signo); + } + } + + if (SigProcMask(SIG_BLOCK, &interestedSignals, NULL) == -1) { + ythrow TSystemError() << "Cannot set sigprocmask"; + } + + NYql::NBacktrace::AddAfterFatalCallback([](int signo){ LogBacktraceOnSignal(signo); }); + NYql::NBacktrace::RegisterKikimrFatalActions(); +} + +} // namespace + + +void InitSignals() +{ + TSignalHandlerDesc handlerDescs[] = { + { SIGTERM, SignalHandler }, + { SIGINT, SignalHandler }, + { SIGQUIT, SignalHandler }, +#ifdef _unix_ + { SIGPIPE, SIG_IGN }, + { SIGHUP, SignalHandler }, + { SIGUSR1, SignalHandler }, + { SIGCHLD, SignalHandler }, +#endif + { -1, nullptr } + }; + + SetSignalHandlers(handlerDescs); +} + +void InitSignalsWithSelfPipe() +{ + TSignalHandlerDesc handlerDescs[] = { + { SIGTERM, SignalHandlerWithSelfPipe }, + { SIGINT, SignalHandlerWithSelfPipe }, + { SIGQUIT, SignalHandlerWithSelfPipe }, +#ifdef _unix_ + { SIGPIPE, SIG_IGN }, + { SIGHUP, SignalHandlerWithSelfPipe }, + { SIGUSR1, SignalHandlerWithSelfPipe }, + { SIGCHLD, SignalHandlerWithSelfPipe }, +#endif + { -1, nullptr } + }; + + TPipe::Pipe(SignalPipeR, SignalPipeW); + 
SetNonBlock(SignalPipeR.GetHandle()); + SetNonBlock(SignalPipeW.GetHandle()); + + SetSignalHandlers(handlerDescs); +} + +void CatchInterruptSignal(bool doCatch) { + CatchInterrupt = doCatch; +} + +void SigSuspend(const sigset_t* mask) +{ +#ifdef _unix_ + sigsuspend(mask); +#else + Y_UNUSED(mask); + Sleep(TDuration::Seconds(1)); +#endif +} + +void AllowAnySignals() +{ + sigset_t blockMask; + SigEmptySet(&blockMask); + + if (SigProcMask(SIG_SETMASK, &blockMask, NULL) == -1) { + ythrow TSystemError() << "Cannot set sigprocmask"; + } +} + +bool HasPendingQuitOrTerm() { +#ifdef _unix_ + sigset_t signals; + SigEmptySet(&signals); + if (sigpending(&signals)) { + ythrow TSystemError() << "Error in sigpending"; + } + + return (SigIsMember(&signals, SIGQUIT) == 1) || (SigIsMember(&signals, SIGTERM) == 1); +#else + return false; +#endif +} +} // namespace NYql diff --git a/yql/essentials/utils/signals/signals.h b/yql/essentials/utils/signals/signals.h new file mode 100644 index 0000000000..612f906207 --- /dev/null +++ b/yql/essentials/utils/signals/signals.h @@ -0,0 +1,34 @@ +#pragma once + +#include <util/system/defaults.h> +#include <util/system/sigset.h> +#include <util/system/pipe.h> + +#include <signal.h> + + +namespace NYql { + +#ifdef _win_ +using sig_atomic_t = int; +#endif + +extern volatile sig_atomic_t NeedTerminate; +extern volatile sig_atomic_t NeedQuit; +extern volatile sig_atomic_t NeedReconfigure; +extern volatile sig_atomic_t NeedReopenLog; +extern volatile sig_atomic_t NeedReapZombies; +extern volatile sig_atomic_t NeedInterrupt; + +extern TPipe SignalPipeW; +extern TPipe SignalPipeR; + +void InitSignals(); +void InitSignalsWithSelfPipe(); +void CatchInterruptSignal(bool doCatch); + +void SigSuspend(const sigset_t* mask); +void AllowAnySignals(); +bool HasPendingQuitOrTerm(); + +} // namespace NYql diff --git a/yql/essentials/utils/signals/utils.cpp b/yql/essentials/utils/signals/utils.cpp new file mode 100644 index 0000000000..b1de131b30 --- /dev/null +++ 
b/yql/essentials/utils/signals/utils.cpp @@ -0,0 +1,122 @@ +#include "utils.h" + +#include <util/generic/yexception.h> +#include <util/string/subst.h> + +#include <google/protobuf/text_format.h> + +#include <library/cpp/protobuf/json/proto2json.h> +#include <library/cpp/json/yson/json2yson.h> + +#include <string.h> + +extern char** environ; + +namespace NYql { + +static char** g_OriginalArgv = nullptr; +static char* g_OriginalArgvLast = nullptr; + +/* + * To change the process title in Linux and Darwin we have to set argv[1] + * to NULL and to copy the title to the same place where the argv[0] points to. + * However, argv[0] may be too small to hold a new title. Fortunately, Linux + * and Darwin store argv[] and environ[] one after another. So we should + * ensure that is the continuous memory and then we allocate the new memory + * for environ[] and copy it. After this we could use the memory starting + * from argv[0] for our process title. + * + * continuous memory block for process title + * ________________________________/\____________________________________ + * / \ + * +---------+---------+-----+------+------------+------------+-----+------+ + * | argv[0] | argv[1] | ... | NULL | environ[0] | environ[1] | ... 
| NULL | + * +---------+---------+-----+------+------------+------------+-----+------+ + * \_________________ _________________/ + * \/ + * must be relocated elsewhere + */ +void ProcTitleInit(int argc, const char* argv[]) +{ + Y_UNUSED(argc); + Y_ABORT_UNLESS(!g_OriginalArgv, "ProcTitleInit() was already called"); + + g_OriginalArgv = const_cast<char**>(argv); + + size_t size = 0; + for (int i = 0; environ[i]; i++) { + size += strlen(environ[i]) + 1; + } + + char* newEnviron = new char[size]; + g_OriginalArgvLast = g_OriginalArgv[0]; + + for (int i = 0; g_OriginalArgv[i]; i++) { + if (g_OriginalArgvLast == g_OriginalArgv[i]) { + g_OriginalArgvLast = g_OriginalArgv[i] + strlen(g_OriginalArgv[i]) + 1; + } + } + + for (int i = 0; environ[i]; i++) { + if (g_OriginalArgvLast == environ[i]) { + size_t size = strlen(environ[i]) + 1; + g_OriginalArgvLast = environ[i] + size; + + strncpy(newEnviron, environ[i], size); + environ[i] = newEnviron; + newEnviron += size; + } + } + + g_OriginalArgvLast--; +} + +void SetProcTitle(const char* title) +{ + if (!g_OriginalArgv) return; + + char* p = g_OriginalArgv[0]; + p += strlcpy(p, "yqlworker: ", g_OriginalArgvLast - p); + p += strlcpy(p, title, g_OriginalArgvLast - p); + + if (g_OriginalArgvLast - p > 0) { + memset(p, 0, g_OriginalArgvLast - p); + } + + g_OriginalArgv[1] = nullptr; +} + +void AddProcTitleSuffix(const char* suffix) +{ + if (!g_OriginalArgv) return; + + char* p = g_OriginalArgv[0]; + p += strlcat(p, " ", g_OriginalArgvLast - p); + p += strlcat(p, suffix, g_OriginalArgvLast - p); +} + +const char* GetProcTitle() +{ + return g_OriginalArgv ? 
g_OriginalArgv[0] : "UNKNOWN"; +} + +TString PbMessageToStr(const google::protobuf::Message& msg) +{ + TString str; + ::google::protobuf::TextFormat::Printer printer; + printer.SetSingleLineMode(true); + printer.PrintToString(msg, &str); + return str; +} + +TString Proto2Yson(const google::protobuf::Message& proto) { + NJson::TJsonValue json; + NProtobufJson::Proto2Json(proto, json); + + TString ysonResult; + TStringOutput stream(ysonResult); + NJson2Yson::SerializeJsonValueAsYson(json, &stream); + return ysonResult; +} + +} // namespace NYql diff --git a/yql/essentials/utils/signals/utils.h b/yql/essentials/utils/signals/utils.h new file mode 100644 index 0000000000..75c55244fa --- /dev/null +++ b/yql/essentials/utils/signals/utils.h @@ -0,0 +1,21 @@ +#pragma once + +#include <util/generic/fwd.h> + +namespace google { +namespace protobuf { + class Message; +} // namespace protobuf +} // namespace google + +namespace NYql { + +void ProcTitleInit(int argc, const char* argv[]); +void SetProcTitle(const char* title); +void AddProcTitleSuffix(const char* suffix); +const char* GetProcTitle(); + +TString PbMessageToStr(const google::protobuf::Message& msg); +TString Proto2Yson(const google::protobuf::Message& proto); + +} // namespace NYql diff --git a/yql/essentials/utils/signals/ya.make b/yql/essentials/utils/signals/ya.make new file mode 100644 index 0000000000..3c72783dbf --- /dev/null +++ b/yql/essentials/utils/signals/ya.make @@ -0,0 +1,20 @@ +LIBRARY() + +SRCS( + signals.cpp + signals.h + utils.cpp + utils.h +) + +PEERDIR( + contrib/libs/protobuf + library/cpp/logger/global + library/cpp/protobuf/json + library/cpp/json/yson + yql/essentials/utils/log + yql/essentials/utils/backtrace + contrib/ydb/library/yql/providers/yt/lib/log +) + +END() diff --git a/yql/essentials/utils/sort.cpp b/yql/essentials/utils/sort.cpp new file mode 100644 index 0000000000..67992aff39 --- /dev/null +++ b/yql/essentials/utils/sort.cpp @@ -0,0 +1 @@ +#include "sort.h"
\ No newline at end of file diff --git a/yql/essentials/utils/sort.h b/yql/essentials/utils/sort.h new file mode 100644 index 0000000000..4adb7c9225 --- /dev/null +++ b/yql/essentials/utils/sort.h @@ -0,0 +1,27 @@ +#pragma once + +#include <contrib/libs/miniselect/include/miniselect/floyd_rivest_select.h> + +namespace NYql { + +template <class RandomIt> +void FastNthElement(RandomIt first, RandomIt middle, RandomIt last) { + ::miniselect::floyd_rivest_select(first, middle, last); +} + +template <class RandomIt, class Compare> +void FastNthElement(RandomIt first, RandomIt middle, RandomIt last, Compare compare) { + ::miniselect::floyd_rivest_select(first, middle, last, compare); +} + +template <class RandomIt> +void FastPartialSort(RandomIt first, RandomIt middle, RandomIt last) { + ::miniselect::floyd_rivest_partial_sort(first, middle, last); +} + +template <class RandomIt, class Compare> +void FastPartialSort(RandomIt first, RandomIt middle, RandomIt last, Compare compare) { + ::miniselect::floyd_rivest_partial_sort(first, middle, last, compare); +} + +} diff --git a/yql/essentials/utils/swap_bytes.cpp b/yql/essentials/utils/swap_bytes.cpp new file mode 100644 index 0000000000..c37aebcb6d --- /dev/null +++ b/yql/essentials/utils/swap_bytes.cpp @@ -0,0 +1 @@ +#include "swap_bytes.h" diff --git a/yql/essentials/utils/swap_bytes.h b/yql/essentials/utils/swap_bytes.h new file mode 100644 index 0000000000..bf09bb321a --- /dev/null +++ b/yql/essentials/utils/swap_bytes.h @@ -0,0 +1,20 @@ +#pragma once +#include <util/system/compiler.h> +#include <util/system/defaults.h> + +namespace NYql { + +// clang generates bswap for ui32 and ui64 +template <typename TUnsigned> +Y_FORCE_INLINE +TUnsigned SwapBytes(TUnsigned value) { + TUnsigned result; + auto* from = (ui8*)&value + sizeof(TUnsigned) - 1; + auto* to = (ui8*)&result; + for (size_t i = 0; i < sizeof(TUnsigned); ++i) { + *to++ = *from--; + } + return result; +} + +} diff --git a/yql/essentials/utils/sys/become_user.cpp 
b/yql/essentials/utils/sys/become_user.cpp new file mode 100644 index 0000000000..bbb6b5735c --- /dev/null +++ b/yql/essentials/utils/sys/become_user.cpp @@ -0,0 +1,188 @@ +#include "become_user.h" + +#ifdef _linux_ +#include <yql/essentials/utils/sys/linux_version.h> + +#include <util/generic/yexception.h> +#include <util/system/user.h> + +#include <memory> +#include <vector> +#include <errno.h> + +#include <grp.h> +#include <pwd.h> +#include <unistd.h> + +#include <sys/prctl.h> +#include <contrib/libs/libcap/include/sys/capability.h> +#include <contrib/libs/libcap/include/sys/securebits.h> + +// strange, but sometimes we have to specify values manually +#define PR_CAP_AMBIENT 47 +#define PR_CAP_AMBIENT_IS_SET 1 +#define PR_CAP_AMBIENT_RAISE 2 +#define PR_CAP_AMBIENT_LOWER 3 +#define PR_CAP_AMBIENT_CLEAR_ALL 4 + +namespace NYql { + +namespace { + +void SetCapFlag(cap_t caps, cap_flag_t flag, cap_value_t value) { + if (cap_set_flag(caps, flag, 1, &value, CAP_SET) < 0) { + throw TSystemError() << "cap_set_flag() failed, flag = " << static_cast<int>(flag) << ", value = " << value; + } +} + +void SetCapFlags(cap_t caps, cap_value_t value) { + SetCapFlag(caps, CAP_EFFECTIVE, value); + SetCapFlag(caps, CAP_PERMITTED, value); + SetCapFlag(caps, CAP_INHERITABLE, value); +} + +void ClearAmbientCapFlags() { + // from man: PR_CAP_AMBIENT (since Linux 4.3) + if (IsLinuxKernelBelow4_3()) { + return; + } + + if (prctl(PR_CAP_AMBIENT, PR_CAP_AMBIENT_CLEAR_ALL, 0, 0, 0) < 0) { + throw TSystemError() << "prctl(PR_CAP_AMBIENT, PR_CAP_AMBIENT_CLEAR_ALL, ....) failed"; + } +} + +void SetAmbientCapFlag(cap_value_t value) { + if (IsLinuxKernelBelow4_3()) { + return; + } + + if (prctl(PR_CAP_AMBIENT, PR_CAP_AMBIENT_RAISE, value, 0, 0) < 0) { + throw TSystemError() << "prctl(PR_CAP_AMBIENT, PR_CAP_AMBIENT_RAISE, ....) 
failed, value = " << value; + } +} + +void SetCapFlagsVector(const std::vector<cap_value_t>& flags) { + cap_t caps = cap_init(); + std::unique_ptr<std::remove_reference_t<decltype(*caps)>, decltype(&cap_free)> capsHolder(caps, &cap_free); + + if (!caps) { + throw TSystemError() << "cap_init() failed"; + } + + cap_clear(caps); + + for (auto f : flags) { + SetCapFlags(caps, f); + } + + if (cap_set_proc(caps) < 0) { + throw TSystemError() << "cap_set_proc() failed"; + } + + ClearAmbientCapFlags(); + for (auto f : flags) { + SetAmbientCapFlag(f); + } +} + +void EnsureCapFlagsVectorCannotBeRaised(const std::vector<cap_value_t>& flags) { + for (auto f : flags) { + try { + // one-by-one + SetCapFlagsVector({ f }); + } catch (const TSystemError&) { + continue; + } + + throw yexception() << "Cap flag " << f << " raised unexpectedly"; + } +} + +void DoBecomeUser(const char* username, const char* groupname) { + errno = 0; + passwd* pw = getpwnam(username); + if (pw == nullptr) { + if (errno == 0) { + ythrow yexception() << "unknown user: " << username; + } else { + ythrow TSystemError() << "can't get user info"; + } + } + + if (groupname == nullptr || strlen(groupname) == 0) { + groupname = username; + } + + errno = 0; + group* gr = getgrnam(groupname); + if (gr == nullptr) { + if (errno == 0) { + ythrow yexception() << "unknown group: " << groupname; + } else { + ythrow TSystemError() << "can't get group info"; + } + } + + if (setgid(gr->gr_gid) == -1) { + ythrow TSystemError() << "can't change process group"; + } + + if (initgroups(username, gr->gr_gid) == -1) { + ythrow TSystemError() << "can't initgroups"; + } + + if (setuid(pw->pw_uid) == -1) { + ythrow TSystemError() << "can't change process user"; + } + + if (prctl(PR_SET_DUMPABLE, 1, 0, 0, 0) == -1) { + ythrow TSystemError() << "can't set dumpable flag for a process"; + } +} + +} + +void BecomeUser(const TString& username, const TString& groupname) { + DoBecomeUser(username.data(), groupname.data()); +} + +void 
TurnOnBecomeUserAmbientCaps() { + SetCapFlagsVector({ CAP_SETUID, CAP_SETGID, CAP_SETPCAP, CAP_KILL }); + if (prctl(PR_SET_SECUREBITS, SECBIT_NO_SETUID_FIXUP | SECBIT_NO_SETUID_FIXUP_LOCKED, 0, 0, 0) == -1) { + ythrow TSystemError() << "can't set secure bits for a process"; + } +} + +void TurnOffBecomeUserAbility() { + ClearAmbientCapFlags(); + SetCapFlagsVector({}); + EnsureCapFlagsVectorCannotBeRaised({ CAP_SETUID, CAP_SETGID, CAP_SETPCAP, CAP_KILL }); + + // ensure we cannot get root access back + if (setuid(0) != -1) { + ythrow TSystemError() << "unexpected switch to root in TurnOffBecomeUserAbility"; + } +} + +void DumpCaps(const TString& title) { + cap_t caps = cap_get_proc(); + std::unique_ptr<std::remove_reference_t<decltype(*caps)>, decltype(&cap_free)> capsHolder(caps, &cap_free); + + ssize_t size; + char* capsText = cap_to_text(caps, &size); + Cerr << title << ": current user: " << GetUsername() << ", proc caps: " << capsText << Endl; + + cap_free(capsText); +} + +void SendSignalOnParentThreadExit(int signo) +{ + if (::prctl(PR_SET_PDEATHSIG, signo) == -1) { + ythrow TSystemError() << "Cannot set signal " << strsignal(signo) << " for parent death using prctl"; + } +} + +} + +#endif diff --git a/yql/essentials/utils/sys/become_user.h b/yql/essentials/utils/sys/become_user.h new file mode 100644 index 0000000000..c5c2025d8b --- /dev/null +++ b/yql/essentials/utils/sys/become_user.h @@ -0,0 +1,26 @@ +#pragma once + +#include <util/generic/string.h> + +namespace NYql { + +// works on Linux only + +// assume we have enough capabilities to do so: CAP_SETUID, CAP_SETGID +void BecomeUser(const TString& username, const TString& groupname); + +// should be called by root (more specifically caps required: CAP_SETPCAP) +// special ambient capabilities will be set up: CAP_SETUID, CAP_SETGID, CAP_KILL +// they will be preserved by fork and exec* +void TurnOnBecomeUserAmbientCaps(); + +// forget ambient capabilities and ensure we cannot setuid to root +void 
TurnOffBecomeUserAbility(); + +// dump to stderr current security context including uid/guid/caps +void DumpCaps(const TString& title); + +// subscribe child process on receiving signal on parent process death (particularly on parent thread exit) +void SendSignalOnParentThreadExit(int signo); + +} diff --git a/yql/essentials/utils/sys/become_user_dummy.cpp b/yql/essentials/utils/sys/become_user_dummy.cpp new file mode 100644 index 0000000000..897d9c3977 --- /dev/null +++ b/yql/essentials/utils/sys/become_user_dummy.cpp @@ -0,0 +1,26 @@ +#include "become_user.h" +#ifndef _linux_ +namespace NYql { + +void BecomeUser(const TString& username, const TString& groupname) { + Y_UNUSED(username); + Y_UNUSED(groupname); +} + +void TurnOnBecomeUserAmbientCaps() { +} + +void TurnOffBecomeUserAbility() { +} + +void DumpCaps(const TString& title) { + Y_UNUSED(title); +} + +void SendSignalOnParentThreadExit(int signo) +{ + Y_UNUSED(signo); +} + +} +#endif diff --git a/yql/essentials/utils/sys/linux_version.cpp b/yql/essentials/utils/sys/linux_version.cpp new file mode 100644 index 0000000000..5d10af8294 --- /dev/null +++ b/yql/essentials/utils/sys/linux_version.cpp @@ -0,0 +1,46 @@ +#include "linux_version.h" + +#include <util/generic/yexception.h> +#include <util/system/platform.h> + +#ifdef _linux_ +# include <sys/utsname.h> +#endif + +namespace NYql { + std::tuple<int, int, int> DetectLinuxKernelVersion3() { +#ifdef _linux_ + // see https://github.com/torvalds/linux/blob/master/Makefile + // version is composed as follows: + // VERSION = 4 + // PATCHLEVEL = 18 + // SUBLEVEL = 0 + // EXTRAVERSION = -rc4 + // KERNELVERSION = $(VERSION)$(if $(PATCHLEVEL),.$(PATCHLEVEL)$(if $(SUBLEVEL),.$(SUBLEVEL)))$(EXTRAVERSION) + + utsname buf = {}; + if (uname(&buf)) { + ythrow TSystemError() << "uname call failed"; + } + + int v = 0; + int p = 0; + int s = 0; + if (sscanf(buf.release, "%d.%d.%d", &v, &p, &s) != 3) { + ythrow yexception() << "Failed to parse linux kernel version " << 
buf.release; + } + return std::make_tuple(v, p, s); +#else + return {}; +#endif + } + + std::pair<int, int> DetectLinuxKernelVersion2() { + auto v = DetectLinuxKernelVersion3(); + return std::make_pair(std::get<0>(v), std::get<1>(v)); + } + + bool IsLinuxKernelBelow4_3() { + return DetectLinuxKernelVersion2() < std::make_pair(4, 3); + } +} diff --git a/yql/essentials/utils/sys/linux_version.h b/yql/essentials/utils/sys/linux_version.h new file mode 100644 index 0000000000..c8c32da125 --- /dev/null +++ b/yql/essentials/utils/sys/linux_version.h @@ -0,0 +1,13 @@ +#pragma once + +#include <tuple> + +namespace NYql { + // returns version, patch level, sublevel, e.g. (4, 4, 114) for `uname -r` == "4.4.114-50" + std::tuple<int, int, int> DetectLinuxKernelVersion3(); + + // returns version, patch level + std::pair<int, int> DetectLinuxKernelVersion2(); + + bool IsLinuxKernelBelow4_3(); +} diff --git a/yql/essentials/utils/sys/ya.make b/yql/essentials/utils/sys/ya.make new file mode 100644 index 0000000000..698aeb8ba8 --- /dev/null +++ b/yql/essentials/utils/sys/ya.make @@ -0,0 +1,20 @@ +LIBRARY() + +SRCS( + become_user.h + become_user_dummy.cpp + linux_version.cpp + linux_version.h +) + +IF (OS_LINUX) + PEERDIR( + contrib/libs/libcap + ) + + SRCS( + become_user.cpp + ) +ENDIF() + +END() diff --git a/yql/essentials/utils/test_http_server/test_http_server.cpp b/yql/essentials/utils/test_http_server/test_http_server.cpp new file mode 100644 index 0000000000..5d55230c41 --- /dev/null +++ b/yql/essentials/utils/test_http_server/test_http_server.cpp @@ -0,0 +1,151 @@ +#include "test_http_server.h" + +#include <library/cpp/http/misc/httpcodes.h> +#include <library/cpp/http/server/http_ex.h> + +#include <util/generic/yexception.h> + +namespace NYql { + +class TTestHttpServer::TImpl : public THttpServer::ICallBack { + class TRequestProcessor : public THttpClientRequestEx { + public: + explicit TRequestProcessor(TImpl* parent) + : Parent_(parent) + { + Y_UNUSED(Parent_); + } + + 
bool Reply(void* /*tsr*/) override { + if (!ProcessHeaders()) { + return true; + } + + if (!RequestString.StartsWith("GET ")) { + return true; + } + + TRequest r; + for (auto& p : ParsedHeaders) { + if (p.first == "Authorization" && p.second.StartsWith("OAuth ")) { + r.OAuthToken = p.second.substr(strlen("OAuth ")); + continue; + } + + if (p.first == "If-None-Match") { + r.IfNoneMatch = p.second; + continue; + } + + if (p.first == "If-Modified-Since") { + r.IfModifiedSince = p.second; + continue; + } + } + + auto reply = Parent_->ProcessNextRequest(r); + + switch (reply.Code) { + case HTTP_OK: + Output() << "HTTP/1.1 200 Ok\r\n"; + break; + + case HTTP_NOT_MODIFIED: + Output() << "HTTP/1.1 304 Not modified\r\n"; + break; + + case HTTP_FORBIDDEN: + Output() << "HTTP/1.1 403 Forbidden\r\n"; + break; + + default: + return true; + } + + if (reply.ETag) { + Output() << "ETag: " + reply.ETag + "\r\n"; + } + + if (reply.LastModified) { + Output() << "Last-Modified: " + reply.LastModified + "\r\n"; + } + + if (reply.Content || reply.ContentLength) { + const int length = reply.ContentLength.GetOrElse(reply.Content.length()); + Output() << "Content-Length: " << length << "\r\n"; + } + + Output() << "\r\n"; + if (reply.Content) { + Output() << reply.Content; + } + + Output().Finish(); + + return true; + } + + private: + TImpl* Parent_ = nullptr; + }; + +public: + explicit TImpl(int port) + : HttpServer_(this, THttpServer::TOptions(port)) + , Port_(port) + { + + } + + TClientRequest* CreateClient() override { + return new TRequestProcessor(this); + } + + void Start() { + Y_ENSURE(HttpServer_.Start()); + } + + void Stop() { + HttpServer_.Stop(); + } + + TString GetUrl() const { + return "http://localhost:" + ToString(Port_); + } + + void SetRequestHandler(TRequestHandler handler) { + RequestHandler_ = std::move(handler); + } + +private: + TReply ProcessNextRequest(const TRequest& request) { + return RequestHandler_(request); + } + +private: + THttpServer HttpServer_; + const 
int Port_; + TRequestHandler RequestHandler_; +}; + +TTestHttpServer::TTestHttpServer(int port) + : Impl_(new TImpl(port)) { +} + +TTestHttpServer::~TTestHttpServer() { + Impl_->Stop(); +} + +void TTestHttpServer::Start() { + Impl_->Start(); +} + +TString TTestHttpServer::GetUrl() const { + return Impl_->GetUrl(); +} + +void TTestHttpServer::SetRequestHandler(TRequestHandler handler) { + return Impl_->SetRequestHandler(std::move(handler)); +} + +} diff --git a/yql/essentials/utils/test_http_server/test_http_server.h b/yql/essentials/utils/test_http_server/test_http_server.h new file mode 100644 index 0000000000..385cfaf970 --- /dev/null +++ b/yql/essentials/utils/test_http_server/test_http_server.h @@ -0,0 +1,77 @@ +#pragma once + +#include <library/cpp/http/misc/httpcodes.h> + +#include <util/generic/maybe.h> +#include <util/generic/ptr.h> +#include <util/generic/string.h> + +#include <functional> + +namespace NYql { + +class TTestHttpServer { +public: + struct TReply { + int Code = 0; + TString ETag; + TString LastModified; + TString Content; + TMaybe<int> ContentLength; + + static TReply Ok(const TString& content, TMaybe<int> contentLength = {}) { + TReply r; + r.Code = HTTP_OK; + r.Content = content; + r.ContentLength = contentLength; + return r; + } + + static TReply OkETag(const TString& content, const TString& etag, TMaybe<int> contentLength = {}) { + TReply r = Ok(content, contentLength); + r.ETag = etag; + return r; + } + + static TReply OkLastModified(const TString& content, const TString& lastModified, TMaybe<int> contentLength = {}) { + TReply r = Ok(content, contentLength); + r.LastModified = lastModified; + return r; + } + + static TReply NotModified(const TString& etag = {}, const TString& lastModified = {}) { + TReply r; + r.Code = HTTP_NOT_MODIFIED; + r.ETag = etag; + r.LastModified = lastModified; + return r; + } + + static TReply Forbidden() { + TReply r; + r.Code = HTTP_FORBIDDEN; + return r; + } + }; + + struct TRequest { + TString OAuthToken; 
+ TString IfNoneMatch; + TString IfModifiedSince; + }; + + typedef std::function<TReply(const TRequest& request)> TRequestHandler; + +public: + explicit TTestHttpServer(int port); + ~TTestHttpServer(); + void Start(); + TString GetUrl() const; + void SetRequestHandler(TRequestHandler handler); + +private: + class TImpl; + THolder<TImpl> Impl_; +}; + +} diff --git a/yql/essentials/utils/test_http_server/ya.make b/yql/essentials/utils/test_http_server/ya.make new file mode 100644 index 0000000000..41c6710c5f --- /dev/null +++ b/yql/essentials/utils/test_http_server/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + test_http_server.cpp +) + +PEERDIR( + library/cpp/http/server + library/cpp/http/misc +) + +END() diff --git a/yql/essentials/utils/threading/async_queue.cpp b/yql/essentials/utils/threading/async_queue.cpp new file mode 100644 index 0000000000..4021636b55 --- /dev/null +++ b/yql/essentials/utils/threading/async_queue.cpp @@ -0,0 +1,18 @@ +#include "async_queue.h" + +namespace NYql { + +TAsyncQueue::TAsyncQueue(size_t numThreads, const TString& poolName) { + if (1 == numThreads) { + MtpQueue_.Reset(new TFakeThreadPool()); + } else { + MtpQueue_.Reset(new TSimpleThreadPool(TThreadPoolParams{poolName})); + } + MtpQueue_->Start(numThreads); +} + +TAsyncQueue::TPtr TAsyncQueue::Make(size_t numThreads, const TString& poolName) { + return new TAsyncQueue(numThreads, poolName); +} + +} diff --git a/yql/essentials/utils/threading/async_queue.h b/yql/essentials/utils/threading/async_queue.h new file mode 100644 index 0000000000..3dd75b9e08 --- /dev/null +++ b/yql/essentials/utils/threading/async_queue.h @@ -0,0 +1,51 @@ +#pragma once + +#include <library/cpp/threading/future/future.h> +#include <library/cpp/threading/future/async.h> + +#include <util/thread/pool.h> +#include <util/generic/ptr.h> +#include <util/generic/function.h> +#include <util/system/guard.h> +#include <util/system/rwlock.h> + +#include <exception> + +namespace NYql { + +class TAsyncQueue: public 
TThrRefBase { +public: + using TPtr = TIntrusivePtr<TAsyncQueue>; + + static TPtr Make(size_t numThreads, const TString& poolName); + + void Stop() { + auto guard = TWriteGuard(Lock_); + if (MtpQueue_) { + MtpQueue_->Stop(); + MtpQueue_.Destroy(); + } + } + + template <typename TCallable> + [[nodiscard]] + ::NThreading::TFuture<::NThreading::TFutureType<::TFunctionResult<TCallable>>> Async(TCallable&& func) { + { + auto guard = TReadGuard(Lock_); + if (MtpQueue_) { + return ::NThreading::Async(std::move(func), *MtpQueue_); + } + } + + return ::NThreading::MakeErrorFuture<::NThreading::TFutureType<::TFunctionResult<TCallable>>>(std::make_exception_ptr(yexception() << "Thread pool is already stopped")); + } + +private: + TAsyncQueue(size_t numThreads, const TString& poolName); + +private: + TRWMutex Lock_; + THolder<IThreadPool> MtpQueue_; +}; + +} // NYql diff --git a/yql/essentials/utils/threading/ya.make b/yql/essentials/utils/threading/ya.make new file mode 100644 index 0000000000..d1cd6291a7 --- /dev/null +++ b/yql/essentials/utils/threading/ya.make @@ -0,0 +1,7 @@ +LIBRARY() + +SRCS( + async_queue.cpp +) + +END() diff --git a/yql/essentials/utils/url_builder.cpp b/yql/essentials/utils/url_builder.cpp new file mode 100644 index 0000000000..31344fbb22 --- /dev/null +++ b/yql/essentials/utils/url_builder.cpp @@ -0,0 +1,50 @@ +#include "url_builder.h" +#include <library/cpp/string_utils/quote/quote.h> +#include <util/generic/yexception.h> + +namespace NYql { + +TUrlBuilder::TUrlBuilder(const TString& uri) + : MainUri(uri) +{ +} + +TUrlBuilder& TUrlBuilder::AddUrlParam(const TString& name, const TString& value) { + Params.emplace_back(TParam {name, value}); + return *this; +} + +TUrlBuilder& TUrlBuilder::AddPathComponent(const TString& value) { + if (!value) { + throw yexception() << "Empty path component is not allowed"; + } + TStringBuilder res; + res << MainUri; + if (!MainUri.EndsWith('/')) { + res << '/'; + } + res << UrlEscapeRet(value, true); + + MainUri = 
std::move(res); + return *this; +} + +TString TUrlBuilder::Build() const { + if (Params.empty()) { + return MainUri; + } + + TStringBuilder res; + res << MainUri << "?"; + TStringBuf separator = ""sv; + for (const auto& p : Params) { + res << separator << p.Name; + if (p.Value) { + res << "=" << CGIEscapeRet(p.Value); + } + separator = "&"sv; + } + return std::move(res); +} + +} // NYql diff --git a/yql/essentials/utils/url_builder.h b/yql/essentials/utils/url_builder.h new file mode 100644 index 0000000000..774bc3bcd8 --- /dev/null +++ b/yql/essentials/utils/url_builder.h @@ -0,0 +1,26 @@ +#pragma once + +#include <vector> +#include <util/string/builder.h> + +namespace NYql { + +class TUrlBuilder { + struct TParam { + TString Name; + TString Value; + }; +public: + explicit TUrlBuilder(const TString& uri); + + // Assuming name is already escaped, do not use strings from user input + TUrlBuilder& AddUrlParam(const TString& name, const TString& value = ""); + TUrlBuilder& AddPathComponent(const TString& value); + + TString Build() const; +private: + std::vector<TParam> Params; + TString MainUri; +}; + +} // NYql diff --git a/yql/essentials/utils/url_builder_ut.cpp b/yql/essentials/utils/url_builder_ut.cpp new file mode 100644 index 0000000000..ad15a91698 --- /dev/null +++ b/yql/essentials/utils/url_builder_ut.cpp @@ -0,0 +1,57 @@ +#include "url_builder.h" + +#include <library/cpp/testing/unittest/registar.h> + +using namespace NYql; + +Y_UNIT_TEST_SUITE(TUrlBuilder) { + Y_UNIT_TEST(UriOnly) { + TUrlBuilder builder("https://localhost/abc"); + UNIT_ASSERT_VALUES_EQUAL(builder.Build(), "https://localhost/abc"); + } + + Y_UNIT_TEST(Basic) { + TUrlBuilder builder("https://localhost/abc"); + builder.AddUrlParam("param1", "val1"); + builder.AddUrlParam("param2", "val2"); + + UNIT_ASSERT_VALUES_EQUAL(builder.Build(), "https://localhost/abc?param1=val1&param2=val2"); + } + + Y_UNIT_TEST(BasicWithEncoding) { + auto url = TUrlBuilder("https://localhost/abc") + 
.AddUrlParam("param1", "=!@#$%^&*(){}[]\" ") + .AddUrlParam("param2", "val2") + .Build(); + + UNIT_ASSERT_VALUES_EQUAL(url, "https://localhost/abc?param1=%3D!@%23$%25%5E%26*%28%29%7B%7D%5B%5D%22+&param2=val2"); + } + + Y_UNIT_TEST(EmptyPathComponent) { + TUrlBuilder builder("https://localhost/abc"); + UNIT_ASSERT_EXCEPTION_CONTAINS(builder.AddPathComponent(""), std::exception, "Empty path component is not allowed"); + auto url = builder.Build(); + // not changed + UNIT_ASSERT_VALUES_EQUAL(url, "https://localhost/abc"); + } + + Y_UNIT_TEST(SeveralPathComponents) { + auto url = TUrlBuilder("https://localhost/abc") + .AddPathComponent("oops") + .AddPathComponent("long oops") + .AddUrlParam("param1", "val1") + .AddUrlParam("param1", "long param") + .Build(); + UNIT_ASSERT_VALUES_EQUAL(url, "https://localhost/abc/oops/long%20oops?param1=val1&param1=long+param"); + } + + Y_UNIT_TEST(SeveralPathComponentsWithSlashInBaseUri) { + // base uri ends with '/' + auto url = TUrlBuilder("https://localhost/abc/") + .AddPathComponent("oops%1234") + .AddPathComponent("long&oops=xxx") + .AddUrlParam("param1", "a&b=cdef") + .Build(); + UNIT_ASSERT_VALUES_EQUAL(url, "https://localhost/abc/oops%251234/long&oops=xxx?param1=a%26b%3Dcdef"); + } +} diff --git a/yql/essentials/utils/ut/ya.make b/yql/essentials/utils/ut/ya.make new file mode 100644 index 0000000000..0f7163ef79 --- /dev/null +++ b/yql/essentials/utils/ut/ya.make @@ -0,0 +1,14 @@ +UNITTEST_FOR(yql/essentials/utils) + +SRCS( + fp_bits_ut.cpp + md5_stream_ut.cpp + multi_resource_lock_ut.cpp + parse_double_ut.cpp + range_walker_ut.cpp + retry_ut.cpp + url_builder_ut.cpp + utf8_ut.cpp +) + +END() diff --git a/yql/essentials/utils/utf8.cpp b/yql/essentials/utils/utf8.cpp new file mode 100644 index 0000000000..af284849a8 --- /dev/null +++ b/yql/essentials/utils/utf8.cpp @@ -0,0 +1,260 @@ +#include "utf8.h" + +#include <util/charset/wide.h> + +#include <ctype.h> +#include <vector> + +namespace NYql { + +namespace { + +unsigned char 
GetRange(unsigned char c) { + // Referring to DFA of http://bjoern.hoehrmann.de/utf-8/decoder/dfa/ + // With new mapping 1 -> 0x10, 7 -> 0x20, 9 -> 0x40, such that AND operation can test multiple types. + static const unsigned char type[] = { + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, + 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, + 0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10, + 0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40, + 0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20, + 0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20, + 8,8,2,2,2,2,2,2,2,2,2,2,2,2,2,2, 2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2, + 10,3,3,3,3,3,3,3,3,3,3,3,3,4,3,3, 11,6,6,6,5,8,8,8,8,8,8,8,8,8,8,8, + }; + return type[c]; +} + +struct TByteRange { + ui8 First = 0; + ui8 Last = 0; +}; + +struct TUtf8Ranges { + size_t BytesCount = 0; + TByteRange Bytes[4] = {}; +}; + +// see https://lemire.me/blog/2018/05/09/how-quickly-can-you-check-that-a-string-is-valid-unicode-utf-8 +inline static const std::vector<TUtf8Ranges> Utf8Ranges = { + { 1, { {0x00, 0x7f}, {0x00, 0x00}, {0x00, 0x00}, {0x00, 0x00}, } }, + { 2, { {0xc2, 0xdf}, {0x80, 0xbf}, {0x00, 0x00}, {0x00, 0x00}, } }, + { 3, { {0xe0, 0xe0}, {0xa0, 0xbf}, {0x80, 0xbf}, {0x00, 0x00}, } }, + { 3, { {0xe1, 0xec}, {0x80, 0xbf}, {0x80, 0xbf}, {0x00, 0x00}, } }, + { 3, { {0xed, 0xed}, {0x80, 0x9f}, {0x80, 0xbf}, {0x00, 0x00}, } }, + { 3, { {0xee, 0xef}, {0x80, 0xbf}, {0x80, 0xbf}, {0x00, 0x00}, } }, + { 4, { {0xf0, 0xf0}, {0x90, 0xbf}, {0x80, 0xbf}, {0x80, 0xbf}, } }, + { 4, { {0xf1, 0xf3}, {0x80, 0xbf}, {0x80, 0xbf}, {0x80, 0xbf}, } }, + { 4, { {0xf4, 0xf4}, {0x80, 0x8f}, {0x80, 0xbf}, {0x80, 0xbf}, } }, +}; + +std::optional<std::string> 
RoundBadUtf8(size_t range, std::string_view inputString, size_t pos, + bool roundDown) +{ + Y_ENSURE(range > 0); + Y_ENSURE(range < Utf8Ranges.size()); + + const std::string prefix{inputString.substr(0, pos)}; + std::string_view suffix = inputString.substr(pos, Utf8Ranges[range].BytesCount); + + std::string newSuffix; + if (roundDown) { + std::optional<size_t> lastNonMin; + for (size_t i = 0; i < suffix.size(); ++i) { + if (Utf8Ranges[range].Bytes[i].First < ui8(suffix[i])) { + lastNonMin = i; + } + } + + if (!lastNonMin) { + for (size_t i = 0; i < Utf8Ranges[range - 1].BytesCount; ++i) { + newSuffix.push_back(Utf8Ranges[range - 1].Bytes[i].Last); + } + } else { + for (size_t i = 0; i < Utf8Ranges[range].BytesCount; ++i) { + if (i < *lastNonMin) { + ui8 c = suffix[i]; + newSuffix.push_back(c); + } else if (i == *lastNonMin) { + ui8 c = suffix[i]; + newSuffix.push_back(std::min<ui8>(c - 1, Utf8Ranges[range].Bytes[i].Last)); + } else { + newSuffix.push_back(Utf8Ranges[range].Bytes[i].Last); + } + } + } + } else { + std::optional<size_t> lastNonMax; + bool valid = true; + for (size_t i = 0; i < suffix.size(); ++i) { + ui8 last = Utf8Ranges[range].Bytes[i].Last; + ui8 first = Utf8Ranges[range].Bytes[i].First; + ui8 curr = ui8(suffix[i]); + + valid = valid && curr <= last && curr >= first; + if (curr < last) { + lastNonMax = i; + } + } + + if (valid) { + newSuffix = suffix; + for (size_t i = suffix.size(); i < Utf8Ranges[range].BytesCount; ++i) { + newSuffix.push_back(Utf8Ranges[range].Bytes[i].First); + } + } else if (!lastNonMax) { + return NextValidUtf8(prefix); + } else { + for (size_t i = 0; i < Utf8Ranges[range].BytesCount; ++i) { + if (i < *lastNonMax) { + ui8 c = suffix[i]; + newSuffix.push_back(c); + } else if (i == *lastNonMax) { + ui8 c = suffix[i]; + newSuffix.push_back(std::max<ui8>(c + 1, Utf8Ranges[range].Bytes[i].First)); + } else { + newSuffix.push_back(Utf8Ranges[range].Bytes[i].First); + } + } + } + + } + return prefix + newSuffix; +} + +} + +bool 
IsUtf8(const std::string_view& str) { + for (auto it = str.cbegin(); str.cend() != it;) { +#define COPY() if (str.cend() != it) { c = *it++; } else { return false; } +#define TRANS(mask) result &= ((GetRange(static_cast<unsigned char>(c)) & mask) != 0) +#define TAIL() COPY(); TRANS(0x70) + auto c = *it++; + if (!(c & 0x80)) + continue; + + bool result = true; + switch (GetRange(static_cast<unsigned char>(c))) { + case 2: TAIL(); break; + case 3: TAIL(); TAIL(); break; + case 4: COPY(); TRANS(0x50); TAIL(); break; + case 5: COPY(); TRANS(0x10); TAIL(); TAIL(); break; + case 6: TAIL(); TAIL(); TAIL(); break; + case 10: COPY(); TRANS(0x20); TAIL(); break; + case 11: COPY(); TRANS(0x60); TAIL(); TAIL(); break; + default: return false; + } + + if (!result) return false; +#undef COPY +#undef TRANS +#undef TAIL + } + return true; +} + +unsigned char WideCharSize(char head) { + switch (GetRange(static_cast<unsigned char>(head))) { + case 0: return 1; + case 2: return 2; + case 3: return 3; + case 4: return 3; + case 5: return 4; + case 6: return 4; + case 10: return 3; + case 11: return 4; + default: return 0; + } +} + +std::optional<std::string> RoundToNearestValidUtf8(const std::string_view& str, bool roundDown) { + const size_t ss = str.size(); + for (size_t pos = 0; pos < ss; ) { + ui8 c = str[pos]; + + for (size_t i = 0; i < Utf8Ranges.size(); ++i) { + auto& range = Utf8Ranges[i]; + + if (c < range.Bytes[0].First) { + return RoundBadUtf8(i, str, pos, roundDown); + } + + if (c <= range.Bytes[0].Last) { + // valid UTF8 code point start + for (size_t j = 1; j < range.BytesCount; ++j) { + if (pos + j >= ss) { + return RoundBadUtf8(i, str, pos, roundDown); + } + ui8 cur = str[pos + j]; + if (!(cur >= range.Bytes[j].First && cur <= range.Bytes[j].Last)) { + return RoundBadUtf8(i, str, pos, roundDown); + } + } + + pos += range.BytesCount; + break; + } else if (i + 1 == Utf8Ranges.size()) { + if (!roundDown) { + return NextValidUtf8(str.substr(0, pos)); + } + return 
RoundBadUtf8(i, str, pos, roundDown); + } + } + } + return std::string(str); +} + +std::optional<std::string> NextValidUtf8(const std::string_view& str) { + Y_ENSURE(IsUtf8(str)); + TUtf32String wide = UTF8ToUTF32<false>(str); + bool incremented = false; + size_t toDrop = 0; + for (auto it = wide.rbegin(); it != wide.rend(); ++it) { + auto& c = *it; + if (c < 0x10ffff) { + c = (c == 0xd7ff) ? 0xe000 : (c + 1); + incremented = true; + break; + } else { + ++toDrop; + } + } + + if (!incremented) { + return {}; + } + + Y_ENSURE(toDrop < wide.size()); + wide.resize(wide.size() - toDrop); + + TString result = WideToUTF8(wide); + return std::string(result.data(), result.size()); +} + +std::optional<std::string> NextLexicographicString(const std::string_view& str) { + bool incremented = false; + size_t toDrop = 0; + std::string result{str}; + for (auto it = result.rbegin(); it != result.rend(); ++it) { + auto& c = *it; + if (ui8(c) < 0xff) { + ++c; + incremented = true; + break; + } else { + ++toDrop; + } + } + + if (!incremented) { + return {}; + } + + Y_ENSURE(toDrop < result.size()); + result.resize(result.size() - toDrop); + return result; +} + +} diff --git a/yql/essentials/utils/utf8.h b/yql/essentials/utils/utf8.h new file mode 100644 index 0000000000..5c28353416 --- /dev/null +++ b/yql/essentials/utils/utf8.h @@ -0,0 +1,16 @@ +#pragma once + +#include <optional> +#include <string_view> + +namespace NYql { + +bool IsUtf8(const std::string_view& str); + +unsigned char WideCharSize(char head); + +std::optional<std::string> RoundToNearestValidUtf8(const std::string_view& str, bool roundDown); +std::optional<std::string> NextValidUtf8(const std::string_view& str); +std::optional<std::string> NextLexicographicString(const std::string_view& str); + +} diff --git a/yql/essentials/utils/utf8_ut.cpp b/yql/essentials/utils/utf8_ut.cpp new file mode 100644 index 0000000000..7479acd7a1 --- /dev/null +++ b/yql/essentials/utils/utf8_ut.cpp @@ -0,0 +1,99 @@ +#include "utf8.h" + 
+#include <library/cpp/testing/unittest/registar.h> + +Y_UNIT_TEST_SUITE(TUtf8Tests) { + Y_UNIT_TEST(Simple) { + UNIT_ASSERT(NYql::IsUtf8("")); + UNIT_ASSERT(NYql::IsUtf8("\x01_ASCII_\x7F")); + UNIT_ASSERT(NYql::IsUtf8("Привет!")); + UNIT_ASSERT(NYql::IsUtf8("\xF0\x9F\x94\xA2")); + + UNIT_ASSERT(!NYql::IsUtf8("\xf5\x80\x80\x80")); + UNIT_ASSERT(!NYql::IsUtf8("\xed\xa6\x80")); + UNIT_ASSERT(!NYql::IsUtf8("\xF0\x9F\x94")); + UNIT_ASSERT(!NYql::IsUtf8("\xE3\x85\xB6\xE7\x9C\xB0\xE3\x9C\xBA\xE2\xAA\x96\xEE\xA2\x8C\xEC\xAF\xB8\xE1\xB2\xBB\xEC\xA3\x9C\xE3\xAB\x8B\xEC\x95\x92\xE1\x8A\xBF\xE2\x8E\x86\xEC\x9B\x8D\xE2\x8E\xAE\xE3\x8A\xA3\xE0\xAC\xBC\xED\xB6\x85")); + UNIT_ASSERT(!NYql::IsUtf8("\xc0\xbe\xd0\xb1\xd0\xbd\xd0\xbe\xd0\xb2\xd0\xbb\xd0\xb5\xd0\xbd\xd0\xb8\xd1\x8e")); + } + + Y_UNIT_TEST(CharSize) { + UNIT_ASSERT_VALUES_EQUAL(NYql::WideCharSize(' '), 1); + UNIT_ASSERT_VALUES_EQUAL(NYql::WideCharSize('\x00'), 1); + UNIT_ASSERT_VALUES_EQUAL(NYql::WideCharSize('\x7F'), 1); + UNIT_ASSERT_VALUES_EQUAL(NYql::WideCharSize('\xD1'), 2); + UNIT_ASSERT_VALUES_EQUAL(NYql::WideCharSize('\xF0'), 4); + UNIT_ASSERT_VALUES_EQUAL(NYql::WideCharSize('\xFF'), 0); + } + + Y_UNIT_TEST(RoundingDown) { + auto checkDown = [](std::string_view in, std::string_view out) { + auto res = NYql::RoundToNearestValidUtf8(in, true); + UNIT_ASSERT(res); + UNIT_ASSERT(NYql::IsUtf8(*res)); + UNIT_ASSERT_VALUES_EQUAL(*res, out); + UNIT_ASSERT(*res <= in); + }; + checkDown("привет", "привет"); + checkDown("тест\x80", "тест\x7f"); + checkDown("привет\xf5", "привет\xf4\x8f\xbf\xbf"); + checkDown("тест2\xee\x80\x7f", "тест2\xed\x9f\xbf"); + checkDown("ага\xf0\xaa\xaa\xff", "ага\xf0\xaa\xaa\xbf"); + } + + Y_UNIT_TEST(RoundingUp) { + auto checkUp = [](std::string_view in, std::string_view out) { + auto res = NYql::RoundToNearestValidUtf8(in, false); + UNIT_ASSERT(res); + UNIT_ASSERT(NYql::IsUtf8(*res)); + UNIT_ASSERT_VALUES_EQUAL(*res, out); + UNIT_ASSERT(*res >= in); + }; + + checkUp("", ""); + 
checkUp("привет", "привет"); + checkUp("а\xf6", "б"); + checkUp("\xf4\x8f\xbf\xbfа\xf4\x8f\xbf\xbf\xf5", "\xf4\x8f\xbf\xbfб"); + UNIT_ASSERT(!NYql::RoundToNearestValidUtf8("\xf4\x8f\xbf\xbf\xf5", false)); + UNIT_ASSERT(!NYql::RoundToNearestValidUtf8("\xf5", false)); + checkUp("тест\x80", "тест\xc2\x80"); + checkUp("тест\xdf", "тест\xdf\x80"); + checkUp("тест\xf0\x90\xff", "тест\xf0\x91\x80\x80"); + checkUp("ааа\xff", "ааб"); + } + + Y_UNIT_TEST(NextValid) { + auto checkNext = [](std::string_view in, std::string_view out) { + auto res = NYql::NextValidUtf8(in); + UNIT_ASSERT(res); + UNIT_ASSERT(NYql::IsUtf8(*res)); + UNIT_ASSERT_VALUES_EQUAL(*res, out); + UNIT_ASSERT(*res > in); + }; + + UNIT_ASSERT(!NYql::NextValidUtf8("")); + checkNext("привет", "привеу"); + checkNext("а", "б"); + checkNext(std::string_view("\x00", 1), "\x01"); + checkNext("\xf4\x8f\xbf\xbfа\xf4\x8f\xbf\xbf", "\xf4\x8f\xbf\xbfб"); + UNIT_ASSERT(!NYql::NextValidUtf8("\xf4\x8f\xbf\xbf")); + UNIT_ASSERT(!NYql::NextValidUtf8("\xf4\x8f\xbf\xbf\xf4\x8f\xbf\xbf")); + } + + Y_UNIT_TEST(NextValidString) { + auto checkNext = [](std::string_view in, std::string_view out) { + auto res = NYql::NextLexicographicString(in); + UNIT_ASSERT(res); + UNIT_ASSERT_VALUES_EQUAL(*res, out); + UNIT_ASSERT(*res > in); + }; + + UNIT_ASSERT(!NYql::NextLexicographicString("")); + checkNext("привет", "привеу"); + checkNext("а", "б"); + checkNext(std::string_view("\x00", 1), "\x01"); + checkNext("\xf4\x8f\xbf\xbfа\xf4\x8f\xbf\xbf", "\xf4\x8f\xbf\xbfа\xf4\x8f\xbf\xc0"); + UNIT_ASSERT(!NYql::NextLexicographicString("\xff")); + UNIT_ASSERT(!NYql::NextLexicographicString("\xff\xff")); + checkNext(std::string_view("x\x00\xff\xff", 4), "x\x01"); + } +} diff --git a/yql/essentials/utils/ya.make b/yql/essentials/utils/ya.make new file mode 100644 index 0000000000..8d18004e2b --- /dev/null +++ b/yql/essentials/utils/ya.make @@ -0,0 +1,67 @@ +LIBRARY() + +SRCS( + cast.h + debug_info.cpp + debug_info.h + exceptions.cpp + exceptions.h + 
future_action.cpp + future_action.h + hash.cpp + hash.h + limiting_allocator.cpp + md5_stream.cpp + md5_stream.h + method_index.cpp + method_index.h + multi_resource_lock.cpp + multi_resource_lock.h + parse_double.cpp + parse_double.h + proc_alive.cpp + proc_alive.h + rand_guid.cpp + rand_guid.h + resetable_setting.h + retry.cpp + retry.h + sort.cpp + sort.h + swap_bytes.cpp + swap_bytes.h + url_builder.cpp + utf8.cpp + yql_panic.cpp + yql_panic.h + yql_paths.cpp + yql_paths.h +) + +PEERDIR( + library/cpp/digest/md5 + library/cpp/string_utils/quote + library/cpp/threading/future + library/cpp/deprecated/atomic + contrib/libs/miniselect +) + +END() + +RECURSE( + backtrace + failure_injector + fetch + log + network + rope + signals + sys + test_http_server + threading +) + +RECURSE_FOR_TESTS( + ut +) + diff --git a/yql/essentials/utils/yql_panic.cpp b/yql/essentials/utils/yql_panic.cpp new file mode 100644 index 0000000000..83c6acc21f --- /dev/null +++ b/yql/essentials/utils/yql_panic.cpp @@ -0,0 +1,18 @@ +#include "yql_panic.h" + +namespace NYql { +namespace NDetail { + +void YqlPanic(const ::NPrivate::TStaticBuf& file, int line, const char* function, + const TStringBuf& condition, const TStringBuf& message) { + auto err = TYqlPanic() << file.As<TStringBuf>() << ":" << line << " " + << function << "(): requirement " << condition << " failed"; + if (!message.empty()) { + err << ", message: " << message; + } + + throw err; +} + +} // namespace NDetail +} // namespace NYql diff --git a/yql/essentials/utils/yql_panic.h b/yql/essentials/utils/yql_panic.h new file mode 100644 index 0000000000..e7e559b2c5 --- /dev/null +++ b/yql/essentials/utils/yql_panic.h @@ -0,0 +1,22 @@ +#pragma once +#include <util/generic/yexception.h> +#include <util/string/builder.h> +#include <util/system/src_root.h> + +namespace NYql { + +class TYqlPanic : public yexception +{}; + +namespace NDetail { + [[noreturn]] void YqlPanic(const ::NPrivate::TStaticBuf& file, int line, const char* function, 
const TStringBuf& condition, const TStringBuf& message); +} + +#define YQL_ENSURE(CONDITION, ...) \ + do { \ + if (Y_UNLIKELY(!(CONDITION))) { \ + ::NYql::NDetail::YqlPanic(__SOURCE_FILE_IMPL__, __LINE__, __FUNCTION__, #CONDITION, TStringBuilder() << "" __VA_ARGS__); \ + } \ + } while (0) + +} // namespace NYql diff --git a/yql/essentials/utils/yql_paths.cpp b/yql/essentials/utils/yql_paths.cpp new file mode 100644 index 0000000000..cc7dd29bfb --- /dev/null +++ b/yql/essentials/utils/yql_paths.cpp @@ -0,0 +1,23 @@ +#include "yql_paths.h" + +#include <util/folder/pathsplit.h> + +namespace NYql { + +TString BuildTablePath(TStringBuf prefixPath, TStringBuf path) { + if (prefixPath.empty()) { + return TString(path); + } + prefixPath.SkipPrefix("//"); + + TPathSplitUnix prefixPathSplit(prefixPath); + TPathSplitUnix pathSplit(path); + + if (pathSplit.IsAbsolute) { + return TString(path); + } + + return prefixPathSplit.AppendMany(pathSplit.begin(), pathSplit.end()).Reconstruct(); +} + +} diff --git a/yql/essentials/utils/yql_paths.h b/yql/essentials/utils/yql_paths.h new file mode 100644 index 0000000000..3ec05e7df8 --- /dev/null +++ b/yql/essentials/utils/yql_paths.h @@ -0,0 +1,9 @@ +#pragma once + +#include <util/generic/strbuf.h> + +namespace NYql { + +TString BuildTablePath(TStringBuf prefixPath, TStringBuf path); + +} diff --git a/yql/essentials/ya.make b/yql/essentials/ya.make index 57841ee7e2..9eb5692b54 100644 --- a/yql/essentials/ya.make +++ b/yql/essentials/ya.make @@ -2,5 +2,6 @@ SUBSCRIBER(g:yql) RECURSE( public + utils ) |