aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials
diff options
context:
space:
mode:
authorvvvv <vvvv@yandex-team.com>2024-11-01 15:41:40 +0300
committervvvv <vvvv@yandex-team.com>2024-11-01 15:55:52 +0300
commit3325f745e67f7f442790822b5c9c5e9996708be7 (patch)
treef7318d68bbe8990092715436444b05297ce35777 /yql/essentials
parent6dce3f1c71786f2694b73b1a5155efc58f4557dd (diff)
downloadydb-3325f745e67f7f442790822b5c9c5e9996708be7.tar.gz
Moved yql/utils YQL-19206
Также была выделена жирная зависимость из yql/utils в yql/utils/network, в результате library/cpp/getopt была добавлена явно в те проекты, которые ее ранее наследовали, а не указывали явно commit_hash:36aa4c41f807b4cbbf70a3ed7ac0a1a5079bb75d
Diffstat (limited to 'yql/essentials')
-rw-r--r--yql/essentials/public/issue/ya.make2
-rw-r--r--yql/essentials/public/issue/yql_issue.cpp4
-rw-r--r--yql/essentials/public/result_format/ya.make2
-rw-r--r--yql/essentials/public/result_format/yql_codec_results.cpp2
-rw-r--r--yql/essentials/public/result_format/yql_restricted_yson.cpp4
-rw-r--r--yql/essentials/public/result_format/yql_result_format_data.cpp4
-rw-r--r--yql/essentials/utils/backtrace/backtrace.cpp215
-rw-r--r--yql/essentials/utils/backtrace/backtrace.h34
-rw-r--r--yql/essentials/utils/backtrace/backtrace_dummy.cpp11
-rw-r--r--yql/essentials/utils/backtrace/backtrace_lib.cpp78
-rw-r--r--yql/essentials/utils/backtrace/backtrace_lib.h18
-rw-r--r--yql/essentials/utils/backtrace/backtrace_linux.cpp62
-rw-r--r--yql/essentials/utils/backtrace/backtrace_ut.cpp28
-rw-r--r--yql/essentials/utils/backtrace/symbolize.cpp62
-rw-r--r--yql/essentials/utils/backtrace/symbolizer.h15
-rw-r--r--yql/essentials/utils/backtrace/symbolizer_linux.cpp150
-rw-r--r--yql/essentials/utils/backtrace/ut/ya.make10
-rw-r--r--yql/essentials/utils/backtrace/ya.make34
-rw-r--r--yql/essentials/utils/cast.h15
-rw-r--r--yql/essentials/utils/debug_info.cpp50
-rw-r--r--yql/essentials/utils/debug_info.h10
-rw-r--r--yql/essentials/utils/exceptions.cpp37
-rw-r--r--yql/essentials/utils/exceptions.h33
-rw-r--r--yql/essentials/utils/failure_injector/failure_injector.cpp64
-rw-r--r--yql/essentials/utils/failure_injector/failure_injector.h36
-rw-r--r--yql/essentials/utils/failure_injector/failure_injector_ut.cpp87
-rw-r--r--yql/essentials/utils/failure_injector/ut/ya.make15
-rw-r--r--yql/essentials/utils/failure_injector/ya.make16
-rw-r--r--yql/essentials/utils/fetch/fetch.cpp182
-rw-r--r--yql/essentials/utils/fetch/fetch.h24
-rw-r--r--yql/essentials/utils/fetch/ya.make16
-rw-r--r--yql/essentials/utils/fp_bits.h122
-rw-r--r--yql/essentials/utils/fp_bits_ut.cpp105
-rw-r--r--yql/essentials/utils/future_action.cpp1
-rw-r--r--yql/essentials/utils/future_action.h59
-rw-r--r--yql/essentials/utils/hash.cpp21
-rw-r--r--yql/essentials/utils/hash.h87
-rw-r--r--yql/essentials/utils/limiting_allocator.cpp35
-rw-r--r--yql/essentials/utils/limiting_allocator.h8
-rw-r--r--yql/essentials/utils/log/context.cpp89
-rw-r--r--yql/essentials/utils/log/context.h259
-rw-r--r--yql/essentials/utils/log/log.cpp368
-rw-r--r--yql/essentials/utils/log/log.h195
-rw-r--r--yql/essentials/utils/log/log_component.h130
-rw-r--r--yql/essentials/utils/log/log_level.h98
-rw-r--r--yql/essentials/utils/log/log_ut.cpp667
-rw-r--r--yql/essentials/utils/log/profile.cpp45
-rw-r--r--yql/essentials/utils/log/profile.h74
-rw-r--r--yql/essentials/utils/log/proto/logger_config.proto65
-rw-r--r--yql/essentials/utils/log/proto/ya.make11
-rw-r--r--yql/essentials/utils/log/tls_backend.cpp49
-rw-r--r--yql/essentials/utils/log/tls_backend.h67
-rw-r--r--yql/essentials/utils/log/tls_backend_ut.cpp121
-rw-r--r--yql/essentials/utils/log/ut/log_parser.h62
-rw-r--r--yql/essentials/utils/log/ut/ya.make8
-rw-r--r--yql/essentials/utils/log/ya.make22
-rw-r--r--yql/essentials/utils/md5_stream.cpp20
-rw-r--r--yql/essentials/utils/md5_stream.h19
-rw-r--r--yql/essentials/utils/md5_stream_ut.cpp47
-rw-r--r--yql/essentials/utils/method_index.cpp44
-rw-r--r--yql/essentials/utils/method_index.h24
-rw-r--r--yql/essentials/utils/multi_resource_lock.cpp42
-rw-r--r--yql/essentials/utils/multi_resource_lock.h81
-rw-r--r--yql/essentials/utils/multi_resource_lock_ut.cpp54
-rw-r--r--yql/essentials/utils/network/bind_in_range.cpp27
-rw-r--r--yql/essentials/utils/network/bind_in_range.h8
-rw-r--r--yql/essentials/utils/network/ya.make12
-rw-r--r--yql/essentials/utils/parse_double.cpp76
-rw-r--r--yql/essentials/utils/parse_double.h18
-rw-r--r--yql/essentials/utils/parse_double_ut.cpp55
-rw-r--r--yql/essentials/utils/prefetch.h13
-rw-r--r--yql/essentials/utils/proc_alive.cpp37
-rw-r--r--yql/essentials/utils/proc_alive.h9
-rw-r--r--yql/essentials/utils/rand_guid.cpp32
-rw-r--r--yql/essentials/utils/rand_guid.h30
-rw-r--r--yql/essentials/utils/range_walker.h47
-rw-r--r--yql/essentials/utils/range_walker_ut.cpp35
-rw-r--r--yql/essentials/utils/resetable_setting.h67
-rw-r--r--yql/essentials/utils/retry.cpp1
-rw-r--r--yql/essentials/utils/retry.h17
-rw-r--r--yql/essentials/utils/retry_ut.cpp73
-rw-r--r--yql/essentials/utils/rope/rope_over_buffer.cpp43
-rw-r--r--yql/essentials/utils/rope/rope_over_buffer.h11
-rw-r--r--yql/essentials/utils/rope/ya.make13
-rw-r--r--yql/essentials/utils/signals/signals.cpp300
-rw-r--r--yql/essentials/utils/signals/signals.h34
-rw-r--r--yql/essentials/utils/signals/utils.cpp122
-rw-r--r--yql/essentials/utils/signals/utils.h21
-rw-r--r--yql/essentials/utils/signals/ya.make20
-rw-r--r--yql/essentials/utils/sort.cpp1
-rw-r--r--yql/essentials/utils/sort.h27
-rw-r--r--yql/essentials/utils/swap_bytes.cpp1
-rw-r--r--yql/essentials/utils/swap_bytes.h20
-rw-r--r--yql/essentials/utils/sys/become_user.cpp188
-rw-r--r--yql/essentials/utils/sys/become_user.h26
-rw-r--r--yql/essentials/utils/sys/become_user_dummy.cpp26
-rw-r--r--yql/essentials/utils/sys/linux_version.cpp46
-rw-r--r--yql/essentials/utils/sys/linux_version.h13
-rw-r--r--yql/essentials/utils/sys/ya.make20
-rw-r--r--yql/essentials/utils/test_http_server/test_http_server.cpp151
-rw-r--r--yql/essentials/utils/test_http_server/test_http_server.h77
-rw-r--r--yql/essentials/utils/test_http_server/ya.make12
-rw-r--r--yql/essentials/utils/threading/async_queue.cpp18
-rw-r--r--yql/essentials/utils/threading/async_queue.h51
-rw-r--r--yql/essentials/utils/threading/ya.make7
-rw-r--r--yql/essentials/utils/url_builder.cpp50
-rw-r--r--yql/essentials/utils/url_builder.h26
-rw-r--r--yql/essentials/utils/url_builder_ut.cpp57
-rw-r--r--yql/essentials/utils/ut/ya.make14
-rw-r--r--yql/essentials/utils/utf8.cpp260
-rw-r--r--yql/essentials/utils/utf8.h16
-rw-r--r--yql/essentials/utils/utf8_ut.cpp99
-rw-r--r--yql/essentials/utils/ya.make67
-rw-r--r--yql/essentials/utils/yql_panic.cpp18
-rw-r--r--yql/essentials/utils/yql_panic.h22
-rw-r--r--yql/essentials/utils/yql_paths.cpp23
-rw-r--r--yql/essentials/utils/yql_paths.h9
-rw-r--r--yql/essentials/ya.make1
118 files changed, 6877 insertions, 9 deletions
diff --git a/yql/essentials/public/issue/ya.make b/yql/essentials/public/issue/ya.make
index 0179b23bbb..79f4d3e539 100644
--- a/yql/essentials/public/issue/ya.make
+++ b/yql/essentials/public/issue/ya.make
@@ -14,7 +14,7 @@ PEERDIR(
library/cpp/resource
contrib/ydb/public/api/protos
yql/essentials/public/issue/protos
- contrib/ydb/library/yql/utils
+ yql/essentials/utils
)
GENERATE_ENUM_SERIALIZATION(yql_warning.h)
diff --git a/yql/essentials/public/issue/yql_issue.cpp b/yql/essentials/public/issue/yql_issue.cpp
index 3fc8e3767c..50589e9976 100644
--- a/yql/essentials/public/issue/yql_issue.cpp
+++ b/yql/essentials/public/issue/yql_issue.cpp
@@ -1,7 +1,7 @@
#include "yql_issue.h"
#include "yql_issue_id.h"
-#include <contrib/ydb/library/yql/utils/utf8.h>
+#include <yql/essentials/utils/utf8.h>
#include <library/cpp/colorizer/output.h>
@@ -307,4 +307,4 @@ void Out<NYql::TIssue>(IOutputStream& out, const NYql::TIssue& error) {
template <>
void Out<NYql::TIssues>(IOutputStream& out, const NYql::TIssues& error) {
error.PrintTo(out);
-} \ No newline at end of file
+}
diff --git a/yql/essentials/public/result_format/ya.make b/yql/essentials/public/result_format/ya.make
index 82a206be86..3181d30365 100644
--- a/yql/essentials/public/result_format/ya.make
+++ b/yql/essentials/public/result_format/ya.make
@@ -13,7 +13,7 @@ PEERDIR(
library/cpp/yson/node
library/cpp/string_utils/base64
yql/essentials/public/issue
- contrib/ydb/library/yql/utils
+ yql/essentials/utils
)
END()
diff --git a/yql/essentials/public/result_format/yql_codec_results.cpp b/yql/essentials/public/result_format/yql_codec_results.cpp
index d375131b3d..911993c0c4 100644
--- a/yql/essentials/public/result_format/yql_codec_results.cpp
+++ b/yql/essentials/public/result_format/yql_codec_results.cpp
@@ -2,7 +2,7 @@
#include <library/cpp/string_utils/base64/base64.h>
-#include <contrib/ydb/library/yql/utils/utf8.h>
+#include <yql/essentials/utils/utf8.h>
namespace NYql {
namespace NResult {
diff --git a/yql/essentials/public/result_format/yql_restricted_yson.cpp b/yql/essentials/public/result_format/yql_restricted_yson.cpp
index da89a3c436..405411aebf 100644
--- a/yql/essentials/public/result_format/yql_restricted_yson.cpp
+++ b/yql/essentials/public/result_format/yql_restricted_yson.cpp
@@ -1,7 +1,7 @@
#include "yql_restricted_yson.h"
-#include <contrib/ydb/library/yql/utils/parse_double.h>
-#include <contrib/ydb/library/yql/utils/yql_panic.h>
+#include <yql/essentials/utils/parse_double.h>
+#include <yql/essentials/utils/yql_panic.h>
#include <library/cpp/yson/detail.h>
#include <library/cpp/yson/parser.h>
diff --git a/yql/essentials/public/result_format/yql_result_format_data.cpp b/yql/essentials/public/result_format/yql_result_format_data.cpp
index 5e1e89b74d..a4aeb2545d 100644
--- a/yql/essentials/public/result_format/yql_result_format_data.cpp
+++ b/yql/essentials/public/result_format/yql_result_format_data.cpp
@@ -3,8 +3,8 @@
#include "yql_result_format_impl.h"
#include "yql_restricted_yson.h"
-#include <contrib/ydb/library/yql/utils/parse_double.h>
-#include <contrib/ydb/library/yql/utils/utf8.h>
+#include <yql/essentials/utils/parse_double.h>
+#include <yql/essentials/utils/utf8.h>
#include <library/cpp/yson/node/node_builder.h>
#include <library/cpp/string_utils/base64/base64.h>
diff --git a/yql/essentials/utils/backtrace/backtrace.cpp b/yql/essentials/utils/backtrace/backtrace.cpp
new file mode 100644
index 0000000000..938ac90501
--- /dev/null
+++ b/yql/essentials/utils/backtrace/backtrace.cpp
@@ -0,0 +1,215 @@
+#include "backtrace.h"
+
+#include "backtrace_lib.h"
+#include "symbolizer.h"
+
+#include <library/cpp/deprecated/atomic/atomic.h>
+#include <library/cpp/malloc/api/malloc.h>
+
+#include <util/generic/string.h>
+#include <util/generic/xrange.h>
+#include <util/generic/yexception.h>
+#include <util/stream/format.h>
+#include <util/stream/output.h>
+#include <util/system/backtrace.h>
+#include <util/system/type_name.h>
+#include <util/system/execpath.h>
+#include <util/system/platform.h>
+#include <util/system/mlock.h>
+
+#ifdef _linux_
+#include <signal.h>
+#endif
+
+#include <functional>
+#include <vector>
+#include <sstream>
+#include <iostream>
+
+#ifndef _win_
+
+// Installs `handler` as a one-shot handler for `signo`: all signals are blocked
+// while it runs and SA_RESETHAND restores the default disposition on entry.
+// Returns false when sigaction() rejects the request.
+bool SetSignalHandler(int signo, void (*handler)(int)) {
+    struct sigaction action;
+    memset(&action, 0, sizeof(action));
+    sigfillset(&action.sa_mask);
+    action.sa_handler = handler;
+    action.sa_flags = SA_RESETHAND;
+    const int rc = sigaction(signo, &action, nullptr);
+    return rc != -1;
+}
+
+namespace {
+#if defined(_linux_) && defined(_x86_64_)
+ // Installs a one-shot three-argument (SA_SIGINFO) handler for `signo` with
+ // every signal blocked while it runs. Returns false when sigaction() fails.
+ bool SetSignalAction(int signo, void (*handler)(int, siginfo_t*, void*)) {
+     struct sigaction action;
+     memset(&action, 0, sizeof(action));
+     sigfillset(&action.sa_mask);
+     action.sa_sigaction = (decltype(action.sa_sigaction))handler;
+     action.sa_flags = SA_RESETHAND | SA_SIGINFO;
+     const int rc = sigaction(signo, &action, nullptr);
+     return rc != -1;
+ }
+#endif
+} // namespace
+#endif // _win_
+
+TAtomic BacktraceStarted = 0;
+
+// Installs `handler` for SIGSEGV, SIGILL, SIGABRT and SIGFPE.
+// Throws TSystemError naming the signal when installation fails.
+// Compiles to a no-op when _win_ is defined.
+void SetFatalSignalHandler(void (*handler)(int)) {
+    Y_UNUSED(handler); // keeps the parameter referenced when the loop is compiled out
+#ifndef _win_
+    const int fatalSignals[] = {SIGSEGV, SIGILL, SIGABRT, SIGFPE};
+    for (const int signo : fatalSignals) {
+        if (SetSignalHandler(signo, handler)) {
+            continue;
+        }
+        ythrow TSystemError() << "Cannot set handler for signal " << strsignal(signo);
+    }
+#endif
+}
+
+#if defined(_linux_) && defined(_x86_64_)
+// Installs the SA_SIGINFO-style `sigaction` callback for SIGSEGV, SIGILL,
+// SIGABRT and SIGFPE. Throws TSystemError naming the signal on failure.
+void SetFatalSignalAction(void (*sigaction)(int, siginfo_t*, void*))
+{
+    const int fatalSignals[] = {SIGSEGV, SIGILL, SIGABRT, SIGFPE};
+    for (const int signo : fatalSignals) {
+        if (SetSignalAction(signo, sigaction)) {
+            continue;
+        }
+        ythrow TSystemError() << "Cannot set sigaction for signal " << strsignal(signo);
+    }
+}
+#endif
+
+namespace {
+ std::vector<std::function<void(int)>> Before, After;
+ bool KikimrSymbolize = false;
+ NYql::NBacktrace::TCollectedFrame Frames[NYql::NBacktrace::Limit];
+
+ // Invokes every non-empty callback of `where` in registration order,
+ // passing the signal number through.
+ void CallCallbacks(decltype(Before)& where, int signum) {
+     for (const auto& callback : where) {
+         if (!callback) {
+             continue;
+         }
+         callback(signum);
+     }
+ }
+
+ void PrintFrames(IOutputStream* out, const NYql::NBacktrace::TCollectedFrame* frames, size_t cnt);
+
+ void DoBacktrace(IOutputStream* out, void* data) { // collects frames into the static Frames buffer (data is forwarded to CollectFrames) and prints them to out
+ auto cnt = NYql::NBacktrace::CollectFrames(Frames, data);
+ PrintFrames(out, Frames, cnt);
+ }
+
+ // Prints a caller-supplied array of `cnt` return addresses to `out`.
+ // Frames is a fixed buffer of Limit entries, so longer stacks are truncated
+ // to Limit frames instead of being written past the end of the buffer.
+ void DoBacktrace(IOutputStream* out, void** stack, size_t cnt) {
+     const size_t capacity = static_cast<size_t>(NYql::NBacktrace::Limit);
+     if (cnt > capacity) {
+         cnt = capacity;
+     }
+     const size_t collected = NYql::NBacktrace::CollectFrames(Frames, stack, cnt);
+     PrintFrames(out, Frames, collected);
+ }
+
+
+ void SignalHandler(int signum) { // fatal-signal handler used when no signal context is available
+ CallCallbacks(Before, signum);
+
+ if (!NMalloc::IsAllocatorCorrupted) {
+ if (!AtomicTryLock(&BacktraceStarted)) { // only the first caller prints; the flag is not released in this function
+ return;
+ }
+
+ UnlockAllMemory();
+ DoBacktrace(&Cerr, nullptr); // nullptr: no signal context to start from
+ }
+
+ CallCallbacks(After, signum);
+ raise(signum); // re-raise after reporting; SetSignalHandler installs handlers with SA_RESETHAND
+ }
+
+#if defined(_linux_) && defined(_x86_64_)
+ void SignalAction(int signum, siginfo_t*, void* context) { // SA_SIGINFO variant of SignalHandler
+ Y_UNUSED(SignalHandler); // SignalHandler is not registered in this configuration
+ CallCallbacks(Before, signum);
+
+ if (!NMalloc::IsAllocatorCorrupted) {
+ if (!AtomicTryLock(&BacktraceStarted)) { // only the first caller prints; the flag is not released in this function
+ return;
+ }
+
+ UnlockAllMemory();
+ DoBacktrace(&Cerr, context); // context is passed down to CollectFrames/CollectBacktrace
+ }
+
+ CallCallbacks(After, signum);
+ raise(signum); // re-raise after reporting; SetSignalAction installs handlers with SA_RESETHAND
+ }
+#endif
+}
+
+namespace NYql {
+ namespace NBacktrace {
+ THashMap<TString, TString> Mapping; // module path -> replacement name, consulted by PrintFrames
+
+ void SetModulesMapping(const THashMap<TString, TString>& mapping) { // replaces the whole mapping with a copy
+ Mapping = mapping;
+ }
+
+ void AddBeforeFatalCallback(const std::function<void(int)>& before) { // appended to the list run at the start of the fatal handlers
+ Before.push_back(before);
+ }
+
+ void AddAfterFatalCallback(const std::function<void(int)>& after) { // appended to the list run just before the signal is re-raised
+ After.push_back(after);
+ }
+
+ void RegisterKikimrFatalActions() { // installs the fatal-signal handlers defined above
+#if defined(_linux_) && defined(_x86_64_)
+ SetFatalSignalAction(SignalAction);
+#else
+ SetFatalSignalHandler(SignalHandler);
+#endif
+ }
+
+ void EnableKikimrSymbolize() { // makes PrintFrames symbolize frames (Linux x86_64 only) instead of printing raw addresses
+ KikimrSymbolize = true;
+ }
+
+ void KikimrBackTrace() { // writes the current backtrace to Cerr via FormatBackTrace
+ FormatBackTrace(&Cerr);
+ }
+
+ void KikimrBackTraceFormatImpl(IOutputStream* out) { // prints the current stack to out; also switches symbolization on
+ KikimrSymbolize = true;
+ UnlockAllMemory();
+ DoBacktrace(out, nullptr);
+ }
+
+ void KikimrBacktraceFormatImpl(IOutputStream* out, void* const* stack, size_t stackSize) { // prints an already collected stack; also switches symbolization on
+ KikimrSymbolize = true;
+ DoBacktrace(out, (void**)stack, stackSize);
+ }
+
+ }
+}
+
+void EnableKikimrBacktraceFormat() { // registers KikimrBacktraceFormatImpl through SetFormatBackTraceFn
+ SetFormatBackTraceFn(NYql::NBacktrace::KikimrBacktraceFormatImpl);
+}
+
+namespace {
+ NYql::NBacktrace::TStackFrame SFrames[NYql::NBacktrace::Limit];
+ void PrintFrames(IOutputStream* out, const NYql::NBacktrace::TCollectedFrame* frames, size_t count) { // writes count frames to out, symbolized or in raw "StackFrame:" form
+ auto& outp = *out;
+ Y_UNUSED(SFrames); // SFrames is only used inside the Linux x86_64 branch below
+#if defined(_linux_) && defined(_x86_64_)
+ if (KikimrSymbolize) {
+ for (size_t i = 0; i < count; ++i) { // NOTE(review): SFrames holds Limit entries; assumes count <= Limit - confirm at callers
+ SFrames[i] = NYql::NBacktrace::TStackFrame{frames[i].File, frames[i].Address};
+ }
+ NYql::NBacktrace::Symbolize(SFrames, count, out);
+ return;
+ }
+#endif
+ outp << "StackFrames: " << count << "\n"; // raw format: a count line followed by one "StackFrame:" line per frame
+ for (size_t i = 0; i < count; ++i) {
+ auto& frame = frames[i];
+ auto fileName = frame.File;
+ if (!strcmp(fileName, "/proc/self/exe")) { // the executable itself is reported under the fixed name EXE
+ fileName = "EXE";
+ }
+ auto it = NYql::NBacktrace::Mapping.find(fileName);
+ outp << "StackFrame: " << (it == NYql::NBacktrace::Mapping.end() ? fileName : it->second) << " " << frame.Address << " 0\n"; // mapped name if present; the trailing offset field is always 0
+ }
+ }
+} \ No newline at end of file
diff --git a/yql/essentials/utils/backtrace/backtrace.h b/yql/essentials/utils/backtrace/backtrace.h
new file mode 100644
index 0000000000..cd843d8cb4
--- /dev/null
+++ b/yql/essentials/utils/backtrace/backtrace.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include <util/system/types.h>
+#include <util/generic/string.h>
+#include <util/generic/hash.h>
+
+#include <functional>
+
+bool SetSignalHandler(int signo, void (*handler)(int));
+class IOutputStream;
+
+void EnableKikimrBacktraceFormat();
+
+namespace NYql {
+
+namespace NBacktrace {
+const int Limit = 400;
+
+void RegisterKikimrFatalActions();
+void AddAfterFatalCallback(const std::function<void(int)>& after);
+void AddBeforeFatalCallback(const std::function<void(int)>& before);
+void EnableKikimrSymbolize();
+
+void KikimrBackTrace();
+void KikimrBackTraceFormatImpl(IOutputStream*);
+void KikimrBacktraceFormatImpl(IOutputStream* out, void* const* stack, size_t stackSize);
+
+void SetModulesMapping(const THashMap<TString, TString>& mapping);
+
+TString Symbolize(const TString& input, const THashMap<TString, TString>& mapping);
+
+} /* namespace NBacktrace */
+
+} /* namespace NYql */
diff --git a/yql/essentials/utils/backtrace/backtrace_dummy.cpp b/yql/essentials/utils/backtrace/backtrace_dummy.cpp
new file mode 100644
index 0000000000..ec54e55dca
--- /dev/null
+++ b/yql/essentials/utils/backtrace/backtrace_dummy.cpp
@@ -0,0 +1,11 @@
+#include "backtrace_lib.h"
+
+#include <util/system/backtrace.h>
+
+namespace NYql {
+ namespace NBacktrace {
+ size_t CollectBacktrace(void** addresses, size_t limit, void*) { // fallback implementation: the context argument is ignored
+ return BackTrace(addresses, limit);
+ }
+ }
+} \ No newline at end of file
diff --git a/yql/essentials/utils/backtrace/backtrace_lib.cpp b/yql/essentials/utils/backtrace/backtrace_lib.cpp
new file mode 100644
index 0000000000..c0c7b2c667
--- /dev/null
+++ b/yql/essentials/utils/backtrace/backtrace_lib.cpp
@@ -0,0 +1,78 @@
+#include "backtrace_lib.h"
+
+#include <util/generic/hash.h>
+#include <util/system/execpath.h>
+
+#include <algorithm>
+
+#if defined(_linux_) && defined(_x86_64_)
+#include <dlfcn.h>
+#include <link.h>
+#endif
+
+namespace {
+ const size_t Limit = 400;
+ void* Stack[Limit];
+
+ struct TDllInfo {
+ const char* Path;
+ ui64 BaseAddress;
+ };
+
+ const size_t MaxDLLCnt = 100;
+ TDllInfo DLLs[MaxDLLCnt];
+ size_t DLLCount = 0;
+
+#if defined(_linux_) && defined(_x86_64_)
+ // dl_iterate_phdr callback: stores the path and load address of every object
+ // with a non-empty name into the TDllInfo array passed through `data`.
+ // Always returns 0 so that iteration continues over all loaded objects.
+ int DlIterCallback(struct dl_phdr_info *info, size_t, void *data) {
+     const bool named = *info->dlpi_name != '\0';
+     if (named && DLLCount + 1 < MaxDLLCnt) {
+         auto* table = static_cast<TDllInfo*>(data);
+         table[DLLCount++] = { info->dlpi_name, (ui64)info->dlpi_addr };
+     }
+     return 0;
+ }
+#endif
+ bool comp(const TDllInfo& a, const TDllInfo& b) { // strict ordering of entries by path text (strcmp)
+ return strcmp(a.Path, b.Path) < 0;
+ }
+
+}
+
+namespace NYql {
+ namespace NBacktrace {
+ TCollectedFrame::TCollectedFrame(uintptr_t addr) { // by default the frame is attributed to the executable with its raw address
+ File = GetPersistentExecPath().c_str();
+ Address = addr;
+#if defined(_linux_) && defined(_x86_64_)
+ Dl_info dlInfo;
+ memset(&dlInfo, 0, sizeof(dlInfo));
+ auto ret = dladdr(reinterpret_cast<void*>(addr), &dlInfo);
+ if (ret) {
+ auto it = std::lower_bound(DLLs, DLLs + DLLCount, std::remove_reference_t<decltype(DLLs[0])> {dlInfo.dli_fname, {}}, comp); // binary search by path; DLLs is sorted with the same comp in CollectFrames
+ if (it != DLLs + DLLCount && !strcmp(it->Path, dlInfo.dli_fname)) {
+ File = it->Path;
+ Address -= it->BaseAddress; // report the address relative to the module's recorded base
+ }
+ }
+#endif
+ }
+
+ size_t CollectFrames(TCollectedFrame* frames, void* data) { // refreshes the module table, captures the stack and converts it to frames; returns the frame count
+#if defined(_linux_) && defined(_x86_64_)
+ DLLCount = 0;
+ dl_iterate_phdr(DlIterCallback, &DLLs);
+#endif
+ std::stable_sort(DLLs, DLLs + DLLCount, comp); // sorted by path so the TCollectedFrame constructor can use lower_bound
+ size_t cnt = CollectBacktrace(Stack, Limit, data);
+ return CollectFrames(frames, Stack, cnt);
+ }
+
+ size_t CollectFrames(TCollectedFrame* frames, void** stack, size_t cnt) { // constructs one frame per address in place; the caller must provide room for cnt frames
+ for (size_t i = 0; i < cnt; ++i) {
+ new (frames + i)TCollectedFrame(reinterpret_cast<uintptr_t>(stack[i]));
+ }
+ return cnt;
+ }
+ }
+} \ No newline at end of file
diff --git a/yql/essentials/utils/backtrace/backtrace_lib.h b/yql/essentials/utils/backtrace/backtrace_lib.h
new file mode 100644
index 0000000000..3404716da6
--- /dev/null
+++ b/yql/essentials/utils/backtrace/backtrace_lib.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+
+namespace NYql {
+ namespace NBacktrace {
+ size_t CollectBacktrace(void** addresses, size_t limit, void* data);
+ struct TCollectedFrame { // one resolved stack entry: module path plus address
+ TCollectedFrame(uintptr_t addr);
+ TCollectedFrame() = default; // leaves File and Address without an explicit initial value
+ const char* File; // non-owning pointer to the module path
+ size_t Address;
+ };
+ size_t CollectFrames(TCollectedFrame* frames, void* data);
+ size_t CollectFrames(TCollectedFrame* frames, void** stack, size_t cnt);
+ }
+} \ No newline at end of file
diff --git a/yql/essentials/utils/backtrace/backtrace_linux.cpp b/yql/essentials/utils/backtrace/backtrace_linux.cpp
new file mode 100644
index 0000000000..c9a1bd5a22
--- /dev/null
+++ b/yql/essentials/utils/backtrace/backtrace_linux.cpp
@@ -0,0 +1,62 @@
+#include "backtrace_lib.h"
+
+#include <libunwind.h>
+#include <signal.h>
+
+#include <util/system/backtrace.h>
+
+namespace {
+ // Unwinds the stack described by the signal context `con` with libunwind.
+ // Stores at most `len` instruction pointers into `p`, starting with the
+ // interrupted instruction itself, and returns how many were stored.
+ // Returns 0 when `len` is 0 or libunwind cannot be initialised.
+ size_t BackTrace(void** p, size_t len, ucontext_t* con) {
+     if (len == 0) {
+         // The first entry is written unconditionally below, so an empty
+         // output buffer has to be rejected up front.
+         return 0;
+     }
+
+     unw_context_t context;
+     unw_cursor_t cursor;
+     if (unw_getcontext(&context)) {
+         return 0;
+     }
+
+     if (unw_init_local(&cursor, &context)) {
+         return 0;
+     }
+
+     // Overwrite the cursor's registers with the values saved at signal time.
+     const sigcontext* signal_mcontext = (const sigcontext*)&(con->uc_mcontext);
+     unw_set_reg(&cursor, UNW_X86_64_RSI, signal_mcontext->rsi);
+     unw_set_reg(&cursor, UNW_X86_64_RDI, signal_mcontext->rdi);
+     unw_set_reg(&cursor, UNW_X86_64_RBP, signal_mcontext->rbp);
+     unw_set_reg(&cursor, UNW_X86_64_RAX, signal_mcontext->rax);
+     unw_set_reg(&cursor, UNW_X86_64_RBX, signal_mcontext->rbx);
+     unw_set_reg(&cursor, UNW_X86_64_RCX, signal_mcontext->rcx);
+     unw_set_reg(&cursor, UNW_X86_64_R8, signal_mcontext->r8);
+     unw_set_reg(&cursor, UNW_X86_64_R9, signal_mcontext->r9);
+     unw_set_reg(&cursor, UNW_X86_64_R10, signal_mcontext->r10);
+     unw_set_reg(&cursor, UNW_X86_64_R11, signal_mcontext->r11);
+     unw_set_reg(&cursor, UNW_X86_64_R12, signal_mcontext->r12);
+     unw_set_reg(&cursor, UNW_X86_64_R13, signal_mcontext->r13);
+     unw_set_reg(&cursor, UNW_X86_64_R14, signal_mcontext->r14);
+     unw_set_reg(&cursor, UNW_X86_64_R15, signal_mcontext->r15);
+     unw_set_reg(&cursor, UNW_X86_64_RSP, signal_mcontext->rsp);
+
+     unw_set_reg(&cursor, UNW_REG_SP, signal_mcontext->rsp);
+     unw_set_reg(&cursor, UNW_REG_IP, signal_mcontext->rip);
+
+     size_t pos = 0;
+     p[pos++] = (void*)signal_mcontext->rip;
+     while (pos < len && unw_step(&cursor) > 0) {
+         unw_word_t ip = 0;
+         unw_get_reg(&cursor, UNW_REG_IP, &ip);
+         if (unw_is_signal_frame(&cursor)) {
+             continue; // signal frames are not reported
+         }
+         p[pos++] = (void*)ip;
+     }
+     return pos;
+ }
+}
+
+namespace NYql {
+ namespace NBacktrace {
+ size_t CollectBacktrace(void** addresses, size_t limit, void* data) { // data is either null or a ucontext_t* describing where to start unwinding
+ if (!data) {
+ return BackTrace(addresses, limit); // no context: this two-argument call cannot bind to the local three-argument overload
+ }
+ return BackTrace(addresses, limit, reinterpret_cast<ucontext_t*>(data));
+ }
+ }
+} \ No newline at end of file
diff --git a/yql/essentials/utils/backtrace/backtrace_ut.cpp b/yql/essentials/utils/backtrace/backtrace_ut.cpp
new file mode 100644
index 0000000000..822060ed32
--- /dev/null
+++ b/yql/essentials/utils/backtrace/backtrace_ut.cpp
@@ -0,0 +1,28 @@
+#include "backtrace.h"
+#include <util/generic/vector.h>
+#include <util/generic/string.h>
+#include <library/cpp/testing/unittest/registar.h>
+namespace {
+ Y_NO_INLINE void TestTrace394() { // presumably non-inlined so the function appears as its own stack frame
+ TStringStream ss;
+ NYql::NBacktrace::KikimrBackTraceFormatImpl(&ss);
+#if !defined(_hardening_enabled_) && !defined(_win_)
+ UNIT_ASSERT_STRING_CONTAINS(ss.Str(), "(anonymous namespace)::TestTrace394"); // the printed trace must mention this very function
+#endif
+ }
+ Y_NO_INLINE void TestTrace39114() { // same check as TestTrace394 under a different function name
+ TStringStream ss;
+ NYql::NBacktrace::KikimrBackTraceFormatImpl(&ss);
+#if !defined(_hardening_enabled_) && !defined(_win_)
+ UNIT_ASSERT_STRING_CONTAINS(ss.Str(), "(anonymous namespace)::TestTrace39114"); // the printed trace must mention this very function
+#endif
+ }
+}
+
+Y_UNIT_TEST_SUITE(TEST_BACKTRACE_AND_SYMBOLIZE) {
+ Y_UNIT_TEST(TEST_NO_KIKIMR) {
+ NYql::NBacktrace::EnableKikimrSymbolize(); // symbolized output is needed for the function-name assertions
+ TestTrace394();
+ TestTrace39114();
+ }
+}
diff --git a/yql/essentials/utils/backtrace/symbolize.cpp b/yql/essentials/utils/backtrace/symbolize.cpp
new file mode 100644
index 0000000000..360ff408c7
--- /dev/null
+++ b/yql/essentials/utils/backtrace/symbolize.cpp
@@ -0,0 +1,62 @@
+#include "backtrace.h"
+
+#include "symbolizer.h"
+
+#include <util/string/split.h>
+#include <util/stream/str.h>
+
+namespace NYql {
+
+ namespace NBacktrace {
+ // Re-symbolizes a textual stack dump.
+ // "StackFrames: <n>" and "StackFrame: <module> <address> <offset>" lines are
+ // consumed; every other line is copied to the result unchanged. Module names
+ // are replaced through `mapping` when an entry exists, and the symbolized
+ // frames are appended after the copied lines. StackFrame lines that are too
+ // short or whose numbers do not parse are dropped.
+ // On anything but Linux/x86_64 the input is returned as is.
+ TString Symbolize(const TString& input, const THashMap<TString, TString>& mapping) {
+#if defined(__linux__) && defined(__x86_64__)
+     TString output;
+     TStringOutput out(output);
+     const size_t maxBatch = static_cast<size_t>(Limit);
+
+     i64 stackSize = -1;
+     TVector<TStackFrame> frames;
+     TVector<TString> usedFilenames;
+     for (TStringBuf line: StringSplitter(input).SplitByString("\n")) {
+         if (line.StartsWith("StackFrames:")) {
+             TVector<TString> parts;
+             Split(TString(line), " ", parts);
+             // The count is only a capacity hint read from untrusted text. Use it
+             // when it parsed to a positive value, capped at Limit; a failed parse
+             // or a negative number must never reach reserve(), where it would
+             // turn into a huge size_t.
+             if (parts.size() > 1 && TryFromString<i64>(parts[1], stackSize) && stackSize > 0) {
+                 const size_t wanted = static_cast<size_t>(stackSize);
+                 frames.reserve(wanted < maxBatch ? wanted : maxBatch);
+             }
+         } else if (line.StartsWith("StackFrame:")) {
+             TVector<TString> parts;
+             Split(TString(line), " ", parts);
+             ui64 address = 0;
+             ui64 offset = 0;
+             const bool parsed = parts.size() > 3
+                 && TryFromString<ui64>(parts[2], address)
+                 && TryFromString<ui64>(parts[3], offset);
+             if (!parsed) {
+                 continue; // malformed frame line: never symbolize indeterminate numbers
+             }
+             TString modulePath = parts[1];
+             auto it = mapping.find(modulePath);
+             if (it != mapping.end()) {
+                 modulePath = it->second;
+             }
+             // NOTE(review): frames keep raw c_str() pointers into usedFilenames;
+             // this relies on TString keeping its character buffer when the vector
+             // relocates its elements - confirm.
+             usedFilenames.emplace_back(std::move(modulePath));
+             frames.emplace_back(TStackFrame{usedFilenames.back().c_str(), address - offset});
+         } else {
+             out << line << "\n";
+         }
+     }
+
+     if (stackSize == 0) {
+         out << "Empty stack trace\n";
+     }
+     // The frame symbolizer works on fixed buffers of Limit entries, so longer
+     // dumps are passed to it in batches of at most Limit frames.
+     for (size_t pos = 0; pos < frames.size(); pos += maxBatch) {
+         const size_t left = frames.size() - pos;
+         Symbolize(frames.data() + pos, left < maxBatch ? left : maxBatch, &out);
+     }
+     return output;
+#else
+     Y_UNUSED(mapping);
+     return input;
+#endif
+ }
+
+ } /* namespace NBacktrace */
+
+} /* namespace NYql */
diff --git a/yql/essentials/utils/backtrace/symbolizer.h b/yql/essentials/utils/backtrace/symbolizer.h
new file mode 100644
index 0000000000..0d32ba25d3
--- /dev/null
+++ b/yql/essentials/utils/backtrace/symbolizer.h
@@ -0,0 +1,15 @@
+#pragma once
+#include "backtrace.h"
+
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+
+namespace NYql {
+ namespace NBacktrace {
+ struct TStackFrame { // input record for Symbolize: module path plus address
+ const char* File; // non-owning; must stay valid while Symbolize runs
+ size_t Address;
+ };
+ void Symbolize(const TStackFrame* frames, size_t count, IOutputStream* out);
+ }
+} \ No newline at end of file
diff --git a/yql/essentials/utils/backtrace/symbolizer_linux.cpp b/yql/essentials/utils/backtrace/symbolizer_linux.cpp
new file mode 100644
index 0000000000..222bc5df2e
--- /dev/null
+++ b/yql/essentials/utils/backtrace/symbolizer_linux.cpp
@@ -0,0 +1,150 @@
+#include "symbolizer.h"
+
+#include <contrib/libs/backtrace/backtrace.h>
+
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+#include <util/system/type_name.h>
+#include <util/system/execpath.h>
+#include <util/string/builder.h>
+
+#include <mutex>
+#include <numeric>
+#include <algorithm>
+
+#include <util/stream/mem.h>
+
+#ifdef __GNUC__
+ #include <cxxabi.h>
+#endif
+
+namespace {
+const size_t MaxStrLen = 512;
+const size_t MaxDemangleLen = 1024 * 1024;
+char Buff[MaxDemangleLen];
+
+class TNoThrowingMemoryOutput : public TMemoryOutput { // memory stream that truncates with "..." when the buffer is full
+public:
+ TNoThrowingMemoryOutput(void* c, size_t l) : TMemoryOutput(c, l) {}
+ void Truncate() { // overwrites the three bytes before the write position with dots; NOTE(review): assumes at least three bytes were written - confirm
+ *(Buf_ - 1) = '.';
+ *(Buf_ - 2) = '.';
+ *(Buf_ - 3) = '.';
+ }
+
+ void DoWrite(const void* buf, size_t len) override {
+ bool truncated = Buf_ + len > End_;
+ if (truncated) {
+ len = std::min(len, (size_t)(End_ - Buf_)); // copy only what still fits
+ }
+ memcpy(Buf_, buf, len);
+ Buf_ += len;
+ if (truncated) {
+ Truncate();
+ }
+ }
+
+ void DoWriteC(char c) override {
+ if (Buf_ == End_) { // buffer full: mark the truncation and drop the character
+ Truncate();
+ } else {
+ *Buf_++ = c;
+ }
+ }
+};
+
+void HandleLibBacktraceError(void* data, const char* msg, int) { // libbacktrace error callback; the error number argument is ignored
+ if (!data) { // no per-frame buffer was supplied: report on Cerr
+ Cerr << msg;
+ return;
+ }
+ TNoThrowingMemoryOutput out(data, MaxStrLen - 1); // otherwise the message goes into the frame's buffer, leaving room for a terminator
+ out << msg;
+}
+
+const char* Demangle(const char* name) { // returns the demangled form of name, or name itself when demangling is unavailable or fails
+#ifndef __GNUC__
+ return name;
+#else
+ int status;
+ size_t len = MaxDemangleLen - 1;
+ const char* res = __cxxabiv1::__cxa_demangle(name, Buff, &len, &status); // NOTE(review): the ABI documents output_buffer as malloc-allocated and possibly realloc'ed; Buff is a static array - confirm this is safe
+
+ if (!res) {
+ return name;
+ }
+ return res;
+#endif
+}
+
+int HandleLibBacktraceFrame(void* data, uintptr_t, const char* filename, int lineno, const char* function) { // libbacktrace pcinfo callback: formats one frame description into the buffer at data
+ TNoThrowingMemoryOutput out(data, MaxStrLen - 1); // always writes from the start of the buffer
+ const char* fileName = filename ? filename : "???";
+ const char* functionName = function ? Demangle(function) : "???";
+ out << functionName << " at " << fileName << ":" << lineno << ":0"; // the column is always printed as 0
+ return 0;
+}
+}
+
+namespace NYql {
+ namespace NBacktrace {
+ namespace {
+ std::mutex Mutex; // taken by Symbolize
+ char* Result[Limit]; // per-frame output slot inside TmpBuffer, or nullptr when the file could not be opened
+ size_t Order[Limit]; // frame indices sorted by file name
+ char TmpBuffer[MaxStrLen * Limit]{}; // MaxStrLen bytes of text per frame
+ auto CreateState(const char* filename) { // creates a libbacktrace state for filename; errors are reported on Cerr because the data argument is null
+ return backtrace_create_state(
+ filename,
+ 0,
+ HandleLibBacktraceError,
+ nullptr
+ );
+ }
+ }
+
+ // Resolves each frame to "function at file:line:0" and writes one line per
+ // frame to `out`, in the original frame order. Frames whose file cannot be
+ // opened are reported as not found. At most Limit frames are processed,
+ // because the working buffers are fixed arrays of Limit entries.
+ void Symbolize(const TStackFrame* frames, size_t count, IOutputStream* out) {
+     if (!count) {
+         return;
+     }
+     // Never index Order/Result past their Limit entries.
+     count = std::min(count, static_cast<size_t>(Limit));
+
+     // The buffers below are shared statics: take the lock before resetting
+     // them, not after, so concurrent callers cannot overwrite each other.
+     const std::lock_guard lock{Mutex};
+
+     memset(TmpBuffer, 0, sizeof(TmpBuffer));
+     Result[0] = TmpBuffer;
+     for (size_t i = 1; i < Limit; ++i) {
+         Result[i] = Result[i - 1] + MaxStrLen;
+     }
+
+     std::iota(Order, Order + count, 0u);
+     std::sort(Order, Order + count, [&frames](auto a, auto b) { return strcmp(frames[a].File, frames[b].File) < 0; });
+
+     struct backtrace_state* state = nullptr;
+     for (size_t i = 0; i < count; ++i) {
+         const char* file = frames[Order[i]].File;
+         // Frames are grouped by file NAME, so compare the text rather than the
+         // pointers: equal names stored at different addresses must reuse the
+         // state instead of creating (and never freeing) a new one per frame.
+         if (!i || strcmp(frames[Order[i - 1]].File, file) != 0) {
+             state = CreateState(file);
+         }
+
+         if (!state) {
+             Result[Order[i]] = nullptr; // File not found
+             continue;
+         }
+
+         int status = backtrace_pcinfo(
+             state,
+             static_cast<uintptr_t>(frames[Order[i]].Address) - 1, // last byte of the call instruction
+             HandleLibBacktraceFrame,
+             HandleLibBacktraceError,
+             reinterpret_cast<void*>(Result[Order[i]]));
+         if (0 != status) {
+             break;
+         }
+     }
+     for (size_t i = 0; i < count; ++i) {
+         if (Result[i]) {
+             *out << Result[i] << "\n";
+         } else {
+             *out << "File `" << frames[i].File << "` not found\n";
+         }
+     }
+ }
+ }
+} \ No newline at end of file
diff --git a/yql/essentials/utils/backtrace/ut/ya.make b/yql/essentials/utils/backtrace/ut/ya.make
new file mode 100644
index 0000000000..c54c56275a
--- /dev/null
+++ b/yql/essentials/utils/backtrace/ut/ya.make
@@ -0,0 +1,10 @@
+UNITTEST_FOR(yql/essentials/utils/backtrace)
+
+
+IF (OS_LINUX AND ARCH_X86_64)
+ SRCS(
+ backtrace_ut.cpp
+ )
+ENDIF()
+
+END()
diff --git a/yql/essentials/utils/backtrace/ya.make b/yql/essentials/utils/backtrace/ya.make
new file mode 100644
index 0000000000..30c2dd1cc1
--- /dev/null
+++ b/yql/essentials/utils/backtrace/ya.make
@@ -0,0 +1,34 @@
+LIBRARY()
+
+SRCS(
+ backtrace.cpp
+ backtrace_lib.cpp
+ symbolize.cpp
+)
+
+PEERDIR(
+ library/cpp/deprecated/atomic
+)
+
+
+IF (OS_LINUX AND ARCH_X86_64)
+ SRCS(
+ backtrace_linux.cpp
+ symbolizer_linux.cpp
+ )
+
+ PEERDIR(
+ contrib/libs/backtrace
+ contrib/libs/libunwind
+ )
+ ADDINCL(contrib/libs/libunwind/include)
+
+ELSE()
+ SRCS(
+ backtrace_dummy.cpp
+ )
+ENDIF()
+
+END()
+
+RECURSE_FOR_TESTS(ut) \ No newline at end of file
diff --git a/yql/essentials/utils/cast.h b/yql/essentials/utils/cast.h
new file mode 100644
index 0000000000..dac8508092
--- /dev/null
+++ b/yql/essentials/utils/cast.h
@@ -0,0 +1,15 @@
+#pragma once
+#include "yql_panic.h"
+
+namespace NYql {
+
+template<class T, class F>
+[[nodiscard]]
+inline T EnsureDynamicCast(F from) { // dynamic_cast that reports failure through YQL_ENSURE instead of returning a null result
+ YQL_ENSURE(from, "source should not be null");
+ T result = dynamic_cast<T>(from);
+ YQL_ENSURE(result, "dynamic_cast failed");
+ return result;
+}
+
+} // namespace NYql
diff --git a/yql/essentials/utils/debug_info.cpp b/yql/essentials/utils/debug_info.cpp
new file mode 100644
index 0000000000..ff56166695
--- /dev/null
+++ b/yql/essentials/utils/debug_info.cpp
@@ -0,0 +1,50 @@
+#include "debug_info.h"
+
+#include <util/system/thread.h>
+#include <util/system/tls.h>
+#include <util/stream/file.h>
+#include <util/generic/string.h>
+
+#include <string.h>
+
+
+namespace NYql {
+
+// Capacity of the per-thread operation id buffer, not counting the trailing NUL.
+static const size_t OPERATION_ID_MAX_LENGTH = 24;
+// Length threshold used by SetCurrentOperationId to decide when only the tail
+// of the operation id is used as the thread name.
+static const size_t THREAD_NAME_MAX_LENGTH = 16;
+
+
+// Per-thread debug data; currently only a NUL-terminated copy of the operation id.
+struct TDebugInfo {
+ char OperationId[OPERATION_ID_MAX_LENGTH + 1];
+};
+
+// Thread-local instance written by SetCurrentOperationId.
+Y_POD_THREAD(TDebugInfo) TlsDebugInfo;
+
+
+// Remembers the operation id in thread-local debug info and mirrors it into
+// the name of the current thread.
+//
+// @param operationId  NUL-terminated id; must not be null.
+//
+// The stored copy keeps at most OPERATION_ID_MAX_LENGTH characters. The thread
+// name uses the whole id when it is shorter than THREAD_NAME_MAX_LENGTH and
+// otherwise its last THREAD_NAME_MAX_LENGTH - 1 characters.
+void SetCurrentOperationId(const char* operationId) {
+    // strlcpy expects the full destination size (terminator included) and
+    // returns strlen(operationId), i.e. the length of the untruncated source.
+    const size_t len = strlcpy(
+        (&TlsDebugInfo)->OperationId,
+        operationId,
+        OPERATION_ID_MAX_LENGTH + 1);
+
+    const char* threadName = operationId;
+    if (len >= THREAD_NAME_MAX_LENGTH) {
+        // Keep the tail of the id so that every id that does not fit is
+        // shortened the same way.
+        threadName += len - THREAD_NAME_MAX_LENGTH + 1;
+    }
+    TThread::SetCurrentThreadName(threadName);
+}
+
+// Returns the number of threads of the current process as reported by the
+// 20th field (num_threads) of /proc/self/stat, or -2 when the file content
+// cannot be parsed.
+long GetRunnigThreadsCount() {
+    const TString procStat = TFileInput("/proc/self/stat").ReadAll();
+
+    // The second field is the executable name in parentheses and may itself
+    // contain spaces or parentheses, so start parsing after the last ')'.
+    const size_t commEnd = procStat.rfind(')');
+    if (commEnd == TString::npos) {
+        return -2;
+    }
+
+    long numThreads = -2; // Number of threads in this process (since Linux 2.6)
+
+    // Fields 3..19 are skipped, field 20 is num_threads.
+    const int parsed = sscanf(procStat.data() + commEnd + 1,
+        " %*c %*d %*d %*d %*d %*d %*u %*u %*u %*u %*u %*u %*u %*d %*d %*d %*d %ld",
+        &numThreads);
+
+    return parsed == 1 ? numThreads : -2;
+}
+
+} // namespace NYql
diff --git a/yql/essentials/utils/debug_info.h b/yql/essentials/utils/debug_info.h
new file mode 100644
index 0000000000..3e2a55140b
--- /dev/null
+++ b/yql/essentials/utils/debug_info.h
@@ -0,0 +1,10 @@
+#pragma once
+
+
+namespace NYql {
+
+// Stores the operation id for the calling thread and derives the thread name from it.
+void SetCurrentOperationId(const char* operationId);
+
+// Number of threads of the current process read from /proc/self/stat; -2 if it cannot be parsed.
+long GetRunnigThreadsCount();
+
+} // namespace NYql
diff --git a/yql/essentials/utils/exceptions.cpp b/yql/essentials/utils/exceptions.cpp
new file mode 100644
index 0000000000..8a6d09fc9d
--- /dev/null
+++ b/yql/essentials/utils/exceptions.cpp
@@ -0,0 +1,37 @@
+#include "exceptions.h"
+
+#include <util/string/builder.h>
+
+namespace NYql {
+
+// Creates an exception with the given code and an empty source location ("", 0).
+TCodeLineException::TCodeLineException(ui32 code)
+ : SourceLocation("", 0)
+ , Code(code)
+{}
+
+// Copies the yexception part and the code of `t`, attaching the source location `sl`.
+TCodeLineException::TCodeLineException(const TSourceLocation& sl, const TCodeLineException& t)
+ : yexception(t)
+ , SourceLocation(sl)
+ , Code(t.Code)
+{}
+
+// Returns the message of the base yexception, without the source location prefix added by what().
+const char* TCodeLineException::GetRawMessage() const {
+ return yexception::what();
+}
+
+// Returns "<source location>: <base message>". The text is built on first use
+// and cached in the mutable Message member. Because what() is noexcept, any
+// exception raised while formatting is replaced by a fixed fallback string.
+const char* TCodeLineException::what() const noexcept {
+    try {
+        if (Message.empty()) {
+            TStringBuilder formatted;
+            formatted << SourceLocation << TStringBuf(": ") << yexception::what();
+            Message = formatted;
+        }
+        return Message.c_str();
+    } catch (...) {
+        return "Unexpected exception in TCodeLineException::what()";
+    }
+}
+
+// Produces a copy of `t` tagged with the source location `sl`.
+TCodeLineException operator+(const TSourceLocation& sl, TCodeLineException&& t) {
+ return TCodeLineException(sl, t);
+}
+
+} // namespace NYql \ No newline at end of file
diff --git a/yql/essentials/utils/exceptions.h b/yql/essentials/utils/exceptions.h
new file mode 100644
index 0000000000..8df5307da2
--- /dev/null
+++ b/yql/essentials/utils/exceptions.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include <util/generic/yexception.h>
+
+namespace NYql {
+
+// This exception keeps the code line and file name separate from the error message
+struct TCodeLineException: public yexception {
+
+ // Location reported as the prefix of what().
+ TSourceLocation SourceLocation;
+ // Cache for the text returned by what(); mutable because what() is const.
+ mutable TString Message;
+ // Error code supplied by the thrower.
+ ui32 Code;
+
+ TCodeLineException(ui32 code);
+
+ TCodeLineException(const TSourceLocation& sl, const TCodeLineException& t);
+
+ // Message prefixed with the source location.
+ virtual const char* what() const noexcept override;
+
+ // Message of the base yexception without the source location.
+ const char* GetRawMessage() const;
+
+};
+
+// Returns a copy of `t` carrying the source location `sl`.
+TCodeLineException operator+(const TSourceLocation& sl, TCodeLineException&& t);
+
+// Throws TCodeLineException(CODE) with the streamed message __VA_ARGS__ when
+// CONDITION evaluates to false; does nothing otherwise.
+#define YQL_ENSURE_CODELINE(CONDITION, CODE, ...) \
+ do { \
+ if (Y_UNLIKELY(!(CONDITION))) { \
+ ythrow TCodeLineException(CODE) << __VA_ARGS__; \
+ } \
+ } while (0)
+
+} // namespace NYql \ No newline at end of file
diff --git a/yql/essentials/utils/failure_injector/failure_injector.cpp b/yql/essentials/utils/failure_injector/failure_injector.cpp
new file mode 100644
index 0000000000..4f4f9c1eb6
--- /dev/null
+++ b/yql/essentials/utils/failure_injector/failure_injector.cpp
@@ -0,0 +1,64 @@
+#include "failure_injector.h"
+
+#include <yql/essentials/utils/log/log.h>
+#include <yql/essentials/utils/yql_panic.h>
+
+#include <util/generic/singleton.h>
+
+namespace NYql {
+
+// Static facade: each call is forwarded to the process-wide Singleton<TFailureInjector>.
+
+// Enables failure injection.
+void TFailureInjector::Activate() {
+ Singleton<TFailureInjector>()->ActivateImpl();
+}
+
+// Registers (or replaces) the failure spec for `name`.
+void TFailureInjector::Set(std::string_view name, ui64 skip, ui64 countOfFails) {
+ Singleton<TFailureInjector>()->SetImpl(name, skip, countOfFails);
+}
+
+// Reports that the failure point `name` was reached; `action` is the failure to perform.
+void TFailureInjector::Reach(std::string_view name, std::function<void()> action) {
+ Singleton<TFailureInjector>()->ReachImpl(name, action);
+}
+
+// Sets the Enabled_ flag; until then ReachImpl returns immediately.
+void TFailureInjector::ActivateImpl() {
+ Enabled_.store(true);
+ YQL_LOG(DEBUG) << "TFailureInjector::Activate";
+}
+
+// Returns a copy of the failure specs held by the singleton.
+THashMap<TString, TFailureInjector::TFailureSpec> TFailureInjector::GetCurrentState() {
+ return Singleton<TFailureInjector>()->GetCurrentStateImpl();
+}
+
+// Returns a snapshot of all registered failure specs; the copy is made while Lock is held.
+THashMap<TString, TFailureInjector::TFailureSpec> TFailureInjector::GetCurrentStateImpl() {
+    THashMap<TString, TFailureInjector::TFailureSpec> snapshot;
+    {
+        auto guard = Guard(Lock);
+        snapshot = FailureSpecs;
+    }
+    return snapshot;
+}
+
+// Handles reaching the failure point `name`.
+//
+// Does nothing unless the injector is activated and a spec for `name` exists.
+// The first `Skip` hits only decrement the skip counter; each of the next
+// `CountOfFails` hits decrements the fail counter and runs `action`.
+//
+// The counters are updated under Lock, but `action` runs after the lock is
+// released so that a slow or throwing action does not block other threads
+// that use the injector.
+void TFailureInjector::ReachImpl(std::string_view name, std::function<void()> action) {
+    if (!Enabled_.load()) {
+        return;
+    }
+
+    bool shouldFail = false;
+    with_lock(Lock) {
+        if (auto failureSpec = FailureSpecs.FindPtr(name)) {
+            YQL_LOG(DEBUG) << "TFailureInjector::Reach: " << name << ", Skip=" << failureSpec->Skip << ", Fails=" << failureSpec->CountOfFails;
+            if (failureSpec->Skip > 0) {
+                --failureSpec->Skip;
+            } else if (failureSpec->CountOfFails > 0) {
+                YQL_LOG(DEBUG) << "TFailureInjector::OnReach: " << name;
+                --failureSpec->CountOfFails;
+                shouldFail = true;
+            }
+        }
+    }
+
+    if (shouldFail) {
+        action();
+    }
+}
+
+// Registers the failure spec {skip, countOfFails} under `name`, replacing any
+// previous spec with that name. `countOfFails` must be positive.
+void TFailureInjector::SetImpl(std::string_view name, ui64 skip, ui64 countOfFails) {
+    auto guard = Guard(Lock);
+    YQL_ENSURE(countOfFails > 0, "failure " << name << ", 'countOfFails' must be positive");
+    const TFailureSpec spec{skip, countOfFails};
+    FailureSpecs[TString{name}] = spec;
+}
+
+} // NYql
diff --git a/yql/essentials/utils/failure_injector/failure_injector.h b/yql/essentials/utils/failure_injector/failure_injector.h
new file mode 100644
index 0000000000..6db79fc8c0
--- /dev/null
+++ b/yql/essentials/utils/failure_injector/failure_injector.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#include <util/generic/hash.h>
+#include <util/generic/string.h>
+#include <util/system/mutex.h>
+
+#include <atomic>
+#include <functional>
+#include <string_view>
+
+namespace NYql {
+
+// Named failure points for tests. Code calls Reach(name, action) at a failure
+// point; the action runs only after Activate() and only according to the spec
+// registered for that name with Set().
+class TFailureInjector {
+public:
+ struct TFailureSpec {
+ // Number of hits to let pass before failures start.
+ ui64 Skip;
+ // Number of hits, after the skipped ones, on which the action is run.
+ ui64 CountOfFails;
+ };
+
+ static void Activate();
+
+ static void Set(std::string_view name, ui64 skip, ui64 countOfFails);
+ static void Reach(std::string_view name, std::function<void()> action);
+ static THashMap<TString, TFailureSpec> GetCurrentState();
+
+private:
+ void ActivateImpl();
+
+ void SetImpl(std::string_view name, ui64 skip, ui64 countOfFails);
+ void ReachImpl(std::string_view name, std::function<void()> action);
+ THashMap<TString, TFailureSpec> GetCurrentStateImpl();
+
+ std::atomic<bool> Enabled_ = false;
+ // Specs by failure point name; accessed under Lock.
+ THashMap<TString, TFailureSpec> FailureSpecs;
+ TMutex Lock;
+};
+
+} // NYql
diff --git a/yql/essentials/utils/failure_injector/failure_injector_ut.cpp b/yql/essentials/utils/failure_injector/failure_injector_ut.cpp
new file mode 100644
index 0000000000..4d94b5cce7
--- /dev/null
+++ b/yql/essentials/utils/failure_injector/failure_injector_ut.cpp
@@ -0,0 +1,87 @@
+#include "failure_injector.h"
+
+#include <yql/essentials/utils/log/log.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/datetime/base.h>
+
+#include <chrono>
+
+using namespace NYql;
+using namespace NYql::NLog;
+using namespace std::chrono;
+
+// Test action: records that the failure point fired by setting the flag.
+void OnReach(std::atomic<bool>& called) {
+ called.store(true);
+}
+
+// Initializes the YQL logger with the "cout" log type and sets every component to DEBUG level.
+void SetUpLogger() {
+ TString logType = "cout";
+ NLog::InitLogger(logType, false);
+ NLog::EComponentHelpers::ForEach([](NLog::EComponent component) {
+ NLog::YqlLogger().SetComponentLevel(component, ELevel::DEBUG);
+ });
+}
+
+Y_UNIT_TEST_SUITE(TFailureInjectorTests) {
+ // The action must not run before Activate()/Set(), and must run once a spec is set.
+ Y_UNIT_TEST(BasicFailureTest) {
+ SetUpLogger();
+ std::atomic<bool> called;
+ called.store(false);
+ auto behavior = [&called] { OnReach(called); };
+ TFailureInjector::Reach("misc_failure", behavior);
+ UNIT_ASSERT_EQUAL(false, called.load());
+ TFailureInjector::Activate();
+ TFailureInjector::Set("misc_failure", 0, 1);
+ TFailureInjector::Reach("misc_failure", behavior);
+ UNIT_ASSERT_EQUAL(true, called.load());
+ }
+
+ // With skip == 1 the first hit is ignored and the second one fires.
+ Y_UNIT_TEST(CheckSkipTest) {
+ SetUpLogger();
+ std::atomic<bool> called;
+ called.store(false);
+ auto behavior = [&called] { OnReach(called); };
+ TFailureInjector::Activate();
+ TFailureInjector::Set("misc_failure", 1, 1);
+
+ TFailureInjector::Reach("misc_failure", behavior);
+ UNIT_ASSERT_EQUAL(false, called.load());
+ TFailureInjector::Reach("misc_failure", behavior);
+ UNIT_ASSERT_EQUAL(true, called.load());
+ }
+
+ // With skip == 1 and countOfFails == 2 exactly the second and third hits fire.
+ Y_UNIT_TEST(CheckFailCountTest) {
+ SetUpLogger();
+ int called = 0;
+ auto behavior = [&called] { ++called; };
+ TFailureInjector::Activate();
+ TFailureInjector::Set("misc_failure", 1, 2);
+
+ TFailureInjector::Reach("misc_failure", behavior);
+ UNIT_ASSERT_EQUAL(0, called);
+ TFailureInjector::Reach("misc_failure", behavior);
+ UNIT_ASSERT_EQUAL(1, called);
+ TFailureInjector::Reach("misc_failure", behavior);
+ UNIT_ASSERT_EQUAL(2, called);
+ TFailureInjector::Reach("misc_failure", behavior);
+ UNIT_ASSERT_EQUAL(2, called);
+ TFailureInjector::Reach("misc_failure", behavior);
+ UNIT_ASSERT_EQUAL(2, called);
+ }
+
+    // A sleeping action must delay the caller of Reach() by at least the sleep time.
+    Y_UNIT_TEST(SlowDownTest) {
+        SetUpLogger();
+        TFailureInjector::Activate();
+        TFailureInjector::Set("misc_failure", 0, 1);
+
+        // steady_clock is monotonic, so the measured interval cannot be
+        // distorted by wall-clock adjustments during the test.
+        const auto start = steady_clock::now();
+        TFailureInjector::Reach("misc_failure", [] { ::Sleep(TDuration::Seconds(5)); });
+        const auto finish = steady_clock::now();
+        const auto duration = duration_cast<std::chrono::seconds>(finish - start);
+        YQL_LOG(DEBUG) << "Duration :" << duration.count();
+        UNIT_ASSERT_GE(duration.count(), 5);
+    }
+}
diff --git a/yql/essentials/utils/failure_injector/ut/ya.make b/yql/essentials/utils/failure_injector/ut/ya.make
new file mode 100644
index 0000000000..579a466287
--- /dev/null
+++ b/yql/essentials/utils/failure_injector/ut/ya.make
@@ -0,0 +1,15 @@
+IF (OS_LINUX OR OS_DARWIN)
+ UNITTEST_FOR(yql/essentials/utils/failure_injector)
+
+ SIZE(SMALL)
+
+ SRCS(
+ failure_injector_ut.cpp
+ )
+
+ PEERDIR(
+ yql/essentials/utils/log
+ )
+
+ END()
+ENDIF()
diff --git a/yql/essentials/utils/failure_injector/ya.make b/yql/essentials/utils/failure_injector/ya.make
new file mode 100644
index 0000000000..e10dfdaecb
--- /dev/null
+++ b/yql/essentials/utils/failure_injector/ya.make
@@ -0,0 +1,16 @@
+LIBRARY()
+
+SRCS(
+ failure_injector.cpp
+)
+
+PEERDIR(
+ yql/essentials/utils
+ yql/essentials/utils/log
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/yql/essentials/utils/fetch/fetch.cpp b/yql/essentials/utils/fetch/fetch.cpp
new file mode 100644
index 0000000000..d35baeb097
--- /dev/null
+++ b/yql/essentials/utils/fetch/fetch.cpp
@@ -0,0 +1,182 @@
+#include "fetch.h"
+
+#include <yql/essentials/utils/log/log.h>
+
+#include <library/cpp/openssl/io/stream.h>
+#include <library/cpp/http/misc/httpcodes.h>
+#include <library/cpp/charset/ci_string.h>
+
+#include <util/network/socket.h>
+#include <util/string/cast.h>
+#include <util/generic/strbuf.h>
+
+namespace NYql {
+
+namespace {
+
+// Parses `addr` with the given parser feature flags.
+// Throws yexception describing the parser state when parsing does not succeed.
+THttpURL ParseURL(const TStringBuf addr, NUri::TParseFlags features) {
+    THttpURL parsed;
+    const THttpURL::TParsedState state = parsed.Parse(addr, features, nullptr, 65536);
+    if (state != THttpURL::ParsedOK) {
+        ythrow yexception() << "Bad URL: \"" << addr << "\", " << HttpURLParsedStateToString(state);
+    }
+    return parsed;
+}
+
+// IFetchResult implementation that performs a single HTTP(S) GET in its
+// constructor and exposes the response through THttpInput.
+class TFetchResultImpl: public IFetchResult {
+public:
+ // Builds the request text, connects to host:port, optionally wraps the
+ // socket streams in TLS, sends the request and prepares the response reader.
+ TFetchResultImpl(const THttpURL& url, const THttpHeaders& additionalHeaders, TDuration timeout) {
+ TString host = url.Get(THttpURL::FieldHost);
+ TString path = url.PrintS(THttpURL::FlagPath | THttpURL::FlagQuery);
+ const char* p = url.Get(THttpURL::FieldPort);
+ // Default port is 80, or 443 for the https scheme; an explicit port in the URL overrides both.
+ ui16 port = 80;
+ bool https = false;
+
+ if (url.Get(THttpURL::FieldScheme) == TStringBuf("https")) {
+ port = 443;
+ https = true;
+ }
+
+ if (p) {
+ port = FromString<ui16>(p);
+ }
+
+ TString req;
+ {
+ TStringOutput rqs(req);
+ TStringBuf userAgent = "User-Agent: Mozilla/5.0 (compatible; YQL/1.0)";
+
+ // Request line plus Host and User-Agent headers.
+ IOutputStream::TPart request[] = {
+ IOutputStream::TPart("GET ", 4),
+ IOutputStream::TPart(path.data(), path.size()),
+ IOutputStream::TPart(" HTTP/1.1", 9),
+ IOutputStream::TPart::CrLf(),
+ IOutputStream::TPart("Host: ", 6),
+ IOutputStream::TPart(host.data(), host.size()),
+ IOutputStream::TPart::CrLf(),
+ IOutputStream::TPart(userAgent.data(), userAgent.size()),
+ IOutputStream::TPart::CrLf(),
+ };
+ rqs.Write(request, Y_ARRAY_SIZE(request));
+ if (!additionalHeaders.Empty()) {
+ additionalHeaders.OutTo(&rqs);
+ }
+ // Empty line terminating the header section.
+ rqs << "\r\n";
+ }
+
+ Socket.Reset(new TSocket(TNetworkAddress(host, port), timeout));
+ SocketInput.Reset(new TSocketInput(*Socket));
+ SocketOutput.Reset(new TSocketOutput(*Socket));
+
+ // Timeout is passed as whole seconds plus the millisecond remainder.
+ Socket->SetSocketTimeout(timeout.Seconds(), timeout.MilliSeconds() % 1000);
+
+ if (https) {
+ Ssl.Reset(new TOpenSslClientIO(SocketInput.Get(), SocketOutput.Get()));
+ }
+
+ // Send through the TLS stream when present, otherwise through the plain socket.
+ {
+ THttpOutput ho(Ssl ? (IOutputStream*)Ssl.Get() : (IOutputStream*)SocketOutput.Get());
+ (ho << req).Finish();
+ }
+ HttpInput.Reset(new THttpInput(Ssl ? (IInputStream*)Ssl.Get() : (IInputStream*)SocketInput.Get()));
+ }
+
+ // Response stream (headers already parsed by THttpInput).
+ THttpInput& GetStream() override {
+ return *HttpInput;
+ }
+
+ // HTTP status code parsed from the first response line.
+ unsigned GetRetCode() override {
+ return ParseHttpRetCode(HttpInput->FirstLine());
+ }
+
+ // Returns the URL from the Location header (name compared case-insensitively).
+ // A target that is not a valid absolute URL is merged with `baseUrl`.
+ // Throws yexception when the response has no Location header.
+ THttpURL GetRedirectURL(const THttpURL& baseUrl) override {
+ for (auto i = HttpInput->Headers().Begin(); i != HttpInput->Headers().End(); ++i) {
+ if (0 == TCiString::compare(i->Name(), TStringBuf("location"))) {
+ THttpURL target = ParseURL(i->Value(), THttpURL::FeaturesAll | NUri::TFeature::FeatureConvertHostIDN);
+ if (!target.IsValidAbs()) {
+ target.Merge(baseUrl);
+ }
+ return target;
+ }
+ }
+ ythrow yexception() << "Unknown redirect location from " << baseUrl.PrintS();
+ }
+
+ // Performs one request and returns the result as a ref-counted pointer.
+ static TFetchResultPtr Fetch(const THttpURL& url, const THttpHeaders& additionalHeaders, const TDuration& timeout) {
+ return new TFetchResultImpl(url, additionalHeaders, timeout);
+ }
+
+private:
+ // NOTE(review): the streams below hold pointers/references to the members
+ // declared before them, so the declaration order presumably matters - confirm.
+ THolder<TSocket> Socket;
+ THolder<TSocketInput> SocketInput;
+ THolder<TSocketOutput> SocketOutput;
+ THolder<TOpenSslClientIO> Ssl;
+ THolder<THttpInput> HttpInput;
+};
+
+// Returns true for HTTP status codes that redirect to another location:
+// 301, 302, 303, 307 and 308.
+inline bool IsRedirectCode(unsigned code) {
+    switch (code) {
+        case HTTP_MOVED_PERMANENTLY:
+        case HTTP_FOUND:
+        case HTTP_SEE_OTHER:
+        case HTTP_TEMPORARY_REDIRECT:
+        case HTTP_PERMANENT_REDIRECT: // 308, the permanent counterpart of 307
+            return true;
+    }
+    return false;
+}
+
+// Returns true for HTTP status codes on which the same request is repeated.
+inline bool IsRetryCode(unsigned code) {
+    return code == HTTP_REQUEST_TIME_OUT
+        || code == HTTP_AUTHENTICATION_TIMEOUT
+        || code == HTTP_TOO_MANY_REQUESTS
+        || code == HTTP_GATEWAY_TIME_OUT
+        || code == HTTP_SERVICE_UNAVAILABLE;
+}
+
+} // unnamed
+
+// Parses `addr` with all features plus IDN host conversion and FeatureNoRelPath;
+// throws yexception on a parse failure (see the two-argument overload).
+THttpURL ParseURL(const TStringBuf addr) {
+ return ParseURL(addr, THttpURL::FeaturesAll | NUri::TFeature::FeatureConvertHostIDN | NUri::TFeature::FeatureNoRelPath);
+}
+
+// Fetches `url` with GET, repeating requests that end with a retry code and
+// following redirects.
+//
+// @param retries    upper bound on attempts per URL (at least one attempt is always made)
+// @param redirects  upper bound on the number of URLs visited, the initial one included
+// @return the result for a 2xx or 304 response
+// Throws yexception for any other final status or when the URL budget is exhausted.
+TFetchResultPtr Fetch(const THttpURL& url, const THttpHeaders& additionalHeaders, const TDuration& timeout, size_t retries, size_t redirects) {
+    THttpURL currentUrl = url;
+    for (size_t hop = 0; hop < redirects; ++hop) {
+        TFetchResultPtr response;
+        unsigned code = 0;
+        for (size_t attempt = 0;;) {
+            response = TFetchResultImpl::Fetch(currentUrl, additionalHeaders, timeout);
+            code = response->GetRetCode();
+            if (!IsRetryCode(code) || ++attempt >= retries) {
+                break;
+            }
+        }
+
+        const bool isSuccess = code >= 200 && code < 300;
+        if (isSuccess || code == HTTP_NOT_MODIFIED) {
+            return response;
+        }
+
+        if (IsRedirectCode(code)) {
+            currentUrl = response->GetRedirectURL(currentUrl);
+            YQL_LOG(INFO) << "Got redirect to " << currentUrl.PrintS();
+            continue;
+        }
+
+        // The body is only extra diagnostics; failing to read it is ignored.
+        TString errorBody;
+        try {
+            errorBody = response->GetStream().ReadAll();
+        } catch (...) {
+        }
+
+        ythrow yexception() << "Failed to fetch url '" << currentUrl.PrintS() << "' with code " << code << ", body: " << errorBody;
+    }
+    ythrow yexception() << "Failed to fetch url '" << currentUrl.PrintS() << "': too many redirects";
+}
+
+} // NYql
diff --git a/yql/essentials/utils/fetch/fetch.h b/yql/essentials/utils/fetch/fetch.h
new file mode 100644
index 0000000000..d9e1c3c1a5
--- /dev/null
+++ b/yql/essentials/utils/fetch/fetch.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#include <library/cpp/uri/http_url.h>
+#include <library/cpp/http/io/headers.h>
+#include <library/cpp/http/io/stream.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/string.h>
+#include <util/generic/ptr.h>
+
+namespace NYql {
+
+// Result of a single HTTP fetch; ref-counted through TThrRefBase.
+struct IFetchResult: public TThrRefBase {
+ // Stream for reading the response.
+ virtual THttpInput& GetStream() = 0;
+ // HTTP status code of the response.
+ virtual unsigned GetRetCode() = 0;
+ // Redirect target of the response, resolved against `baseUrl` if needed.
+ virtual THttpURL GetRedirectURL(const THttpURL& baseUrl) = 0;
+};
+
+using TFetchResultPtr = TIntrusivePtr<IFetchResult>;
+
+// Parses an URL; throws on malformed input.
+THttpURL ParseURL(const TStringBuf addr);
+// Fetches `url`, retrying and following redirects within the given limits.
+TFetchResultPtr Fetch(const THttpURL& url, const THttpHeaders& additionalHeaders = {}, const TDuration& timeout = TDuration::Max(), size_t retries = 3, size_t redirects = 10);
+
+} // NYql
diff --git a/yql/essentials/utils/fetch/ya.make b/yql/essentials/utils/fetch/ya.make
new file mode 100644
index 0000000000..e95b4fde60
--- /dev/null
+++ b/yql/essentials/utils/fetch/ya.make
@@ -0,0 +1,16 @@
+LIBRARY()
+
+SRCS(
+ fetch.cpp
+)
+
+PEERDIR(
+ library/cpp/charset
+ library/cpp/http/io
+ library/cpp/http/misc
+ library/cpp/openssl/io
+ library/cpp/uri
+ yql/essentials/utils/log
+)
+
+END()
diff --git a/yql/essentials/utils/fp_bits.h b/yql/essentials/utils/fp_bits.h
new file mode 100644
index 0000000000..ffce6e161b
--- /dev/null
+++ b/yql/essentials/utils/fp_bits.h
@@ -0,0 +1,122 @@
+#pragma once
+#include <util/system/defaults.h>
+
+#include <cmath>
+#include <limits>
+
+namespace NYql {
+/*
+
+double
+
+visual c++
+
+fff8000000000000 0.0/0.0
+7ff8000000000001 snan
+7ff8000000000000 qnan
+
+gcc/clang
+
+7ff8000000000000 0.0/0.0
+7ff4000000000000 snan
+7ff8000000000000 qnan
+
+float
+
+visual c++
+
+ffc00000 0.0f/0.0f
+7fc00001 snan
+7fc00000 qnan
+
+gcc/clang
+
+7fc00000 0.0f/0.0f
+7fa00000 snan
+7fc00000 qnan
+
+*/
+
+// Bit-layout description of a floating point type. The primary template marks
+// a type as unsupported; float and double are described by the specializations
+// below and are supported when the type is IEC 559 (IEEE 754).
+template <typename T>
+struct TFpTraits {
+ static constexpr bool Supported = false;
+};
+
+template <>
+struct TFpTraits<float> {
+ static constexpr bool Supported = std::numeric_limits<float>::is_iec559;
+ // Unsigned integer of the same width as the floating point type.
+ using TIntegral = ui32;
+ static constexpr TIntegral SignMask = (1u << 31);
+ // Widths of the mantissa and exponent fields in bits.
+ static constexpr TIntegral Mantissa = 23;
+ static constexpr TIntegral Exp = 8;
+ // All-ones values of the mantissa and exponent fields.
+ static constexpr TIntegral MaxMantissa = (1u << Mantissa) - 1;
+ static constexpr TIntegral MaxExp = (1u << Exp) - 1;
+ // Bit pattern every NaN is canonized to.
+ static constexpr TIntegral QNan = 0x7fc00000u;
+};
+
+template <>
+struct TFpTraits<double> {
+ static constexpr bool Supported = std::numeric_limits<double>::is_iec559;
+ using TIntegral = ui64;
+ static constexpr TIntegral SignMask = (1ull << 63);
+ static constexpr TIntegral Mantissa = 52;
+ static constexpr TIntegral Exp = 11;
+ static constexpr TIntegral MaxMantissa = (1ull << Mantissa) - 1;
+ static constexpr TIntegral MaxExp = (1ull << Exp) - 1;
+ static constexpr TIntegral QNan = 0x7ff8000000000000ull;
+};
+
+template <typename T, bool>
+struct TCanonizeFpBitsImpl {
+};
+
+// Canonization on the raw bit pattern, for types described by TFpTraits.
+template <typename T>
+struct TCanonizeFpBitsImpl<T, true> {
+    static void Do(void* buffer) {
+        using TTraits = TFpTraits<T>;
+        using TIntegral = typename TTraits::TIntegral;
+        TIntegral& bits = *(TIntegral*)(buffer);
+
+        // Only the sign bit set: minus zero becomes plus zero.
+        if (bits == TTraits::SignMask) {
+            bits = 0;
+            return;
+        }
+
+        // All exponent bits set means inf or nan; a non-zero mantissa means nan.
+        const bool isInfOrNan = ((bits >> TTraits::Mantissa) & TTraits::MaxExp) == TTraits::MaxExp;
+        if (isInfOrNan && (bits & TTraits::MaxMantissa)) {
+            bits = TTraits::QNan;
+        }
+    }
+};
+
+// Fallback canonization through the standard library, for types without TFpTraits support.
+template <typename T>
+struct TCanonizeFpBitsImpl<T, false> {
+    static void Do(void* buffer) {
+        using TLimits = std::numeric_limits<T>;
+        static_assert(TLimits::has_quiet_NaN, "no QNAN");
+
+        T* target = (T*)buffer;
+        const T current = *target;
+        const int kind = std::fpclassify(current);
+        if (kind == FP_NAN) {
+            *target = TLimits::quiet_NaN();
+        } else if (kind == FP_ZERO) {
+            // covers minus zero as well
+            *target = T(0);
+        }
+    }
+};
+
+/*
+ Canonize floating point number bits.
+ Converts minus zero to zero and all NaNs to quiet NaN.
+ Dispatches to the bitwise implementation when TFpTraits<T>::Supported is true
+ and to the std::fpclassify based one otherwise.
+ @param buffer[in/out] - aligned buffer with floating point number
+*/
+template <typename T>
+void CanonizeFpBits(void* buffer) {
+ return TCanonizeFpBitsImpl<T, TFpTraits<T>::Supported>::Do(buffer);
+}
+
+}
diff --git a/yql/essentials/utils/fp_bits_ut.cpp b/yql/essentials/utils/fp_bits_ut.cpp
new file mode 100644
index 0000000000..d6d94b56f4
--- /dev/null
+++ b/yql/essentials/utils/fp_bits_ut.cpp
@@ -0,0 +1,105 @@
+#include "fp_bits.h"
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/system/valgrind.h>
+
+namespace NYql {
+
+namespace {
+// Checks CanonizeFpBits<T> on a fixed set of values: the floating point class
+// must be preserved, minus zero must become zero, every NaN must become the
+// quiet NaN, and all other values must stay bitwise unchanged.
+template <typename T>
+void CanonizeFpBitsTest() {
+ // The order matters: the class assignment below relies on index ranges.
+ enum EValues {
+ Zero,
+ MZero,
+ One,
+ Half,
+ Two,
+ PMin,
+ PMax,
+ DNorm,
+ PInf,
+ NInf,
+ QNan,
+ SNan,
+ INan,
+ Max
+ };
+
+ T values[EValues::Max];
+ T newValues[EValues::Max];
+ int valuesClass[EValues::Max];
+ values[Zero] = T(0);
+ values[MZero] = -values[Zero];
+ values[One] = T(1);
+ values[Half] = values[One] / 2;
+ values[Two] = values[One] * 2;
+ values[PMin] = std::numeric_limits<T>::min();
+ values[DNorm] = values[PMin] / 2;
+ values[PMax] = std::numeric_limits<T>::max();
+ values[PInf] = std::numeric_limits<T>::infinity();
+ values[NInf] = -std::numeric_limits<T>::infinity();
+ values[QNan] = std::numeric_limits<T>::quiet_NaN();
+ values[SNan] = std::numeric_limits<T>::signaling_NaN();
+ values[INan] = std::numeric_limits<T>::infinity() / std::numeric_limits<T>::infinity();
+
+ // Expected std::fpclassify result per index.
+ for (int v = Zero; v < Max; ++v) {
+ int cls;
+ if (v <= MZero) {
+ cls = FP_ZERO;
+ } else if (v < DNorm) {
+ cls = FP_NORMAL;
+ } else if (v == DNorm) {
+ cls = FP_SUBNORMAL;
+ } else if (v < QNan) {
+ cls = FP_INFINITE;
+ } else {
+ cls = FP_NAN;
+ }
+
+ valuesClass[v] = cls;
+ }
+
+ // Sanity check of the inputs themselves.
+ for (int v = Zero; v < Max; ++v) {
+ UNIT_ASSERT_VALUES_EQUAL(std::fpclassify(values[v]), valuesClass[v]);
+ }
+
+ for (int v = Zero; v < Max; ++v) {
+ newValues[v] = values[v];
+ CanonizeFpBits<T>(&newValues[v]);
+ }
+
+ // Canonization must not change the class of a value.
+ for (int v = Zero; v < Max; ++v) {
+ UNIT_ASSERT_VALUES_EQUAL(std::fpclassify(newValues[v]), valuesClass[v]);
+ }
+
+ // Bitwise comparison against the canonical representative of each value.
+ for (int v = Zero; v < Max; ++v) {
+ int originalV = v;
+ if (v == MZero) {
+ originalV = Zero;
+ } else if (v >= QNan) {
+ originalV = QNan;
+ }
+
+ // NOTE(review): at most 10 bytes are compared - presumably to skip the
+ // padding bytes of an 80-bit long double; confirm.
+ UNIT_ASSERT(std::memcmp((const void*)&newValues[v], (const void*)&values[originalV], std::min(size_t(10), sizeof(T))) == 0);
+ }
+}
+}
+
+// Runs the canonization check for float, double and long double.
+Y_UNIT_TEST_SUITE(TFpBits) {
+ Y_UNIT_TEST(CanonizeFloat) {
+ CanonizeFpBitsTest<float>();
+ }
+
+ Y_UNIT_TEST(CanonizeDouble) {
+ CanonizeFpBitsTest<double>();
+ }
+
+ Y_UNIT_TEST(CanonizeLongDouble) {
+ // Skipped when running under valgrind.
+ if (NValgrind::ValgrindIsOn()) {
+ return; // TODO KIKIMR-3431
+ }
+ CanonizeFpBitsTest<long double>();
+ }
+}
+
+}
diff --git a/yql/essentials/utils/future_action.cpp b/yql/essentials/utils/future_action.cpp
new file mode 100644
index 0000000000..26deb8de7e
--- /dev/null
+++ b/yql/essentials/utils/future_action.cpp
@@ -0,0 +1 @@
+#include "future_action.h"
diff --git a/yql/essentials/utils/future_action.h b/yql/essentials/utils/future_action.h
new file mode 100644
index 0000000000..d4a8dd69aa
--- /dev/null
+++ b/yql/essentials/utils/future_action.h
@@ -0,0 +1,59 @@
+#pragma once
+
+#include <library/cpp/threading/future/future.h>
+
+namespace NYql {
+/*
+An 'action' is a functor attached to a future that should be called in the main thread.
+This trick allows us to execute attached actions in the main thread rather than in the future's execution thread,
+and so to preserve single-threaded logic.
+*/
+
+/*
+Make ready future with constant action inside.
+The action captures a copy of `value` and returns it on every call.
+*/
+template <typename T>
+NThreading::TFuture<std::function<T()>> MakeFutureWithConstantAction(const T& value) {
+ return NThreading::MakeFuture<std::function<T()>>([value]() {
+ return value;
+ });
+}
+
+/*
+Attach an action to a future.
+The resulting future becomes ready together with `f` and holds a functor which,
+when called, applies `action` to the value of `f`. The action itself is not run
+by the continuation, only by whoever calls the functor.
+*/
+template <typename T, typename A>
+auto AddActionToFuture(NThreading::TFuture<T> f, const A& action) {
+    using TActionResult = decltype(action(std::declval<T>()));
+    using TDeferred = std::function<TActionResult()>;
+
+    return f.Apply([action](NThreading::TFuture<T> ready) -> TDeferred {
+        return [ready, action]() {
+            return action(ready.GetValue());
+        };
+    });
+}
+
+/*
+Apply continuation with constant action
+The action ignores the value of `f` and returns a copy of `value`.
+*/
+template <typename T, typename V>
+NThreading::TFuture<std::function<V()>> AddConstantActionToFuture(NThreading::TFuture<T> f, const V& value) {
+ return AddActionToFuture(f, [value](const T&) { return value; });
+}
+
+/*
+Transform action result by applying mapper.
+The resulting future holds a functor that forwards its arguments to the
+original action and passes the action's result through `mapper`.
+*/
+template <typename R, typename TMapper, typename ...Args>
+auto MapFutureAction(NThreading::TFuture<std::function<R(Args&&...)>> f, const TMapper& mapper) {
+    using TMapped = decltype(mapper(std::declval<R>()));
+    using TSourceFuture = NThreading::TFuture<std::function<R(Args&&...)>>;
+    using TMappedAction = std::function<TMapped(Args&&...)>;
+
+    return f.Apply([mapper](TSourceFuture ready) -> TMappedAction {
+        return [ready, mapper](Args&& ...args) {
+            return mapper(ready.GetValue()(std::forward<Args>(args)...));
+        };
+    });
+}
+
+}
diff --git a/yql/essentials/utils/hash.cpp b/yql/essentials/utils/hash.cpp
new file mode 100644
index 0000000000..b0bc284d4f
--- /dev/null
+++ b/yql/essentials/utils/hash.cpp
@@ -0,0 +1,21 @@
+#include "hash.h"
+#include <util/system/getpid.h>
+#include <util/system/env.h>
+
+namespace NYql {
+
+#ifndef NDEBUG
+// Debug-build hash perturbation: XORs `src` with a per-process value.
+// The value is IntHash(GetPID()) when the YQL_MUTATE_HASHCODE environment
+// variable yields a true value and 0 otherwise (0 leaves `src` unchanged).
+size_t VaryingHash(size_t src) {
+ // Computed once and kept in a singleton.
+ struct TPid {
+ size_t Value;
+
+ TPid()
+ : Value(GetEnv("YQL_MUTATE_HASHCODE") ? IntHash(GetPID()) : 0)
+ {}
+ };
+
+ return Singleton<TPid>()->Value ^ src;
+}
+#endif
+
+}
diff --git a/yql/essentials/utils/hash.h b/yql/essentials/utils/hash.h
new file mode 100644
index 0000000000..45cd95b777
--- /dev/null
+++ b/yql/essentials/utils/hash.h
@@ -0,0 +1,87 @@
+#pragma once
+#include <unordered_set>
+#include <unordered_map>
+#include <util/generic/hash.h>
+#include <util/generic/hash_multi_map.h>
+#include <util/generic/hash_set.h>
+
+namespace NYql {
+
+// Hash post-processing hook: an out-of-line function in builds without NDEBUG,
+// the identity function when NDEBUG is defined.
+#ifndef NDEBUG
+size_t VaryingHash(size_t src);
+#else
+inline size_t VaryingHash(size_t src) {
+ return src;
+}
+#endif
+
+// Hasher adapter: computes the hash with the underlying hasher and passes the
+// result through VaryingHash.
+template <typename T, typename THasher = ::THash<T>>
+struct TVaryingHash {
+ THasher Underlying;
+
+ TVaryingHash() = default;
+ TVaryingHash(const TVaryingHash&) = default;
+ // Wraps an explicitly supplied underlying hasher.
+ TVaryingHash(const THasher& underlying)
+ : Underlying(underlying)
+ {}
+
+ TVaryingHash& operator=(const TVaryingHash& other) = default;
+
+ size_t operator()(const T& elem) const {
+ return VaryingHash(Underlying(elem));
+ }
+};
+
+// Aliases of the std unordered containers and the util hash containers whose
+// hasher is wrapped in TVaryingHash. The std-based aliases default to
+// std::hash, the util-based ones to THash / TEqualTo.
+template <class TKey,
+ class TValue,
+ class THasher = std::hash<TKey>,
+ class TEqual = std::equal_to<TKey>,
+ class TAlloc = std::allocator<std::pair<const TKey, TValue>>>
+using TVaryingUnorderedMap = std::unordered_map<TKey, TValue, TVaryingHash<TKey, THasher>, TEqual, TAlloc>;
+
+template <class TKey,
+ class TValue,
+ class THasher = std::hash<TKey>,
+ class TEqual = std::equal_to<TKey>,
+ class TAlloc = std::allocator<std::pair<const TKey, TValue>>>
+using TVaryingUnorderedMultiMap = std::unordered_multimap<TKey, TValue, TVaryingHash<TKey, THasher>, TEqual, TAlloc>;
+
+template <class TKey,
+ class THasher = std::hash<TKey>,
+ class TEqual = std::equal_to<TKey>,
+ class TAlloc = std::allocator<TKey>>
+using TVaryingUnorderedSet = std::unordered_set<TKey, TVaryingHash<TKey, THasher>, TEqual, TAlloc>;
+
+template <class TKey,
+ class THasher = std::hash<TKey>,
+ class TEqual = std::equal_to<TKey>,
+ class TAlloc = std::allocator<TKey>>
+using TVaryingUnorderedMultiSet = std::unordered_multiset<TKey, TVaryingHash<TKey, THasher>, TEqual, TAlloc>;
+
+template <class TKey,
+ class TValue,
+ class THasher = THash<TKey>,
+ class TEqual = TEqualTo<TKey>,
+ class TAlloc = std::allocator<std::pair<const TKey, TValue>>>
+using TVaryingHashMap = THashMap<TKey, TValue, TVaryingHash<TKey, THasher>, TEqual, TAlloc>;
+
+template <class TKey,
+ class TValue,
+ class THasher = THash<TKey>,
+ class TEqual = TEqualTo<TKey>,
+ class TAlloc = std::allocator<std::pair<const TKey, TValue>>>
+using TVaryingHashMultiMap = THashMultiMap<TKey, TValue, TVaryingHash<TKey, THasher>, TEqual, TAlloc>;
+
+template <class TKey,
+ class THasher = THash<TKey>,
+ class TEqual = TEqualTo<TKey>,
+ class TAlloc = std::allocator<TKey>>
+using TVaryingHashSet = THashSet<TKey, TVaryingHash<TKey, THasher>, TEqual, TAlloc>;
+
+template <class TKey,
+ class THasher = THash<TKey>,
+ class TEqual = TEqualTo<TKey>,
+ class TAlloc = std::allocator<TKey>>
+using TVaryingHashMultiSet = THashMultiSet<TKey, TVaryingHash<TKey, THasher>, TEqual, TAlloc>;
+
+} // namespace NYql
diff --git a/yql/essentials/utils/limiting_allocator.cpp b/yql/essentials/utils/limiting_allocator.cpp
new file mode 100644
index 0000000000..0ff84f9037
--- /dev/null
+++ b/yql/essentials/utils/limiting_allocator.cpp
@@ -0,0 +1,35 @@
+#include "limiting_allocator.h"
+
+#include <util/memory/pool.h>
+#include <util/generic/yexception.h>
+
+namespace {
+// IAllocator decorator that forwards to another allocator while keeping the
+// total number of outstanding requested bytes within a fixed limit.
+// The counters are plain members, so an instance is not safe for concurrent use.
+class TLimitingAllocator : public IAllocator {
+public:
+    // @param limit      maximum number of bytes that may be allocated at once
+    // @param allocator  underlying allocator; not owned
+    TLimitingAllocator(size_t limit, IAllocator* allocator) : Alloc_(allocator), Limit_(limit) {}
+
+    // Allocates `len` bytes from the underlying allocator.
+    // Throws std::runtime_error("Out of memory") if the limit would be exceeded.
+    TBlock Allocate(size_t len) override final {
+        // Allocated_ never exceeds Limit_, so the subtraction cannot wrap;
+        // comparing this way keeps a huge `len` from overflowing the sum.
+        if (len > Limit_ - Allocated_) {
+            throw std::runtime_error("Out of memory");
+        }
+        // Charge the budget only after the underlying allocation succeeded,
+        // so a throwing allocator does not leak budget.
+        TBlock block = Alloc_->Allocate(len);
+        Allocated_ += len;
+        return block;
+    }
+
+    // Returns the block to the underlying allocator and gives its bytes back to the budget.
+    void Release(const TBlock& block) override final {
+        Y_ENSURE(Allocated_ >= block.Len);
+        Allocated_ -= block.Len;
+        Alloc_->Release(block);
+    }
+
+private:
+    IAllocator* Alloc_;
+    size_t Allocated_ = 0;
+    const size_t Limit_;
+};
+}
+
+namespace NYql {
+// Creates an allocator that delegates to `underlying` and throws once more
+// than `limit` bytes would be allocated at the same time. `underlying` is
+// stored as a raw pointer and is not owned by the result.
+std::unique_ptr<IAllocator> MakeLimitingAllocator(size_t limit, IAllocator* underlying) {
+ return std::make_unique<TLimitingAllocator>(limit, underlying);
+}
+}
diff --git a/yql/essentials/utils/limiting_allocator.h b/yql/essentials/utils/limiting_allocator.h
new file mode 100644
index 0000000000..7d94aa7f2a
--- /dev/null
+++ b/yql/essentials/utils/limiting_allocator.h
@@ -0,0 +1,8 @@
+#pragma once
+
+#include <util/memory/pool.h>
+#include <memory>
+
+namespace NYql {
+// Wraps `underlying` into an allocator that enforces a byte limit on outstanding allocations.
+std::unique_ptr<IAllocator> MakeLimitingAllocator(size_t limit, IAllocator* underlying);
+}
diff --git a/yql/essentials/utils/log/context.cpp b/yql/essentials/utils/log/context.cpp
new file mode 100644
index 0000000000..c8ad460445
--- /dev/null
+++ b/yql/essentials/utils/log/context.cpp
@@ -0,0 +1,89 @@
+#include "context.h"
+#include "log.h"
+
+#include <util/thread/singleton.h>
+
+
+namespace NYql {
+namespace NLog {
+namespace {
+
+// Per-thread storage for the context text prepared for a thrown exception.
+struct TThrowedLogContext {
+ TString LocationWithLogContext; // separated with ': '
+};
+
+} // namespace
+
+// Writes the current thread's logging context names to `out`, joined with '/'.
+//
+// @param withBraces     wrap the path as "{...} "
+// @param skipSessionId  suppress the very first name of the chain
+// Nothing is written when the context list has no items.
+void OutputLogCtx(IOutputStream* out, bool withBraces, bool skipSessionId) {
+    const NImpl::TLogContextListItem* head = NImpl::GetLogContextList();
+    if (!head->HasNext()) {
+        return;
+    }
+
+    if (withBraces) {
+        (*out) << '{';
+    }
+
+    bool needSeparator = false;
+    // `head` is a stub element of a circular list: start after it and stop
+    // when the iteration comes back to it.
+    for (NImpl::TLogContextListItem* item = head->Next; item != head; item = item->Next) {
+        for (const TString& name: *item) {
+            if (!skipSessionId && !name.empty()) {
+                if (needSeparator) {
+                    (*out) << '/';
+                }
+                (*out) << name;
+                needSeparator = true;
+            }
+            // only the first name can be skipped
+            skipSessionId = false;
+        }
+    }
+
+    if (withBraces) {
+        (*out) << TStringBuf("} ");
+    }
+}
+
+// Returns the header item of the calling thread's context list (a FastTlsSingleton instance).
+NImpl::TLogContextListItem* NImpl::GetLogContextList() {
+ return FastTlsSingleton<NImpl::TLogContextListItem>();
+}
+
+// Returns {session id, context path}. The session id is taken from the first
+// list item when that item is a session item with an id set, and is then left
+// out of the path; otherwise it is empty.
+std::pair<TString, TString> CurrentLogContextPath() {
+    TString sessionId;
+    const NImpl::TLogContextListItem* firstItem = NImpl::GetLogContextList()->Next;
+    const auto* sessionItem = dynamic_cast<const NImpl::TLogContextSessionItem*>(firstItem);
+    if (sessionItem && sessionItem->HasSessionId()) {
+        sessionId = (*sessionItem->begin());
+    }
+
+    TStringStream path;
+    const bool skipSessionId = !sessionId.empty();
+    OutputLogCtx(&path, false, skipSessionId);
+    return std::make_pair(sessionId, path.Str());
+}
+
+// Returns the text stored by SetThrowedLogContextPath for the calling thread.
+// The stored string is moved out, so the thread-local value is consumed by the call.
+TString ThrowedLogContextPath() {
+ TThrowedLogContext* tlc = FastTlsSingleton<TThrowedLogContext>();
+ return std::move(tlc->LocationWithLogContext);
+}
+
+
+// Writes the current logging context in braces into the log element and returns the element.
+TAutoPtr<TLogElement> TContextPreprocessor::Preprocess(
+ TAutoPtr<TLogElement> element)
+{
+ OutputLogCtx(element.Get(), true);
+ return element;
+}
+
+// Stores "<location>: {<context path>} " in the calling thread's
+// TThrowedLogContext for later retrieval with ThrowedLogContextPath().
+void TYqlLogContextLocation::SetThrowedLogContextPath() const {
+    TStringStream text;
+    text << Location_ << TStringBuf(": ");
+    OutputLogCtx(&text, true);
+    FastTlsSingleton<TThrowedLogContext>()->LocationWithLogContext = text.Str();
+}
+
+} // namespace NLog
+} // namespace NYql
diff --git a/yql/essentials/utils/log/context.h b/yql/essentials/utils/log/context.h
new file mode 100644
index 0000000000..75f2e28364
--- /dev/null
+++ b/yql/essentials/utils/log/context.h
@@ -0,0 +1,259 @@
+#pragma once
+
+#include <util/system/defaults.h>
+#include <util/generic/strbuf.h>
+#include <util/stream/output.h>
+#include <util/system/src_location.h>
+#include <util/system/yassert.h>
+#include <array>
+
+// continues existing contexts chain
+
+// Declares a local context object (result of MakeCtx) named c<line>, so the given names
+// stay in the context list until the end of the enclosing scope.
+// Y_UNUSED presumably only silences the unused-variable warning.
+#define YQL_LOG_CTX_SCOPE(...) \
+ auto Y_CAT(c, __LINE__) = ::NYql::NLog::MakeCtx(__VA_ARGS__); \
+ Y_UNUSED(Y_CAT(c, __LINE__))
+
+// Prefix for a single statement or block: the context object is declared in the `if`
+// condition and therefore lives for the whole if/else. Its operator bool always returns
+// true, so the `if` branch runs and jumps to the label placed on the `else` statement,
+// which is the statement written right after the macro.
+#define YQL_LOG_CTX_BLOCK(...) \
+ if (auto Y_GENERATE_UNIQUE_ID(c) = ::NYql::NLog::MakeCtx(__VA_ARGS__)) { \
+ goto Y_CAT(YQL_LOG_CTX_LABEL, __LINE__); \
+ } else Y_CAT(YQL_LOG_CTX_LABEL, __LINE__):
+
+
+// starts new contexts chain, after leaving current scope restores
+// previous contexts chain
+
+// Declares a local root context (result of MakeRootCtx) with the given session id and
+// optional names. `##__VA_ARGS__` drops the comma when no names are passed.
+#define YQL_LOG_CTX_ROOT_SESSION_SCOPE(sessionId, ...) \
+ auto Y_CAT(c, __LINE__) = ::NYql::NLog::MakeRootCtx(sessionId, ##__VA_ARGS__); \
+ Y_UNUSED(Y_CAT(c, __LINE__))
+
+// Declares a local root context with an empty session id and the given names.
+// NOTE(review): with no arguments this expands to MakeRootCtx("", ), so at least one
+// name has to be passed (unlike YQL_LOG_CTX_ROOT_SESSION_SCOPE, which uses ##__VA_ARGS__).
+#define YQL_LOG_CTX_ROOT_SCOPE(...) \
+ auto Y_CAT(c, __LINE__) = ::NYql::NLog::MakeRootCtx("", __VA_ARGS__); \
+ Y_UNUSED(Y_CAT(c, __LINE__))
+
+// Statement prefix like YQL_LOG_CTX_BLOCK, but the object comes from MakeRootCtx; the
+// arguments are passed to MakeRootCtx unchanged (session id first, or a single
+// {sessionId, path} pair).
+#define YQL_LOG_CTX_ROOT_BLOCK(...) \
+ if (auto Y_GENERATE_UNIQUE_ID(c) = ::NYql::NLog::MakeRootCtx(__VA_ARGS__)) { \
+ goto Y_CAT(YQL_LOG_CTX_LABEL, __LINE__); \
+ } else Y_CAT(YQL_LOG_CTX_LABEL, __LINE__):
+
+
+// adds current contexts path to exception message before throwing it
+
+// Usage: YQL_LOG_CTX_THROW <exception expression>;
+// Expands to `throw TYqlLogContextLocation(__LOCATION__) + <expression>`; operator+ of
+// TYqlLogContextLocation calls SetThrowedLogContextPath() and returns the expression.
+#define YQL_LOG_CTX_THROW throw ::NYql::NLog::TYqlLogContextLocation(__LOCATION__) +
+
+class TLogElement;
+
+namespace NYql {
+namespace NLog {
+namespace NImpl {
+
+/**
+ * @brief Represents item of logging context list.
+ *
+ * Items form a circular doubly linked list: a freshly constructed item points to
+ * itself with both Next and Prev. The item itself stores no names; begin()/end()
+ * read NamesCount TString objects located HeaderSize_ bytes after the start of the
+ * object, i.e. in memory that belongs to a derived class.
+ */
+class TLogContextListItem {
+public:
+ TLogContextListItem* Next;
+ TLogContextListItem* Prev;
+ size_t NamesCount;
+
+ // namesCount - number of TString names the derived class stores
+ // headerSize - offset in bytes from `this` to the first name; 0 means
+ //              "use sizeof(TLogContextListItem)"
+ explicit TLogContextListItem(size_t namesCount = 0, size_t headerSize = 0)
+ : Next(this)
+ , Prev(this)
+ , NamesCount(namesCount)
+ , HeaderSize_(headerSize)
+ {
+ // initialize HeaderSize_ if child didn't
+ if (headerSize == 0) {
+ HeaderSize_ = sizeof(*this);
+ }
+ }
+
+ virtual ~TLogContextListItem() {
+ }
+
+ // Pointer to the first name.
+ // NOTE(review): presumably relies on the derived class placing its array of names
+ // exactly HeaderSize_ bytes after the start of the object - confirm for new subclasses.
+ const TString* begin() const {
+ auto* ptr = reinterpret_cast<const ui8*>(this);
+ return reinterpret_cast<const TString*>(ptr + HeaderSize_);
+ }
+
+ // One past the last name.
+ const TString* end() const {
+ return begin() + NamesCount;
+ }
+
+ // True when the item is linked with at least one other item.
+ bool HasNext() const {
+ return Next != this;
+ }
+
+ // Inserts this (currently unlinked) item right before `item`.
+ void LinkBefore(TLogContextListItem* item) {
+ Y_DEBUG_ABORT_UNLESS(!HasNext());
+ Next = item;
+ Prev = item->Prev;
+ Prev->Next = this;
+ Next->Prev = this;
+ }
+
+ // Removes the item from its list and makes it point to itself again;
+ // does nothing when the item is not linked.
+ void Unlink() {
+ if (!HasNext()) return;
+
+ Prev->Next = Next;
+ Next->Prev = Prev;
+ Next = Prev = this;
+ }
+
+private:
+ // Additional memory before Names_ used in child class
+ // (byte offset from `this` to the first name, see begin())
+ size_t HeaderSize_;
+};
+
+/**
+ * @brief Returns pointer to thread local log context list.
+ */
+TLogContextListItem* GetLogContextList();
+
+/**
+ * @brief List item that also remembers whether a session id was supplied.
+ *        Passes its own size to the base class as the offset of the names.
+ */
+class TLogContextSessionItem : public TLogContextListItem {
+public:
+    TLogContextSessionItem(size_t size, bool hasSessionId_)
+        : TLogContextListItem(size, sizeof(*this))
+        , HasSessionId_(hasSessionId_)
+    {
+    }
+
+    bool HasSessionId() const {
+        return HasSessionId_;
+    }
+
+private:
+    bool HasSessionId_;
+};
+
+} // namespace NImpl
+
+/**
+ * @brief YQL logger context element. Each element can contain several names.
+ *
+ * The constructor appends the element to the list returned by
+ * NImpl::GetLogContextList() (it is linked before the list head), the destructor
+ * removes it again. The names live in Names_; the base class reads them through
+ * begin()/end() at offset sizeof(TLogContextListItem).
+ *
+ * NOTE(review): copying is not disabled; a copy would carry the same Next/Prev
+ * pointers as the original. The element is presumably only meant to be created in
+ * place through MakeCtx() and the YQL_LOG_CTX_* macros - confirm.
+ */
+template <size_t Size>
+class TLogContext: public NImpl::TLogContextListItem {
+public:
+ // Every argument is converted to TString; the number of arguments is expected to
+ // match Size (MakeCtx() guarantees that).
+ template <typename... TArgs>
+ TLogContext(TArgs... args)
+ : TLogContextListItem(Size)
+ , Names_{{ TString{std::forward<TArgs>(args)}... }}
+ {
+ LinkBefore(NImpl::GetLogContextList());
+ }
+
+ ~TLogContext() {
+ Unlink();
+ }
+
+ // Always true; lets the object be declared inside an `if` condition
+ // (see YQL_LOG_CTX_BLOCK).
+ explicit inline operator bool() const noexcept {
+ return true;
+ }
+
+private:
+ std::array<TString, Size> Names_;
+};
+
+/**
+ * @brief Special Root context element which replaces previous log context
+ * list head by itself and restores previous one on destruction.
+ *
+ * Names_[0] is always the session id (possibly empty), followed by the other names,
+ * so Size counts the session id as well.
+ */
+template <size_t Size>
+class TRootLogContext: public NImpl::TLogContextSessionItem {
+public:
+ template <typename... TArgs>
+ TRootLogContext(const TString& sessionId, TArgs... args)
+ : TLogContextSessionItem(Size, !sessionId.empty())
+ , Names_{{ sessionId, TString{std::forward<TArgs>(args)}... }}
+ {
+ NImpl::TLogContextListItem* ctxList = NImpl::GetLogContextList();
+ // remember the links of the current list head ...
+ PrevLogContextHead_.Prev = ctxList->Prev;
+ PrevLogContextHead_.Next = ctxList->Next;
+ // ... then make the head an empty list and become its only element
+ ctxList->Next = ctxList->Prev = ctxList;
+ LinkBefore(ctxList);
+ }
+
+ ~TRootLogContext() {
+ Unlink();
+ // put the saved links back into the list head
+ NImpl::TLogContextListItem* ctxList = NImpl::GetLogContextList();
+ ctxList->Prev = PrevLogContextHead_.Prev;
+ ctxList->Next = PrevLogContextHead_.Next;
+ }
+
+ // Always true; lets the object be declared inside an `if` condition
+ // (see YQL_LOG_CTX_ROOT_BLOCK).
+ explicit inline operator bool() const noexcept {
+ return true;
+ }
+
+private:
+ std::array<TString, Size> Names_;
+ // Used only as storage for the Next/Prev links of the replaced list head.
+ NImpl::TLogContextListItem PrevLogContextHead_;
+};
+
+/**
+ * @brief Builds a TLogContext whose size equals the number of passed names.
+ */
+template <typename... TArgs>
+inline auto MakeCtx(TArgs&&... args) -> TLogContext<sizeof...(args)> {
+    using TCtx = TLogContext<sizeof...(TArgs)>;
+    return TCtx(std::forward<TArgs>(args)...);
+}
+
+// Builds a TRootLogContext from a session id and any number of names; the session id
+// occupies one slot, hence the "+ 1".
+template <typename... TArgs>
+inline auto MakeRootCtx(const TString& sessionId, TArgs&&... args) -> TRootLogContext<sizeof...(args) + 1> {
+    using TCtx = TRootLogContext<sizeof...(TArgs) + 1>;
+    return TCtx(sessionId, std::forward<TArgs>(args)...);
+}
+
+// Builds a root context from a {sessionId, path} pair such as the one returned by
+// CurrentLogContextPath().
+inline auto MakeRootCtx(const std::pair<TString, TString>& ctx) -> TRootLogContext<2> {
+    const TString& sessionId = ctx.first;
+    const TString& path = ctx.second;
+    return TRootLogContext<2>(sessionId, path);
+}
+
+/**
+ * @brief Returns pair with sessionId and
+ * current logger contexts path as string. Each element
+ * is separated with '/'.
+ */
+std::pair<TString, TString> CurrentLogContextPath();
+
+/**
+ * @brief If last throwing exception was performed with YQL_LOG_CTX_THROW
+ * macro this function returns location and context of that throw point.
+ */
+TString ThrowedLogContextPath();
+
+/**
+ * @brief Adds context prefix before logging message.
+ *
+ * Used as the `preprocessor` argument of the YQL_*LOG* macros: Preprocess() receives
+ * the freshly created log element and returns it.
+ */
+struct TContextPreprocessor {
+ static TAutoPtr<TLogElement> Preprocess(TAutoPtr<TLogElement> element);
+};
+
+/**
+ * @brief Outputs current logger context into stream
+ */
+void OutputLogCtx(IOutputStream* out, bool withBraces, bool skipSessionId = false);
+
+/**
+ * @brief Helper of YQL_LOG_CTX_THROW: keeps the throw location and, when combined
+ *        with the thrown object through operator+, records that location together
+ *        with the current log context.
+ */
+class TYqlLogContextLocation {
+public:
+    TYqlLogContextLocation(const TSourceLocation& location)
+        : Location_(location.File, location.Line)
+    {}
+
+    void SetThrowedLogContextPath() const;
+
+    // Records the location and context, then returns `t` unchanged.
+    template <class T>
+    T&& operator+(T&& t) {
+        this->SetThrowedLogContextPath();
+        return std::forward<T>(t);
+    }
+
+private:
+    TSourceLocation Location_;
+};
+
+} // namespace NLog
+} // namespace NYql
diff --git a/yql/essentials/utils/log/log.cpp b/yql/essentials/utils/log/log.cpp
new file mode 100644
index 0000000000..4a20ae504c
--- /dev/null
+++ b/yql/essentials/utils/log/log.cpp
@@ -0,0 +1,368 @@
+#include "log.h"
+
+#include <yql/essentials/utils/log/proto/logger_config.pb.h>
+
+#include <library/cpp/logger/stream.h>
+#include <library/cpp/logger/system.h>
+#include <library/cpp/logger/composite.h>
+#include <util/datetime/systime.h>
+#include <util/generic/strbuf.h>
+#include <util/stream/format.h>
+#include <util/system/getpid.h>
+#include <util/system/mutex.h>
+#include <util/system/progname.h>
+#include <util/system/thread.i>
+
+#include <stdio.h>
+#include <time.h>
+
+// Taken by InitLogger(), CleanupLogger() and ReopenLog(); protects g_LoggerInitialized.
+static TMutex g_InitLoggerMutex;
+// Incremented by every InitLogger() call and decremented by every CleanupLogger() call.
+static int g_LoggerInitialized = 0;
+
+namespace {
+
+/**
+ * Backend wrapper with a byte budget. While the budget is positive every record is
+ * forwarded and its length is subtracted from the budget. Afterwards only records
+ * with Priority <= TLOG_WARNING are forwarded. The write that takes the budget to
+ * zero or below sets `flag` to 1.
+ */
+class TLimitedLogBackend final : public TLogBackend {
+public:
+    TLimitedLogBackend(TAutoPtr<TLogBackend> b, TAtomic& flag, ui64 limit) noexcept
+        : Backend(b)
+        , Flag(flag)
+        , Limit(limit)
+    {
+    }
+
+    ~TLimitedLogBackend() final {
+    }
+
+    void ReopenLog() final {
+        Backend->ReopenLog();
+    }
+
+    void WriteData(const TLogRecord& rec) final {
+        const auto budgetBefore = AtomicGet(Limit);
+        const bool hadBudget = budgetBefore > 0;
+
+        // the budget is only charged while it is still positive
+        bool exhaustedNow = false;
+        if (hadBudget) {
+            exhaustedNow = AtomicSub(Limit, rec.Len) <= 0;
+        }
+
+        if (hadBudget || rec.Priority <= TLOG_WARNING) {
+            Backend->WriteData(rec);
+        }
+
+        if (exhaustedNow) {
+            AtomicSet(Flag, 1);
+        }
+    }
+
+private:
+    THolder<TLogBackend> Backend;
+    TAtomic& Flag;
+    TAtomic Limit;
+};
+
+// Conversions between NYql::NProto::TLoggingConfig enums and NYql::NLog enums
+
+// Maps a config level to the logger level; throws yexception for a value that is not
+// one of the listed enumerators.
+NYql::NLog::ELevel ConvertLevel(NYql::NProto::TLoggingConfig::ELevel level) {
+    using TConfig = NYql::NProto::TLoggingConfig;
+    using TLevel = NYql::NLog::ELevel;
+
+    switch (level) {
+        case TConfig::FATAL:
+            return TLevel::FATAL;
+        case TConfig::ERROR:
+            return TLevel::ERROR;
+        case TConfig::WARN:
+            return TLevel::WARN;
+        case TConfig::INFO:
+            return TLevel::INFO;
+        case TConfig::DEBUG:
+            return TLevel::DEBUG;
+        case TConfig::TRACE:
+            return TLevel::TRACE;
+    }
+
+    ythrow yexception() << "unknown log level: "
+        << TConfig::ELevel_Name(level);
+}
+
+// Maps a config component to the logger component. The switch has no default branch;
+// a value that matches none of the listed enumerators ends up in the throw below.
+NYql::NLog::EComponent ConvertComponent(NYql::NProto::TLoggingConfig::EComponent c) {
+ using namespace NYql::NProto;
+ using namespace NYql::NLog;
+ switch (c) {
+ case TLoggingConfig::DEFAULT: return EComponent::Default;
+ case TLoggingConfig::CORE: return EComponent::Core;
+ case TLoggingConfig::CORE_EVAL: return EComponent::CoreEval;
+ case TLoggingConfig::CORE_PEEPHOLE: return EComponent::CorePeepHole;
+ case TLoggingConfig::CORE_EXECUTION: return EComponent::CoreExecution;
+ case TLoggingConfig::SQL: return EComponent::Sql;
+ case TLoggingConfig::PROVIDER_COMMON: return EComponent::ProviderCommon;
+ case TLoggingConfig::PROVIDER_CONFIG: return EComponent::ProviderConfig;
+ case TLoggingConfig::PROVIDER_RESULT: return EComponent::ProviderResult;
+ case TLoggingConfig::PROVIDER_YT: return EComponent::ProviderYt;
+ case TLoggingConfig::PROVIDER_KIKIMR: return EComponent::ProviderKikimr;
+ case TLoggingConfig::PROVIDER_KQP: return EComponent::ProviderKqp;
+ case TLoggingConfig::PROVIDER_RTMR: return EComponent::ProviderRtmr;
+ case TLoggingConfig::PERFORMANCE: return EComponent::Perf;
+ case TLoggingConfig::NET: return EComponent::Net;
+ case TLoggingConfig::PROVIDER_STAT: return EComponent::ProviderStat;
+ case TLoggingConfig::PROVIDER_SOLOMON: return EComponent::ProviderSolomon;
+ case TLoggingConfig::PROVIDER_DQ: return EComponent::ProviderDq;
+ case TLoggingConfig::PROVIDER_CLICKHOUSE: return EComponent::ProviderClickHouse;
+ case TLoggingConfig::PROVIDER_YDB: return EComponent::ProviderYdb;
+ case TLoggingConfig::PROVIDER_PQ: return EComponent::ProviderPq;
+ case TLoggingConfig::PROVIDER_S3: return EComponent::ProviderS3;
+ case TLoggingConfig::CORE_DQ: return EComponent::CoreDq;
+ case TLoggingConfig::HTTP_GATEWAY: return EComponent::HttpGateway;
+ case TLoggingConfig::PROVIDER_GENERIC: return EComponent::ProviderGeneric;
+ case TLoggingConfig::PROVIDER_PG: return EComponent::ProviderPg;
+ }
+
+ ythrow yexception() << "unknown log component: "
+ << TLoggingConfig::EComponent_Name(c);
+}
+
+// Returns the log type string ("cout", "cerr" or "console") for a stream-like
+// destination. Every other ELogTo value (FILE and SYSLOG are handled by InitLogger
+// directly) results in yexception.
+TString ConvertDestinationType(NYql::NProto::TLoggingConfig::ELogTo c) {
+    switch (c) {
+        case NYql::NProto::TLoggingConfig::STDOUT: return "cout";
+        case NYql::NProto::TLoggingConfig::STDERR: return "cerr";
+        case NYql::NProto::TLoggingConfig::CONSOLE: return "console";
+        default: {
+            // every path of the switch returns or throws, so nothing may follow it
+            ythrow yexception() << "unsupported ELogTo destination in Convert";
+        }
+    }
+}
+
+// Builds a destination description from a log type string: "cout", "cerr" and
+// "console" select the corresponding stream type, any other string is stored as the
+// target of a FILE destination.
+NYql::NProto::TLoggingConfig::TLogDestination CreateLogDestination(const TString& c) {
+    using TConfig = NYql::NProto::TLoggingConfig;
+
+    TConfig::TLogDestination result;
+    if (c == "cout") {
+        result.SetType(TConfig::STDOUT);
+        return result;
+    }
+    if (c == "cerr") {
+        result.SetType(TConfig::STDERR);
+        return result;
+    }
+    if (c == "console") {
+        result.SetType(TConfig::CONSOLE);
+        return result;
+    }
+
+    result.SetType(TConfig::FILE);
+    result.SetTarget(c);
+    return result;
+}
+
+} // namespace
+
+namespace NYql {
+namespace NLog {
+
+// Writes the current local time as "YYYY-MM-DD HH:MM:SS.mmm" (always 23 characters).
+void WriteLocalTime(IOutputStream* out) {
+    struct timeval now;
+    gettimeofday(&now, nullptr);
+
+    const time_t seconds = static_cast<time_t>(now.tv_sec);
+    struct tm localTm;
+    localtime_r(&seconds, &localTm);
+
+    // the buffer is sized by a sample of the final string, including the terminator
+    char buf[sizeof("2016-01-02 03:04:05.006")];
+    const int dateLen = strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S.", &localTm);
+    const ui32 millis = static_cast<ui32>(now.tv_usec) / 1000;
+    snprintf(buf + dateLen, sizeof(buf) - dateLen, "%03" PRIu32, millis);
+
+    out->Write(buf, sizeof(buf) - 1);
+}
+
+/**
+ * TYqlLogElement
+ * automatically adds new line char
+ * (the destructor streams '\n' into the element)
+ */
+class TYqlLogElement: public TLogElement {
+public:
+ // `level` is converted to the corresponding ELogPriority of the base element.
+ TYqlLogElement(const TLog* parent, ELevel level)
+ : TLogElement(parent, ELevelHelpers::ToLogPriority(level))
+ {
+ }
+
+ ~TYqlLogElement() {
+ *this << '\n';
+ }
+};
+
+// Default logger: default-constructed TLog, empty process name, zero pid.
+TYqlLog::TYqlLog()
+    : ProcId_(0)
+    , WriteTruncMsg_(0)
+{
+}
+
+// Creates a logger of the given log type and applies levels[i] to the component
+// with integer value i.
+TYqlLog::TYqlLog(const TString& logType, const TComponentLevels& levels)
+    : TLog(logType)
+    , ProcName_(GetProgramName())
+    , ProcId_(GetPID())
+    , WriteTruncMsg_(0)
+{
+    size_t index = 0;
+    for (const ELevel level : levels) {
+        SetComponentLevel(EComponentHelpers::FromInt(index), level);
+        ++index;
+    }
+}
+
+// Creates a logger on top of the given backend and applies levels[i] to the component
+// with integer value i.
+TYqlLog::TYqlLog(TAutoPtr<TLogBackend> backend, const TComponentLevels& levels)
+    : TLog(backend)
+    , ProcName_(GetProgramName())
+    , ProcId_(GetPID())
+    , WriteTruncMsg_(0)
+{
+    const size_t count = levels.size();
+    for (size_t i = 0; i != count; ++i) {
+        const ELevel level = levels[i];
+        SetComponentLevel(EComponentHelpers::FromInt(i), level);
+    }
+}
+
+// Replaces the process name used in the log prefix and re-reads the process id.
+void TYqlLog::UpdateProcInfo(const TString& procName) {
+    ProcId_ = GetPID();
+    ProcName_ = procName;
+}
+
+// Creates a log element for one message and writes the standard prefix into it.
+// When the truncation flag WriteTruncMsg_ is consumed by this call, the element is
+// created with FATAL level and starts with an extra "Log is truncated by limit" line
+// before the regular prefix.
+TAutoPtr<TLogElement> TYqlLog::CreateLogElement(
+ EComponent component, ELevel level,
+ TStringBuf file, int line) const
+{
+ // NOTE(review): presumably true only when the flag was 1 (set by TLimitedLogBackend)
+ // and resets it to 0 - confirm the (exchange, compare) argument order of AtomicCas.
+ const bool writeMsg = AtomicCas(&WriteTruncMsg_, 0, 1);
+ auto element = MakeHolder<TYqlLogElement>(this, writeMsg ? ELevel::FATAL : level);
+ if (writeMsg) {
+ WriteLogPrefix(element.Get(), EComponent::Default, ELevel::FATAL, __FILE__, __LINE__);
+ *element << "Log is truncated by limit\n";
+ // presumably switches the element back to the priority of the actual message
+ *element << ELevelHelpers::ToLogPriority(level);
+ }
+
+ WriteLogPrefix(element.Get(), component, level, file, line);
+ return element.Release();
+}
+
+// Writes everything that precedes the message text of a log line.
+void TYqlLog::WriteLogPrefix(IOutputStream* out, EComponent component, ELevel level, TStringBuf file, int line) const {
+    // LOG FORMAT:
+    // {datetime} {level} {procname}(pid={pid}, tid={tid}) [{component}] {source_location}: {message}\n
+    //
+
+    WriteLocalTime(out);
+
+    IOutputStream& os = *out;
+    os << ' ' << ELevelHelpers::ToString(level) << ' ';
+    os << ProcName_ << TStringBuf("(pid=") << ProcId_ << TStringBuf(", tid=");
+    // the thread id is printed through Hex() on unix only
+#ifdef _unix_
+    os << Hex(SystemCurrentThreadIdImpl());
+#else
+    os << SystemCurrentThreadIdImpl();
+#endif
+    os << TStringBuf(") [") << EComponentHelpers::ToString(component) << TStringBuf("] ");
+    // only the part of the file path after the last path separator is printed
+    os << file.RAfter(LOCSLASH_C) << ':' << line << TStringBuf(": ");
+}
+
+// Wraps the current backend into TLimitedLogBackend with the given limit; the wrapper
+// reports exhaustion of the limit through WriteTruncMsg_.
+void TYqlLog::SetMaxLogLimit(ui64 limit) {
+    auto current = TLog::ReleaseBackend();
+    THolder limited(new TLimitedLogBackend(current, WriteTruncMsg_, limit));
+    TLog::ResetBackend(std::move(limited));
+}
+
+// Builds a config with a single destination described by logType and delegates to the
+// config-based InitLogger().
+void InitLogger(const TString& logType, bool startAsDaemon) {
+    NProto::TLoggingConfig singleDestConfig;
+    auto* destination = singleDestConfig.AddLogDest();
+    *destination = CreateLogDestination(logType);
+
+    InitLogger(singleDestConfig, startAsDaemon);
+}
+
+// Initializes the global YQL logger from a config. Every call increments
+// g_LoggerInitialized; only the call that brings it to 1 (or below) creates the
+// logger, later calls return without touching it.
+// Component levels: AllComponentsLevel if present, INFO otherwise, then the
+// per-component overrides from config.GetLevels().
+// Destinations: each LogDest entry yields one backend; with no entries at all the
+// log goes to "cerr".
+void InitLogger(const NProto::TLoggingConfig& config, bool startAsDaemon) {
+ with_lock(g_InitLoggerMutex) {
+ ++g_LoggerInitialized;
+ if (g_LoggerInitialized > 1) {
+ return;
+ }
+
+ TComponentLevels levels;
+ if (config.HasAllComponentsLevel()) {
+ levels.fill(ConvertLevel(config.GetAllComponentsLevel()));
+ } else {
+ levels.fill(ELevel::INFO);
+ }
+
+ for (const auto& cmpLevel: config.GetLevels()) {
+ auto component = ConvertComponent(cmpLevel.GetC());
+ auto level = ConvertLevel(cmpLevel.GetL());
+ levels[EComponentHelpers::ToInt(component)] = level;
+ }
+ // the logger starts with the "null" log type; the real backend is installed below
+ TLoggerOperator<TYqlLog>::Set(new TYqlLog("null", levels));
+
+ std::vector<THolder<TLogBackend>> backends;
+
+ // Set stderr log destination if none was described in config
+ if (config.LogDestSize() == 0) {
+ backends.emplace_back(CreateLogBackend("cerr", LOG_MAX_PRIORITY, false));
+ }
+
+ for (const auto& logDest : config.GetLogDest()) {
+ // Generate the backend we need and temporary store it
+ switch (logDest.GetType()) {
+ case NProto::TLoggingConfig::STDERR:
+ case NProto::TLoggingConfig::STDOUT:
+ case NProto::TLoggingConfig::CONSOLE: {
+ // stream destinations are skipped when started as a daemon
+ if (!startAsDaemon) {
+ backends.emplace_back(CreateLogBackend(ConvertDestinationType(logDest.GetType()), LOG_MAX_PRIORITY, false));
+ }
+ break;
+ }
+ case NProto::TLoggingConfig::FILE: {
+ backends.emplace_back(CreateLogBackend(logDest.GetTarget(), LOG_MAX_PRIORITY, false));
+ break;
+ }
+ case NProto::TLoggingConfig::SYSLOG: {
+ backends.emplace_back(MakeHolder<TSysLogBackend>(GetProgramName().data(), TSysLogBackend::TSYSLOG_LOCAL1));
+ break;
+ }
+ default: {
+ // other destination types are ignored
+ break;
+ }
+ }
+ }
+
+ // Combine created backends and set them for logger
+ // (with no backends at all the logger keeps the "null" log type set above)
+ auto& logger = TLoggerOperator<TYqlLog>::Log();
+ if (backends.size() == 1) {
+ logger.ResetBackend(std::move(backends[0]));
+ } else if (backends.size() > 1) {
+ THolder<TCompositeLogBackend> compositeBackend = MakeHolder<TCompositeLogBackend>();
+ for (auto& backend : backends) {
+ compositeBackend->AddLogBackend(std::move(backend));
+ }
+ logger.ResetBackend(std::move(compositeBackend));
+ }
+ }
+}
+
+// Initializes the global YQL logger with the given backend and INFO level for every
+// component. Does nothing except counting when the logger is already initialized.
+void InitLogger(TAutoPtr<TLogBackend> backend) {
+    with_lock(g_InitLoggerMutex) {
+        if (++g_LoggerInitialized > 1) {
+            return;
+        }
+
+        TComponentLevels infoLevels;
+        infoLevels.fill(ELevel::INFO);
+
+        auto* logger = new TYqlLog(backend, infoLevels);
+        TLoggerOperator<TYqlLog>::Set(logger);
+    }
+}
+
+// Initializes the global YQL logger with a TStreamLogBackend writing to `out`.
+void InitLogger(IOutputStream* out) {
+    TAutoPtr<TLogBackend> streamBackend(new TStreamLogBackend(out));
+    InitLogger(streamBackend);
+}
+
+// Undoes one InitLogger() call. When the last outstanding InitLogger() is undone
+// (or when there was none), the global logger is replaced with a default-constructed
+// TYqlLog.
+void CleanupLogger() {
+    with_lock(g_InitLoggerMutex) {
+        // Never let the counter go below zero: with a negative counter two subsequent
+        // InitLogger() calls would both pass the "> 1" check and the second one would
+        // replace a logger that is already in use.
+        if (g_LoggerInitialized > 0) {
+            --g_LoggerInitialized;
+        }
+        if (g_LoggerInitialized > 0) {
+            return;
+        }
+
+        TLoggerOperator<TYqlLog>::Set(new TYqlLog());
+    }
+}
+
+// Calls ReopenLog() on the global YQL logger under the init mutex.
+void ReopenLog() {
+    with_lock(g_InitLoggerMutex) {
+        auto& logger = TLoggerOperator<TYqlLog>::Log();
+        logger.ReopenLog();
+    }
+}
+
+} // namespace NLog
+} // namespace NYql
+
+/**
+ * Default YQL logger: "null" log type, INFO level for every component.
+ */
+template <>
+NYql::NLog::TYqlLog* CreateDefaultLogger<NYql::NLog::TYqlLog>() {
+    NYql::NLog::TComponentLevels infoLevels;
+    infoLevels.fill(NYql::NLog::ELevel::INFO);
+
+    auto* logger = new NYql::NLog::TYqlLog("null", infoLevels);
+    return logger;
+}
diff --git a/yql/essentials/utils/log/log.h b/yql/essentials/utils/log/log.h
new file mode 100644
index 0000000000..aecf8e39dc
--- /dev/null
+++ b/yql/essentials/utils/log/log.h
@@ -0,0 +1,195 @@
+#pragma once
+
+#include "log_component.h"
+#include "log_level.h"
+#include "context.h"
+#include "profile.h"
+
+#include <library/cpp/logger/global/common.h>
+
+#include <library/cpp/deprecated/atomic/atomic.h>
+#include <util/stream/output.h>
+#include <util/generic/strbuf.h>
+
+#include <array>
+
+
+// Core logging expression. The user appends `<< message` after the macro; since `<<`
+// binds tighter than `|` and `|` tighter than `&&`, the message is streamed into the
+// log element and the whole right-hand side is evaluated only when
+// logger.NeedToLog(component, level) is true.
+// `preprocessor` is a type with a static Preprocess() taking and returning the element.
+#define YQL_LOG_IMPL(logger, component, level, preprocessor, file, line) \
+ logger.NeedToLog(component, level) && NPrivateGlobalLogger::TEatStream() | \
+ (*preprocessor::Preprocess(logger.CreateLogElement(component, level, file, line)))
+
+// Same as YQL_LOG_IMPL, but the element is created only if `condition` is true as
+// well; `condition` is evaluated only after NeedToLog() returned true.
+#define YQL_LOG_IF_IMPL(logger, component, level, preprocessor, condition, file, line) \
+ logger.NeedToLog(component, level) && (condition) && NPrivateGlobalLogger::TEatStream() | \
+ (*preprocessor::Preprocess(logger.CreateLogElement(component, level, file, line)))
+
+// with component logger
+// `level` and `component` are bare enumerator names (e.g. INFO, Sql); the macros
+// prepend ::NYql::NLog::ELevel:: and ::NYql::NLog::EComponent:: themselves.
+
+// Logs through the global YqlLogger() with a caller-supplied preprocessor type.
+#define YQL_CLOG_PREP(level, component, preprocessor) YQL_LOG_IMPL(\
+ ::NYql::NLog::YqlLogger(), \
+ ::NYql::NLog::EComponent::component, \
+ ::NYql::NLog::ELevel::level, \
+ preprocessor, \
+ __FILE__, __LINE__)
+
+// Usage: YQL_CLOG(INFO, Sql) << "message"; uses TContextPreprocessor.
+#define YQL_CLOG(level, component) \
+ YQL_CLOG_PREP(level, component, ::NYql::NLog::TContextPreprocessor)
+
+// True when a message of this level would be logged for the component.
+#define YQL_CLOG_ACTIVE(level, component) ::NYql::NLog::YqlLogger().NeedToLog( \
+ ::NYql::NLog::EComponent::component, \
+ ::NYql::NLog::ELevel::level)
+
+// with component/level values logger
+// Unlike the YQL_CLOG family, `level` and `component` are passed through as written,
+// so they are expressions of type ELevel / EComponent rather than enumerator names.
+
+#define YQL_CVLOG_PREP(level, component, preprocessor) YQL_LOG_IMPL(\
+ ::NYql::NLog::YqlLogger(), \
+ component, \
+ level, \
+ preprocessor, \
+ __FILE__, __LINE__)
+
+// Usage: YQL_CVLOG(levelValue, componentValue) << "message";
+#define YQL_CVLOG(level, component) \
+ YQL_CVLOG_PREP(level, component, ::NYql::NLog::TContextPreprocessor)
+
+// True when a message of this level would be logged for the component.
+#define YQL_CVLOG_ACTIVE(level, component) ::NYql::NLog::YqlLogger().NeedToLog( \
+ component, \
+ level)
+
+// default logger
+// Shortcuts of the YQL_CLOG family for EComponent::Default.
+
+#define YQL_LOG_PREP(level, preprocessor) \
+ YQL_CLOG_PREP(level, Default, preprocessor)
+
+// Usage: YQL_LOG(INFO) << "message";
+#define YQL_LOG(level) \
+ YQL_LOG_PREP(level, ::NYql::NLog::TContextPreprocessor)
+
+#define YQL_LOG_ACTIVE(level) YQL_CLOG_ACTIVE(level, Default)
+
+// conditional logger
+// Same as the macros above, with an extra `condition` that must hold for the message
+// to be logged (see YQL_LOG_IF_IMPL).
+
+#define YQL_CLOG_PREP_IF(level, component, preprocessor, condition) YQL_LOG_IF_IMPL(\
+ ::NYql::NLog::YqlLogger(), \
+ ::NYql::NLog::EComponent::component, \
+ ::NYql::NLog::ELevel::level, \
+ preprocessor, \
+ condition, \
+ __FILE__, __LINE__)
+
+// Usage: YQL_CLOG_IF(INFO, Sql, cond) << "message";
+#define YQL_CLOG_IF(level, component, condition) \
+ YQL_CLOG_PREP_IF(level, component, ::NYql::NLog::TContextPreprocessor, condition)
+
+#define YQL_LOG_PREP_IF(level, preprocessor, condition) \
+ YQL_CLOG_PREP_IF(level, Default, preprocessor, condition)
+
+// Usage: YQL_LOG_IF(INFO, cond) << "message";
+#define YQL_LOG_IF(level, condition) \
+ YQL_LOG_PREP_IF(level, ::NYql::NLog::TContextPreprocessor, condition)
+
+
+namespace NYql {
+
+namespace NProto {
+ class TLoggingConfig;
+} // NProto
+
+namespace NLog {
+
+// One log level per component, indexed by EComponentHelpers::ToInt(component).
+using TComponentLevels =
+ std::array<ELevel, EComponentHelpers::ToInt(EComponent::MaxValue)>;
+
+void WriteLocalTime(IOutputStream* out);
+
+/**
+ * @brief TLog frontend that keeps a separate log level for every EComponent.
+ */
+class TYqlLog: public TLog {
+public:
+    TYqlLog();
+    TYqlLog(const TString& logType, const TComponentLevels& levels);
+    TYqlLog(TAutoPtr<TLogBackend> backend, const TComponentLevels& levels);
+
+    // XXX: not thread-safe
+    void UpdateProcInfo(const TString& procName);
+
+    // Current level of the component (read atomically).
+    ELevel GetComponentLevel(EComponent component) const {
+        const auto& slot = ComponentLevels_[EComponentHelpers::ToInt(component)];
+        return ELevelHelpers::FromInt(AtomicGet(slot));
+    }
+
+    // Changes the level of the component (written atomically).
+    void SetComponentLevel(EComponent component, ELevel level) {
+        auto& slot = ComponentLevels_[EComponentHelpers::ToInt(component)];
+        AtomicSet(slot, ELevelHelpers::ToInt(level));
+    }
+
+    // True when `level` is less than or equal to the level set for the component.
+    bool NeedToLog(EComponent component, ELevel level) const {
+        const ELevel threshold = GetComponentLevel(component);
+        return ELevelHelpers::Lte(level, threshold);
+    }
+
+    void SetMaxLogLimit(ui64 limit);
+
+    TAutoPtr<TLogElement> CreateLogElement(EComponent component, ELevel level, TStringBuf file, int line) const;
+
+    void WriteLogPrefix(IOutputStream* out, EComponent component, ELevel level, TStringBuf file, int line) const;
+
+private:
+    TString ProcName_;
+    pid_t ProcId_;
+    std::array<TAtomic, EComponentHelpers::ToInt(EComponent::MaxValue)> ComponentLevels_{0};
+    mutable TAtomic WriteTruncMsg_;
+};
+
+/**
+ * @brief Returns the logger held by TLoggerOperator<TYqlLog> as TYqlLog&.
+ */
+inline TYqlLog& YqlLogger() {
+    auto& logger = TLoggerOperator<TYqlLog>::Log();
+    return static_cast<TYqlLog&>(logger);
+}
+
+/**
+ * @brief returns true if YQL logger is already initialized
+ *        (the result of TLoggerOperator<TYqlLog>::Usage()).
+ */
+inline bool IsYqlLoggerInitialized() {
+    const bool inUse = TLoggerOperator<TYqlLog>::Usage();
+    return inUse;
+}
+
+/**
+ * @brief Initialize logger with selected backend type.
+ *
+ * @param log - one of { syslog, console, cout, cerr, null, /path/to/file }
+ * @param startAsDaemon - true if process is daemonized
+ */
+void InitLogger(const TString& log, bool startAsDaemon = false);
+
+/**
+ * @brief Initialize logger with backends described in config.
+*/
+void InitLogger(const NProto::TLoggingConfig& loggingConfig, bool startAsDaemon = false);
+
+/**
+ * @brief Initialize logger with concrete backend.
+ *
+ * @param backend - logger backend
+ */
+void InitLogger(TAutoPtr<TLogBackend> backend);
+
+/**
+ * @brief Initialize logger with concrete output stream.
+ *
+ * @param out - output stream
+ */
+void InitLogger(IOutputStream* out);
+
+void CleanupLogger();
+
+void ReopenLog();
+
+// Scope guard: each constructor forwards to the matching InitLogger() overload, the
+// destructor calls CleanupLogger().
+class YqlLoggerScope {
+public:
+    YqlLoggerScope(const TString& log, bool startAsDaemon = false) {
+        InitLogger(log, startAsDaemon);
+    }
+
+    YqlLoggerScope(TAutoPtr<TLogBackend> backend) {
+        InitLogger(backend);
+    }
+
+    YqlLoggerScope(IOutputStream* out) {
+        InitLogger(out);
+    }
+
+    ~YqlLoggerScope() {
+        CleanupLogger();
+    }
+};
+
+} // namespace NLog
+} // namespace NYql
+
+template <>
+NYql::NLog::TYqlLog* CreateDefaultLogger<NYql::NLog::TYqlLog>();
diff --git a/yql/essentials/utils/log/log_component.h b/yql/essentials/utils/log/log_component.h
new file mode 100644
index 0000000000..46e366cbd8
--- /dev/null
+++ b/yql/essentials/utils/log/log_component.h
@@ -0,0 +1,130 @@
+#pragma once
+
+#include <util/generic/strbuf.h>
+#include <util/generic/yexception.h>
+
+
+namespace NYql {
+namespace NLog {
+
+// keep this enum in sync with the similar enum from yql/essentials/utils/log/proto/logger_config.proto
+// Log components. The values are consecutive starting from 0 and are used as array
+// indices (see EComponentHelpers::ToInt and the arrays sized by MaxValue), so new
+// components have to be added right before MaxValue.
+enum class EComponent {
+ Default = 0,
+ Core,
+ CoreExecution,
+ Sql,
+ ProviderCommon,
+ ProviderConfig,
+ ProviderResult,
+ ProviderYt,
+ ProviderKikimr,
+ ProviderKqp,
+ ProviderRtmr,
+ Performance, Perf = Performance, // Perf is an alias with the same value
+ Net,
+ ProviderStat,
+ ProviderSolomon,
+ CoreEval,
+ CorePeepHole,
+ ProviderDq,
+ ProviderClickHouse,
+ ProviderYdb,
+ ProviderPq,
+ ProviderS3,
+ CoreDq,
+ HttpGateway,
+ ProviderGeneric,
+ ProviderPg,
+ // <--- put other log components here
+ MaxValue // number of components, not a component itself
+};
+
+// Conversions between EComponent, its integer value and its printable name.
+struct EComponentHelpers {
+ static constexpr int ToInt(EComponent component) {
+ return static_cast<int>(component);
+ }
+
+ // Values outside [Default, MaxValue) are mapped to EComponent::Default.
+ static constexpr EComponent FromInt(int component) {
+ return (component >= ToInt(EComponent::Default) &&
+ component < ToInt(EComponent::MaxValue))
+ ? static_cast<EComponent>(component)
+ : EComponent::Default;
+ }
+
+ // Printable name of the component (the one written in the log prefix).
+ // Throws yexception for a value without a name, e.g. MaxValue.
+ static TStringBuf ToString(EComponent component) {
+ switch (component) {
+ case EComponent::Default: return TStringBuf("default");
+ case EComponent::Core: return TStringBuf("core");
+ case EComponent::CoreEval: return TStringBuf("core eval");
+ case EComponent::CorePeepHole: return TStringBuf("core peephole");
+ case EComponent::CoreExecution: return TStringBuf("core exec");
+ case EComponent::Sql: return TStringBuf("sql");
+ case EComponent::ProviderCommon: return TStringBuf("common provider");
+ case EComponent::ProviderConfig: return TStringBuf("CONFIG");
+ case EComponent::ProviderResult: return TStringBuf("RESULT");
+ case EComponent::ProviderYt: return TStringBuf("YT");
+ case EComponent::ProviderKikimr: return TStringBuf("KIKIMR");
+ case EComponent::ProviderKqp: return TStringBuf("KQP");
+ case EComponent::ProviderRtmr: return TStringBuf("RTMR");
+ case EComponent::Performance: return TStringBuf("perf");
+ case EComponent::Net: return TStringBuf("net");
+ case EComponent::ProviderStat: return TStringBuf("STATFACE");
+ case EComponent::ProviderSolomon: return TStringBuf("SOLOMON");
+ case EComponent::ProviderDq: return TStringBuf("DQ");
+ case EComponent::ProviderClickHouse: return TStringBuf("CLICKHOUSE");
+ case EComponent::ProviderYdb: return TStringBuf("YDB");
+ case EComponent::ProviderPq: return TStringBuf("PQ");
+ case EComponent::ProviderS3: return TStringBuf("S3");
+ case EComponent::CoreDq: return TStringBuf("core dq");
+ case EComponent::HttpGateway: return TStringBuf("http gw");
+ case EComponent::ProviderGeneric: return TStringBuf("generic");
+ case EComponent::ProviderPg: return TStringBuf("PG");
+ default:
+ ythrow yexception() << "invalid log component value: "
+ << ToInt(component);
+ }
+ }
+
+ // Inverse of ToString(); the comparison is exact (case-sensitive).
+ // Throws yexception for an unknown name.
+ static EComponent FromString(TStringBuf str) {
+ if (str == TStringBuf("default")) return EComponent::Default;
+ if (str == TStringBuf("core")) return EComponent::Core;
+ if (str == TStringBuf("core eval")) return EComponent::CoreEval;
+ if (str == TStringBuf("core peephole")) return EComponent::CorePeepHole;
+ if (str == TStringBuf("core exec")) return EComponent::CoreExecution;
+ if (str == TStringBuf("sql")) return EComponent::Sql;
+ if (str == TStringBuf("common provider")) return EComponent::ProviderCommon;
+ if (str == TStringBuf("CONFIG")) return EComponent::ProviderConfig;
+ if (str == TStringBuf("RESULT")) return EComponent::ProviderResult;
+ if (str == TStringBuf("YT")) return EComponent::ProviderYt;
+ if (str == TStringBuf("KIKIMR")) return EComponent::ProviderKikimr;
+ if (str == TStringBuf("KQP")) return EComponent::ProviderKqp;
+ if (str == TStringBuf("RTMR")) return EComponent::ProviderRtmr;
+ if (str == TStringBuf("perf")) return EComponent::Performance;
+ if (str == TStringBuf("net")) return EComponent::Net;
+ if (str == TStringBuf("STATFACE")) return EComponent::ProviderStat;
+ if (str == TStringBuf("SOLOMON")) return EComponent::ProviderSolomon;
+ if (str == TStringBuf("DQ")) return EComponent::ProviderDq;
+ if (str == TStringBuf("CLICKHOUSE")) return EComponent::ProviderClickHouse;
+ if (str == TStringBuf("YDB")) return EComponent::ProviderYdb;
+ if (str == TStringBuf("PQ")) return EComponent::ProviderPq;
+ if (str == TStringBuf("S3")) return EComponent::ProviderS3;
+ if (str == TStringBuf("core dq")) return EComponent::CoreDq;
+ if (str == TStringBuf("http gw")) return EComponent::HttpGateway;
+ if (str == TStringBuf("generic")) return EComponent::ProviderGeneric;
+ if (str == TStringBuf("PG")) return EComponent::ProviderPg;
+ ythrow yexception() << "unknown log component: '" << str << '\'';
+ }
+
+ // Calls f(component) for every component from Default up to, but not including,
+ // MaxValue.
+ template <typename TFunctor>
+ static void ForEach(TFunctor&& f) {
+ static const int minValue = ToInt(EComponent::Default);
+ static const int maxValue = ToInt(EComponent::MaxValue);
+
+ for (int c = minValue; c < maxValue; c++) {
+ f(FromInt(c));
+ }
+ }
+};
+
+} // namespace NLog
+} // namespace NYql
diff --git a/yql/essentials/utils/log/log_level.h b/yql/essentials/utils/log/log_level.h
new file mode 100644
index 0000000000..ccb12e4690
--- /dev/null
+++ b/yql/essentials/utils/log/log_level.h
@@ -0,0 +1,98 @@
+#pragma once
+
+#include <library/cpp/logger/priority.h>
+
+#include <util/generic/strbuf.h>
+#include <util/generic/yexception.h>
+
+
+namespace NYql {
+namespace NLog {
+
+// YQL log levels. Every enumerator has the numeric value of the corresponding TLOG_*
+// priority, so ELevelHelpers can convert to ELogPriority with a plain cast.
+enum class ELevel {
+ FATAL = TLOG_EMERG,
+ ERROR = TLOG_ERR,
+ WARN = TLOG_WARNING,
+ NOTICE = TLOG_NOTICE,
+ INFO = TLOG_INFO,
+ DEBUG = TLOG_DEBUG,
+ TRACE = TLOG_RESOURCES,
+};
+
+// Conversions between ELevel, ELogPriority, integer values and printable names.
+struct ELevelHelpers {
+ // True when the numeric value of l1 is less than or equal to that of l2.
+ static constexpr bool Lte(ELevel l1, ELevel l2) {
+ return ToInt(l1) <= ToInt(l2);
+ }
+
+ static constexpr ELogPriority ToLogPriority(ELevel level) {
+ return static_cast<ELogPriority>(ToInt(level));
+ }
+
+ static ELevel FromLogPriority(ELogPriority priority) {
+ return FromInt(static_cast<int>(priority));
+ }
+
+ static constexpr int ToInt(ELevel level) {
+ return static_cast<int>(level);
+ }
+
+ // Maps a TLOG_* priority value to a level; several priorities share one level and
+ // unknown values become INFO.
+ // NOTE(review): TLOG_NOTICE is mapped to INFO although ELevel::NOTICE exists, so
+ // FromInt(ToInt(ELevel::NOTICE)) yields INFO - confirm whether this is intended.
+ static ELevel FromInt(int level) {
+ switch (level) {
+ case TLOG_EMERG:
+ case TLOG_ALERT:
+ case TLOG_CRIT: return ELevel::FATAL;
+
+ case TLOG_ERR: return ELevel::ERROR;
+ case TLOG_WARNING: return ELevel::WARN;
+
+ case TLOG_NOTICE:
+ case TLOG_INFO: return ELevel::INFO;
+
+ case TLOG_DEBUG: return ELevel::DEBUG;
+ case TLOG_RESOURCES: return ELevel::TRACE;
+
+ default:
+ return ELevel::INFO;
+ }
+ }
+
+ // Printable name, always 5 characters (shorter names are padded with a space).
+ // Throws yexception for a value that is not one of the enumerators.
+ static TStringBuf ToString(ELevel level) {
+ // aligned 5-letters string
+ switch (level) {
+ case ELevel::FATAL: return TStringBuf("FATAL");
+ case ELevel::ERROR: return TStringBuf("ERROR");
+ case ELevel::WARN: return TStringBuf("WARN ");
+ case ELevel::NOTICE:return TStringBuf("NOTE ");
+ case ELevel::INFO: return TStringBuf("INFO ");
+ case ELevel::DEBUG: return TStringBuf("DEBUG");
+ case ELevel::TRACE: return TStringBuf("TRACE");
+ }
+ ythrow yexception() << "unknown log level: " << ToInt(level);
+ }
+
+ // Inverse of ToString(); expects exactly the padded 5-character names.
+ // Throws yexception for anything else.
+ static ELevel FromString(TStringBuf str) {
+ // aligned 5-letters string
+ if (str == TStringBuf("FATAL")) return ELevel::FATAL;
+ if (str == TStringBuf("ERROR")) return ELevel::ERROR;
+ if (str == TStringBuf("WARN ")) return ELevel::WARN;
+ if (str == TStringBuf("NOTE ")) return ELevel::NOTICE;
+ if (str == TStringBuf("INFO ")) return ELevel::INFO;
+ if (str == TStringBuf("DEBUG")) return ELevel::DEBUG;
+ if (str == TStringBuf("TRACE")) return ELevel::TRACE;
+ ythrow yexception() << "unknown log level: " << str;
+ }
+
+ // Calls f(FromInt(l)) for every integer l from ToInt(FATAL) to ToInt(TRACE)
+ // inclusive; because FromInt maps several integers to one level, f can receive the
+ // same level more than once.
+ template <typename TFunctor>
+ static void ForEach(TFunctor&& f) {
+ static const int minValue = ToInt(ELevel::FATAL);
+ static const int maxValue = ToInt(ELevel::TRACE);
+
+ for (int l = minValue; l <= maxValue; l++) {
+ f(FromInt(l));
+ }
+ }
+};
+
+
+} // namespace NLog
+} // namespace NYql
diff --git a/yql/essentials/utils/log/log_ut.cpp b/yql/essentials/utils/log/log_ut.cpp
new file mode 100644
index 0000000000..d77fbea761
--- /dev/null
+++ b/yql/essentials/utils/log/log_ut.cpp
@@ -0,0 +1,667 @@
+#include "log.h"
+#include "context.h"
+#include "profile.h"
+#include <yql/essentials/utils/log/ut/log_parser.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/logger/stream.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/yexception.h>
+#include <util/system/getpid.h>
+#include <util/string/split.h>
+#include <util/string/cast.h>
+#include <util/string/subst.h>
+
+#include <regex>
+
+
+using namespace NYql;
+using namespace NLog;
+
+Y_UNIT_TEST_SUITE(TLogTest)
+{
+ // Writes one INFO record and checks every field parsed back from the
+ // formatted row: time, level, process name/id, thread id, component,
+ // source file, line number and message.
+ Y_UNIT_TEST(Format) {
+ TStringStream out;
+ YqlLoggerScope logger(&out);
+ YqlLogger().UpdateProcInfo("my_proc");
+
+ TString message = "some performance info";
+ YQL_LOG(INFO) << message;
+
+ TLogRow logRow = ParseLogRow(out.Str());
+
+ // NOTE(review): this computes (log time - now) although the record was
+ // written before Now() is taken, so the operands look swapped - confirm
+ // what TInstant subtraction yields here and whether the check is effective.
+ TDuration elapsed(logRow.Time - TInstant::Now());
+ UNIT_ASSERT(elapsed < TDuration::MilliSeconds(5));
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.ProcName, "my_proc");
+ UNIT_ASSERT_EQUAL(logRow.ProcId, GetPID());
+ UNIT_ASSERT(logRow.ThreadId > 0);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ // only the last path component of __FILE__ is expected in the row
+ UNIT_ASSERT_STRINGS_EQUAL(
+ logRow.FileName,
+ TStringBuf(__FILE__).RNextTok(LOCSLASH_C));
+ UNIT_ASSERT(logRow.LineNumber != 0);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, message);
+ }
+
+ // Logs one record per level and checks the rows for FATAL, ERROR, WARN and
+ // INFO; no rows are expected for DEBUG and TRACE at the default level.
+ Y_UNIT_TEST(Levels) {
+ TStringStream out;
+ YqlLoggerScope logger(&out); // default log level INFO
+
+ YQL_LOG(FATAL) << "fatal message";
+ YQL_LOG(ERROR) << "error message";
+ YQL_LOG(WARN) << "warning message";
+ YQL_LOG(INFO) << "info message";
+ YQL_LOG(DEBUG) << "debug message";
+ YQL_LOG(TRACE) << "trace message";
+
+ // four rows plus "_" - presumably the empty token after the last newline
+ TString fatalStr, errorStr, warnStr, infoStr, _;
+ Split(out.Str(), '\n', fatalStr, errorStr, warnStr, infoStr, _);
+
+ {
+ TLogRow logRow = ParseLogRow(fatalStr);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::FATAL);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "fatal message");
+ }
+ {
+ TLogRow logRow = ParseLogRow(errorStr);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::ERROR);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "error message");
+ }
+ {
+ TLogRow logRow = ParseLogRow(warnStr);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "warning message");
+ }
+ {
+ TLogRow logRow = ParseLogRow(infoStr);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "info message");
+ }
+ }
+
+ // Logs one INFO record per component and checks that each row carries the
+ // component it was logged with. Performance and Perf are asserted to be
+ // the same component value.
+ Y_UNIT_TEST(Components) {
+ TStringStream out;
+ YqlLoggerScope logger(&out);
+
+ YQL_CLOG(INFO, Default) << "default message";
+ YQL_CLOG(INFO, Core) << "core message";
+ YQL_CLOG(INFO, Sql) << "sql message";
+ YQL_CLOG(INFO, ProviderCommon) << "common message";
+ YQL_CLOG(INFO, ProviderYt) << "yt message";
+ YQL_CLOG(INFO, ProviderKikimr) << "kikimr message";
+ YQL_CLOG(INFO, ProviderRtmr) << "rtmr message";
+ YQL_CLOG(INFO, Performance) << "performance message";
+ YQL_CLOG(INFO, Perf) << "perf message";
+
+ // one variable per logged row, in logging order
+ TString defaultStr, coreStr, sqlStr, commonStr, ytStr,
+ kikimrStr, rtmrStr, performanceStr, perfStr, _;
+ Split(out.Str(), '\n', defaultStr, coreStr, sqlStr,
+ commonStr, ytStr,
+ kikimrStr, rtmrStr,
+ performanceStr, perfStr, _);
+
+ {
+ TLogRow logRow = ParseLogRow(defaultStr);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "default message");
+ }
+ {
+ TLogRow logRow = ParseLogRow(coreStr);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Core);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "core message");
+ }
+ {
+ TLogRow logRow = ParseLogRow(sqlStr);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Sql);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "sql message");
+ }
+ {
+ TLogRow logRow = ParseLogRow(commonStr);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::ProviderCommon);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "common message");
+ }
+ {
+ TLogRow logRow = ParseLogRow(ytStr);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::ProviderYt);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "yt message");
+ }
+ {
+ TLogRow logRow = ParseLogRow(kikimrStr);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::ProviderKikimr);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "kikimr message");
+ }
+ {
+ TLogRow logRow = ParseLogRow(rtmrStr);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::ProviderRtmr);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "rtmr message");
+ }
+ {
+ TLogRow logRow = ParseLogRow(performanceStr);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ // both names must match the same parsed component
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Performance);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "performance message");
+ }
+ {
+ TLogRow logRow = ParseLogRow(perfStr);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Performance);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "perf message");
+ }
+ }
+
+ // Checks the conditional macros: of the four statements only the two with
+ // a true condition are expected to produce a row.
+ Y_UNIT_TEST(Conditional) {
+ TStringStream out;
+ YqlLoggerScope logger(&out);
+
+ YQL_LOG_IF(INFO, true) << "default info message";
+ YQL_LOG_IF(INFO, false) << "must not be logged";
+
+ YQL_CLOG_IF(INFO, Perf, true) << "perf info message";
+ // same text as the statement above, but the condition is false
+ YQL_CLOG_IF(INFO, Perf, false) << "perf info message";
+
+ TString defaultStr, perfStr, _;
+ Split(out.Str(), '\n', defaultStr, perfStr, _);
+
+ {
+ TLogRow logRow = ParseLogRow(defaultStr);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "default info message");
+ }
+ {
+ TLogRow logRow = ParseLogRow(perfStr);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "perf info message");
+ }
+ }
+
+ // Checks nested log contexts: the current path grows as "ctx1", "ctx1/ctx2"
+ // while scopes are entered, shrinks back when they are left, and each
+ // message logged inside a context is prefixed with "{<path>} ".
+ Y_UNIT_TEST(Contexts) {
+ TStringStream out;
+ YqlLoggerScope logger(&out);
+
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "");
+ YQL_LOG(INFO) << "level0 - begin";
+ {
+ YQL_LOG_CTX_SCOPE("ctx1");
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx1");
+ YQL_LOG(INFO) << "level1 - begin";
+
+ YQL_LOG_CTX_BLOCK(TStringBuf("ctx2")) {
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx1/ctx2");
+ YQL_LOG(WARN) << "level2";
+ }
+
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx1");
+ YQL_LOG(INFO) << "level1 - end";
+ }
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "");
+ YQL_LOG(INFO) << "level0 - end";
+
+ TString row1Str, row2Str, row3Str, row4Str, row5Str, _;
+ Split(out.Str(), '\n', row1Str, row2Str, row3Str, row4Str, row5Str, _);
+
+ {
+ // logged outside of any context: no prefix
+ TLogRow logRow = ParseLogRow(row1Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "level0 - begin");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row2Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx1} level1 - begin");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row3Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx1/ctx2} level2");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row4Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx1} level1 - end");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row5Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "level0 - end");
+ }
+ }
+
+ // Checks a root context without a session id: the first element of the
+ // context path stays empty, also after the path is re-installed through
+ // YQL_LOG_CTX_ROOT_SESSION_SCOPE, and messages are prefixed with the
+ // context path only.
+ Y_UNIT_TEST(UnknownSessionContexts) {
+ TStringStream out;
+ YqlLoggerScope logger(&out);
+
+ {
+ YQL_LOG_CTX_ROOT_SCOPE("ctx");
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, "");
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx");
+ YQL_LOG(INFO) << "level0 - begin";
+
+ {
+ // install the current path again as a new root
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(CurrentLogContextPath());
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, "");
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx");
+
+ YQL_LOG(INFO) << "level1 - begin";
+ YQL_LOG_CTX_BLOCK("ctx1") {
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, "");
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx/ctx1");
+
+ YQL_LOG(WARN) << "level2";
+ }
+
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, "");
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx");
+ YQL_LOG(INFO) << "level1 - end";
+ }
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, "");
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx");
+ YQL_LOG(INFO) << "level0 - end";
+ }
+
+ TString row1Str, row2Str, row3Str, row4Str, row5Str, _;
+ Split(out.Str(), '\n', row1Str, row2Str, row3Str, row4Str, row5Str, _);
+ {
+ TLogRow logRow = ParseLogRow(row1Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx} level0 - begin");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row2Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx} level1 - begin");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row3Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx/ctx1} level2");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row4Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx} level1 - end");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row5Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx} level0 - end");
+ }
+ }
+
+ // Checks a root context with a session id: the id is the first element of
+ // the context path, survives re-installing the path as a new root and
+ // nested blocks, and leads the "{sessionId/...}" prefix of every message.
+ Y_UNIT_TEST(SessionContexts) {
+ TStringStream out;
+ YqlLoggerScope logger(&out);
+
+ {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE("sessionId", "ctx");
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, "sessionId");
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx");
+ YQL_LOG(INFO) << "level0 - begin";
+
+ {
+ // install the current (session, path) pair again as a new root
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(CurrentLogContextPath());
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, "sessionId");
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx");
+
+ YQL_LOG(INFO) << "level1 - begin";
+ YQL_LOG_CTX_BLOCK("ctx1") {
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, "sessionId");
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx/ctx1");
+
+ YQL_LOG(WARN) << "level2";
+ }
+
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, "sessionId");
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx");
+ YQL_LOG(INFO) << "level1 - end";
+ }
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().first, "sessionId");
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx");
+ YQL_LOG(INFO) << "level0 - end";
+ }
+
+ TString row1Str, row2Str, row3Str, row4Str, row5Str, _;
+ Split(out.Str(), '\n', row1Str, row2Str, row3Str, row4Str, row5Str, _);
+ {
+ TLogRow logRow = ParseLogRow(row1Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{sessionId/ctx} level0 - begin");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row2Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{sessionId/ctx} level1 - begin");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row3Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{sessionId/ctx/ctx1} level2");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row4Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{sessionId/ctx} level1 - end");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row5Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{sessionId/ctx} level0 - end");
+ }
+ }
+
+ // Throws through YQL_LOG_CTX_THROW from inside two nested contexts and
+ // checks that ThrowedLogContextPath() reports "<file>.cpp:<line>: {first/second} "
+ // exactly once, while the exception text itself is left unchanged.
+ Y_UNIT_TEST(ThrowWithContext) {
+ bool isThrown = false;
+ YQL_LOG_CTX_SCOPE("first");
+ try {
+ YQL_LOG_CTX_SCOPE("second");
+ YQL_LOG_CTX_THROW yexception() << "some message";
+ } catch (const yexception& e) {
+ isThrown = true;
+
+ UNIT_ASSERT_STRINGS_EQUAL(e.AsStrBuf(), "some message");
+
+ // split the reported location into file, line number and context path
+ TString throwedLogCtx = ThrowedLogContextPath();
+ TStringBuf file, line, context;
+ TStringBuf(throwedLogCtx).Split(".cpp:", file, line);
+ line.Split(':', line, context);
+
+ // the expected file name is compared with '/' as the path separator
+ TString expectedFile(__LOCATION__.File);
+ SubstGlobal(expectedFile, LOCSLASH_C, '/');
+ UNIT_ASSERT_STRINGS_EQUAL(TString(file)+".cpp", expectedFile);
+ int lineNumber;
+ UNIT_ASSERT(TryFromString<int>(line, lineNumber));
+ UNIT_ASSERT(lineNumber > 0);
+ UNIT_ASSERT_STRINGS_EQUAL(context, " {first/second} ");
+
+ // second call without throw returns empty string
+ throwedLogCtx = ThrowedLogContextPath();
+ UNIT_ASSERT(throwedLogCtx.empty());
+ }
+
+ UNIT_ASSERT_C(isThrown, "exception was not thrown");
+ }
+
+ // Checks that a root scope opened inside nested contexts replaces the
+ // whole path ("ctx3" instead of "ctx1/ctx2/ctx3") and that the previous
+ // path is back once the root scope is left.
+ Y_UNIT_TEST(ContextOverride) {
+ TStringStream out;
+ YqlLoggerScope logger(&out);
+
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "");
+ {
+ YQL_LOG_CTX_SCOPE("ctx1");
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx1");
+ YQL_LOG(INFO) << "level1 - begin";
+
+ YQL_LOG_CTX_BLOCK(TStringBuf("ctx2")) {
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx1/ctx2");
+ YQL_LOG(WARN) << "level2 - begin";
+
+ {
+ // root scope: the path starts over instead of being extended
+ YQL_LOG_CTX_ROOT_SCOPE("ctx3");
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx3");
+ YQL_LOG(ERROR) << "level3";
+ }
+
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx1/ctx2");
+ YQL_LOG(WARN) << "level2 - end";
+ }
+
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "ctx1");
+ YQL_LOG(INFO) << "level1 - end";
+ }
+ UNIT_ASSERT_STRINGS_EQUAL(CurrentLogContextPath().second, "");
+
+ TString row1Str, row2Str, row3Str, row4Str, row5Str, _;
+ Split(out.Str(), '\n', row1Str, row2Str, row3Str, row4Str, row5Str, _);
+
+ {
+ TLogRow logRow = ParseLogRow(row1Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx1} level1 - begin");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row2Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx1/ctx2} level2 - begin");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row3Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::ERROR);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx3} level3");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row4Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx1/ctx2} level2 - end");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row5Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "{ctx1} level1 - end");
+ }
+ }
+
+ // Checks the profiling macros: each scope/block logs
+ // "Execution of [<name>] took <time><unit>" to the Perf component at the
+ // requested level, with the unit (us, ms, s) following the elapsed time.
+ Y_UNIT_TEST(Profiling) {
+ TStringStream out;
+ YqlLoggerScope logger(&out);
+
+ {
+ // empty scope: expected to be reported in microseconds
+ YQL_PROFILE_SCOPE(INFO, "scope1");
+ }
+
+ YQL_PROFILE_BLOCK(WARN, "block1") {
+ // long enough to be reported in milliseconds
+ Sleep(TDuration::MilliSeconds(2));
+ }
+
+ YQL_PROFILE_BLOCK(ERROR, "block2") {
+ // long enough to be reported in seconds
+ Sleep(TDuration::MilliSeconds(1200));
+ }
+
+ bool isExecuted = false;
+ YQL_PROFILE_BLOCK(TRACE, "block3") { // log will be filtered out
+ isExecuted = true;
+ }
+ UNIT_ASSERT(isExecuted);
+
+ TString row1Str, row2Str, row3Str, _;
+ Split(out.Str(), '\n', row1Str, row2Str, row3Str, _);
+
+ {
+ TLogRow logRow = ParseLogRow(row1Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Performance);
+ std::regex re("Execution of \\[scope1\\] took [0-9\\.]+us");
+ bool isMatch = std::regex_match(logRow.Message.c_str(), re);
+ UNIT_ASSERT_C(isMatch, "Unexpected message: " << logRow.Message);
+ }
+ {
+ TLogRow logRow = ParseLogRow(row2Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Performance);
+ std::regex re("Execution of \\[block1\\] took [0-9\\.]+ms");
+ bool isMatch = std::regex_match(logRow.Message.c_str(), re);
+ UNIT_ASSERT_C(isMatch, "Unexpected message: " << logRow.Message);
+ }
+ {
+ TLogRow logRow = ParseLogRow(row3Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::ERROR);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Performance);
+ std::regex re("Execution of \\[block2\\] took [0-9\\.]+s");
+ bool isMatch = std::regex_match(logRow.Message.c_str(), re);
+ UNIT_ASSERT_C(isMatch, "Unexpected message: " << logRow.Message);
+ }
+ }
+
+
+ // Helper for ProfilingFuncs: profiled under its plain function name.
+ int Func1(int a, char b) {
+ YQL_PROFILE_FUNC(INFO);
+ return a + b;
+ }
+
+ // Helper for ProfilingFuncs: profiled under its full function signature.
+ int Func2(int a, char b) {
+ YQL_PROFILE_FUNCSIG(WARN);
+ return a + b;
+ }
+
+ // Checks the function-level profiling macros: Func1 is reported by name,
+ // Func2 by its full signature (whose spelling differs on Windows).
+ Y_UNIT_TEST(ProfilingFuncs) {
+ TStringStream out;
+ YqlLoggerScope logger(&out);
+
+ Func1(1, 2);
+ Func2(1, 2);
+
+ TString row1Str, row2Str, _;
+ Split(out.Str(), '\n', row1Str, row2Str, _);
+
+ {
+ TLogRow logRow = ParseLogRow(row1Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Performance);
+ std::regex re("Execution of \\[Func1\\] took [0-9\\.]+us");
+ bool isMatch = std::regex_match(logRow.Message.c_str(), re);
+ UNIT_ASSERT_C(isMatch, "Unexpected message: " << logRow.Message);
+ }
+ {
+ TLogRow logRow = ParseLogRow(row2Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Perf);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Performance);
+ // the signature text is compiler specific
+#ifdef _win_
+ std::regex re("Execution of \\[int __cdecl NTestSuiteTLogTest::Func2\\(int, char\\)\\] took [0-9\\.]+us");
+#else
+ std::regex re("Execution of \\[int NTestSuiteTLogTest::Func2\\(int, char\\)\\] took [0-9\\.]+us");
+#endif
+ bool isMatch = std::regex_match(logRow.Message.c_str(), re);
+ UNIT_ASSERT_C(isMatch, "Unexpected message: " << logRow.Message);
+ }
+ }
+
+ // Checks the log size limit: with room for two rows only, the third INFO
+ // record is expected to be replaced by a FATAL "Log is truncated by limit" row.
+ Y_UNIT_TEST(Limit1) {
+ size_t limit = 0;
+ {
+ // measure the length of a single row with a throw-away logger
+ TStringStream out;
+ YqlLoggerScope logger(&out);
+ YqlLogger().UpdateProcInfo("proc");
+ YQL_CLOG(INFO, Core) << "message1";
+ limit = out.Str().length() * 2 - 7; // Not more than 2 log lines
+ }
+
+ TStringStream out;
+ YqlLoggerScope logger(&out);
+ YqlLogger().UpdateProcInfo("proc");
+ YqlLogger().SetMaxLogLimit(limit);
+
+ YQL_CLOG(INFO, Core) << "message1";
+ YQL_CLOG(INFO, Core) << "message2";
+ YQL_CLOG(INFO, Core) << "message3";
+
+ TString row1Str, row2Str, row3Str, _;
+ Split(out.Str(), '\n', row1Str, row2Str, row3Str, _);
+
+ {
+ TLogRow logRow = ParseLogRow(row1Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Core);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "message1");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row2Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Core);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "message2");
+ }
+ {
+ // the truncation notice takes the place of "message3"
+ TLogRow logRow = ParseLogRow(row3Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::FATAL);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "Log is truncated by limit");
+ }
+ }
+
+ // Same setup as Limit1, but the third record is a WARN: the truncation
+ // notice is expected first and the WARN record is still written after it.
+ Y_UNIT_TEST(Limit2) {
+ size_t limit = 0;
+ {
+ // measure the length of a single row with a throw-away logger
+ TStringStream out;
+ YqlLoggerScope logger(&out);
+ YqlLogger().UpdateProcInfo("proc");
+ YQL_CLOG(INFO, Core) << "message1";
+ limit = out.Str().length() * 2 - 7; // Not more than 2 log lines
+ }
+
+ TStringStream out;
+ YqlLoggerScope logger(&out);
+ YqlLogger().UpdateProcInfo("proc");
+ YqlLogger().SetMaxLogLimit(limit);
+
+ YQL_CLOG(INFO, Core) << "message1";
+ YQL_CLOG(INFO, Core) << "message2";
+ YQL_CLOG(WARN, Core) << "message3";
+
+ TString row1Str, row2Str, row3Str, row4Str, _;
+ Split(out.Str(), '\n', row1Str, row2Str, row3Str, row4Str, _);
+
+ {
+ TLogRow logRow = ParseLogRow(row1Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Core);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "message1");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row2Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Core);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "message2");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row3Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::FATAL);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "Log is truncated by limit");
+ }
+ {
+ // the WARN record is written even though the limit was reached
+ TLogRow logRow = ParseLogRow(row4Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::WARN);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Core);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "message3");
+ }
+ }
+}
diff --git a/yql/essentials/utils/log/profile.cpp b/yql/essentials/utils/log/profile.cpp
new file mode 100644
index 0000000000..4dd412b8f6
--- /dev/null
+++ b/yql/essentials/utils/log/profile.cpp
@@ -0,0 +1,45 @@
+#include "profile.h"
+#include "log.h"
+
+#include <util/stream/format.h>
+
+
+// Expands to YQL_LOG_IMPL for the global YQL logger and the Perf component,
+// passing on the level, file and line supplied by the caller.
+#define YQL_PERF_LOG(level, file, line) YQL_LOG_IMPL( \
+ ::NYql::NLog::YqlLogger(), ::NYql::NLog::EComponent::Perf, level, \
+ ::NYql::NLog::TContextPreprocessor, file, line)
+
+
+namespace NYql {
+namespace NLog {
+
+// Logs "Execution of [<name>] took <time><unit>" through YQL_PERF_LOG using
+// the level, file and line stored in the scope. Nothing is logged when the
+// scope has no name.
+TProfilingScope::~TProfilingScope() {
+ if (!Name_) {
+ return;
+ }
+
+ // report in the largest unit that the measured time strictly exceeds
+ const double elapsedUs = static_cast<double>(::MicroSeconds() - StartedAt_);
+ double shown = elapsedUs;
+ TStringBuf suffix("us");
+ if (elapsedUs > 1000000) {
+ shown = elapsedUs / 1000000;
+ suffix = TStringBuf("s");
+ } else if (elapsedUs > 1000) {
+ shown = elapsedUs / 1000;
+ suffix = TStringBuf("ms");
+ }
+
+ const auto writeRecord = [&]() {
+ YQL_PERF_LOG(Level_, File_, Line_)
+ << TStringBuf("Execution of [") << Name_
+ << TStringBuf("] took ") << Prec(shown, 3) << suffix;
+ };
+
+ const bool hasStoredCtx = !(LogCtxPath_.first.empty() && LogCtxPath_.second.empty());
+ if (!hasStoredCtx) {
+ writeRecord();
+ return;
+ }
+
+ // install the stored context path while the record is written
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(LogCtxPath_);
+ writeRecord();
+}
+
+} // namespace NLog
+} // namespace NYql
diff --git a/yql/essentials/utils/log/profile.h b/yql/essentials/utils/log/profile.h
new file mode 100644
index 0000000000..dda7e03f9d
--- /dev/null
+++ b/yql/essentials/utils/log/profile.h
@@ -0,0 +1,74 @@
+#pragma once
+
+#include "log_level.h"
+#include "context.h"
+
+#include <util/system/datetime.h>
+
+
+// Declares a uniquely named local TProfilingScope for the call site
+// (__FILE__/__LINE__); it lives until the enclosing scope ends.
+#define YQL_PROFILE_SCOPE(level, name) \
+ ::NYql::NLog::TProfilingScope Y_GENERATE_UNIQUE_ID(ps)( \
+ name, ::NYql::NLog::ELevel::level, __FILE__, __LINE__)
+
+// Expands to a TProfilingScope temporary for the call site.
+#define YQL_PROFILE_BLOCK_IMPL(level, name) \
+ ::NYql::NLog::TProfilingScope( \
+ name, ::NYql::NLog::ELevel::level, __FILE__, __LINE__)
+
+// Expands to a TAutoPtr owning a heap-allocated TProfilingScope that also
+// stores the log context path current at the call site.
+#define YQL_PROFILE_SCOPE_VAL(level, name) \
+ TAutoPtr<::NYql::NLog::TProfilingScope>(new ::NYql::NLog::TProfilingScope(\
+ name, ::NYql::NLog::ELevel::level, __FILE__, __LINE__, \
+ ::NYql::NLog::CurrentLogContextPath()))
+
+// Chains a continuation onto future that holds a copy of scopeVal and
+// returns the future's value. Note that the expansion ends with ';'.
+#define YQL_PROFILE_BIND_VAL(future, scopeVal) \
+ future.Apply([scopeVal](const decltype(future)& f) { \
+ return f.GetValue(); \
+ });
+
+// Prefix for a statement or block: the scope object is declared in the
+// if-condition and the goto jumps to the label in the else-branch, so the
+// statement that follows the macro runs while the scope object is alive.
+#define YQL_PROFILE_BLOCK(level, name) \
+ if (auto Y_GENERATE_UNIQUE_ID(t) = YQL_PROFILE_SCOPE_VAL(level, name)) { \
+ goto Y_CAT(YQL_LOG_CTX_LABEL, __LINE__); \
+ } else Y_CAT(YQL_LOG_CTX_LABEL, __LINE__):
+
+// Scope profilers named after the enclosing function (plain name or full signature).
+#define YQL_PROFILE_FUNC(level) YQL_PROFILE_SCOPE(level, __FUNCTION__)
+#define YQL_PROFILE_FUNCSIG(level) YQL_PROFILE_SCOPE(level, Y_FUNC_SIGNATURE)
+
+// Same naming as above, but producing the heap-allocated variant.
+#define YQL_PROFILE_FUNC_VAL(level) YQL_PROFILE_SCOPE_VAL(level, __FUNCTION__)
+#define YQL_PROFILE_FUNCSIG_VAL(level) YQL_PROFILE_SCOPE_VAL(level, Y_FUNC_SIGNATURE)
+
+
+namespace NYql {
+namespace NLog {
+
+/**
+ * @brief Adds elapsed execution time to log when goes outside of scope.
+ *
+ * The start time is taken in the constructor. The name and file pointers
+ * are stored as given, not copied.
+ */
+class TProfilingScope {
+public:
+ /**
+ * @param name - name reported in the log message
+ * @param level - level of the log record
+ * @param file - source file reported for the log record
+ * @param line - source line reported for the log record
+ * @param logCtxPath - log context path to store with the scope,
+ * a pair of empty strings by default
+ */
+ TProfilingScope(const char* name, ELevel level, const char* file, int line,
+ std::pair<TString, TString> logCtxPath = std::make_pair(TString(), TString()))
+ : Name_(name)
+ , Level_(level)
+ , File_(file)
+ , Line_(line)
+ , StartedAt_(::MicroSeconds())
+ , LogCtxPath_(std::move(logCtxPath))
+ {
+ }
+
+ ~TProfilingScope();
+
+ // always true
+ explicit inline operator bool() const noexcept {
+ return true;
+ }
+
+private:
+ const char* Name_;
+ ELevel Level_;
+ const char* File_;
+ int Line_;
+ ui64 StartedAt_; // value of ::MicroSeconds() at construction
+ std::pair<TString, TString> LogCtxPath_;
+};
+
+} // namespace NLog
+} // namespace NYql
diff --git a/yql/essentials/utils/log/proto/logger_config.proto b/yql/essentials/utils/log/proto/logger_config.proto
new file mode 100644
index 0000000000..29aebf4088
--- /dev/null
+++ b/yql/essentials/utils/log/proto/logger_config.proto
@@ -0,0 +1,65 @@
+package NYql.NProto;
+option java_package = "ru.yandex.yql.proto";
+
+// Logger configuration: log destinations plus log levels.
+message TLoggingConfig {
+ // Kind of a log destination.
+ enum ELogTo {
+ STDERR = 1;
+ STDOUT = 2;
+ CONSOLE = 3;
+ FILE = 4;
+ SYSLOG = 5;
+ YQL_UA_LOGGER = 6;
+ }
+
+ // Log level.
+ enum ELevel {
+ FATAL = 0;
+ ERROR = 1;
+ WARN = 2;
+ INFO = 3;
+ DEBUG = 4;
+ TRACE = 5;
+ }
+
+ // Logging component.
+ enum EComponent {
+ DEFAULT = 0;
+ CORE = 1;
+ CORE_EXECUTION = 2;
+ SQL = 3;
+ PROVIDER_COMMON = 4;
+ PROVIDER_CONFIG = 5;
+ PROVIDER_RESULT = 6;
+ PROVIDER_YT = 7;
+ PROVIDER_KIKIMR = 8;
+ PROVIDER_KQP = 9;
+ PROVIDER_RTMR = 10;
+ PERFORMANCE = 11;
+ NET = 12;
+ PROVIDER_STAT = 13;
+ PROVIDER_SOLOMON = 14;
+ CORE_EVAL = 15;
+ CORE_PEEPHOLE = 16;
+ PROVIDER_DQ = 17;
+ PROVIDER_CLICKHOUSE = 18;
+ PROVIDER_YDB = 19;
+ PROVIDER_PQ = 20;
+ PROVIDER_S3 = 21;
+ CORE_DQ = 22;
+ HTTP_GATEWAY = 23;
+ PROVIDER_GENERIC = 24;
+ PROVIDER_PG = 25;
+ }
+
+ // Pairs a component (C) with a level (L).
+ message TComponentLevel {
+ optional EComponent C = 1;
+ optional ELevel L = 2;
+ }
+
+ // One log destination: its kind plus a target string.
+ // NOTE(review): Target is presumably a file name for FILE - confirm.
+ message TLogDestination {
+ optional ELogTo Type = 1;
+ optional string Target = 2;
+ }
+
+ // Field number 1 is not used in this message.
+ repeated TLogDestination LogDest = 2; // If none is passed InitLogger sets default STDERR backend
+ repeated TComponentLevel Levels = 3;
+ // INFO when the field is not set.
+ optional ELevel AllComponentsLevel = 4 [default = INFO];
+}
diff --git a/yql/essentials/utils/log/proto/ya.make b/yql/essentials/utils/log/proto/ya.make
new file mode 100644
index 0000000000..f251cf71c5
--- /dev/null
+++ b/yql/essentials/utils/log/proto/ya.make
@@ -0,0 +1,11 @@
+PROTO_LIBRARY()
+
+SRCS(
+ logger_config.proto
+)
+
+IF (NOT PY_PROTOS_FOR)
+ EXCLUDE_TAGS(GO_PROTO)
+ENDIF()
+
+END()
diff --git a/yql/essentials/utils/log/tls_backend.cpp b/yql/essentials/utils/log/tls_backend.cpp
new file mode 100644
index 0000000000..a92f123c9b
--- /dev/null
+++ b/yql/essentials/utils/log/tls_backend.cpp
@@ -0,0 +1,49 @@
+#include "tls_backend.h"
+
+#include <util/system/tls.h>
+
+
+namespace NYql {
+namespace NLog {
+namespace {
+
+// Backend override for the current thread; while it is null the
+// TTlsLogBackend methods below forward to their default backend.
+Y_POD_STATIC_THREAD(TLogBackend*) CurrentBackend;
+
+} // namespace
+
+// Stores backend as the calling thread's backend override and returns the
+// value that was stored before (may be null).
+TLogBackend* SetLogBackendForCurrentThread(TLogBackend* backend) {
+ TLogBackend*& slot = *(&CurrentBackend);
+ TLogBackend* const previous = slot;
+ slot = backend;
+ return previous;
+}
+
+// Forwards the record to the calling thread's backend override when one is
+// set, to the default backend otherwise.
+void TTlsLogBackend::WriteData(const TLogRecord& rec) {
+ if (TLogBackend* const threadBackend = *(&CurrentBackend)) {
+ threadBackend->WriteData(rec);
+ return;
+ }
+ DefaultBackend_->WriteData(rec);
+}
+
+// Reopens the calling thread's backend override when one is set, the
+// default backend otherwise.
+void TTlsLogBackend::ReopenLog() {
+ if (TLogBackend* const threadBackend = *(&CurrentBackend)) {
+ threadBackend->ReopenLog();
+ return;
+ }
+ DefaultBackend_->ReopenLog();
+}
+
+// Returns the filtration level of the backend that handles the calling
+// thread: the thread's backend override when one is set, the default
+// backend otherwise.
+ELogPriority TTlsLogBackend::FiltrationLevel() const {
+ TLogBackend* backend = *(&CurrentBackend);
+ if (backend) {
+ return backend->FiltrationLevel();
+ }
+ // no trailing fallback value is needed: every path returns above or here
+ return DefaultBackend_->FiltrationLevel();
+}
+
+} // namespace NLog
+} // namespace NYql
diff --git a/yql/essentials/utils/log/tls_backend.h b/yql/essentials/utils/log/tls_backend.h
new file mode 100644
index 0000000000..802a73aae9
--- /dev/null
+++ b/yql/essentials/utils/log/tls_backend.h
@@ -0,0 +1,67 @@
+#pragma once
+
+#include <library/cpp/logger/backend.h>
+
+#include <util/generic/ptr.h>
+
+#include <utility>
+
+
+namespace NYql {
+namespace NLog {
+
+/**
+ * @brief Dispatches all invocations to default logger backend configured
+ * for current thread. Must be used in conjunction with
+ * SetLogBackendForCurrentThread() function or TScopedBackend class.
+ */
+class TTlsLogBackend: public TLogBackend {
+public:
+ // Takes the backend to be stored as the default one; a null pointer
+ // trips the debug-only check below.
+ TTlsLogBackend(TAutoPtr<TLogBackend> defaultBackend)
+ : DefaultBackend_(defaultBackend)
+ {
+ Y_DEBUG_ABORT_UNLESS(DefaultBackend_, "default backend is not set");
+ }
+
+ void WriteData(const TLogRecord& rec) override;
+ void ReopenLog() override;
+ ELogPriority FiltrationLevel() const override;
+
+private:
+ // the default backend passed to the constructor
+ TAutoPtr<TLogBackend> DefaultBackend_;
+};
+
+/**
+ * @brief Sets given backend as default for current thread. Must be used in
+ * conjunction with TTlsLogBackend.
+ *
+ * @param backend - pointer to logger backend
+ * @return previous default logger backend
+ */
+TLogBackend* SetLogBackendForCurrentThread(TLogBackend* backend);
+
+/**
+ * @brief A TBackend that installs itself as the backend of the current
+ * thread in its constructor and puts the previously installed
+ * backend back in its destructor. Must be used in conjunction
+ * with TTlsLogBackend.
+ */
+template <typename TBackend>
+class TScopedBackend: public TBackend {
+public:
+ // All arguments are forwarded to the TBackend constructor.
+ template <typename... TCtorArgs>
+ TScopedBackend(TCtorArgs&&... ctorArgs)
+ : TBackend(std::forward<TCtorArgs>(ctorArgs)...)
+ , Replaced_(SetLogBackendForCurrentThread(this))
+ {
+ }
+
+ ~TScopedBackend() {
+ SetLogBackendForCurrentThread(Replaced_);
+ }
+
+private:
+ // backend that was installed for the thread before this object
+ TLogBackend* Replaced_;
+};
+
+} // namespace NLog
+} // namespace NYql
diff --git a/yql/essentials/utils/log/tls_backend_ut.cpp b/yql/essentials/utils/log/tls_backend_ut.cpp
new file mode 100644
index 0000000000..03e4684d78
--- /dev/null
+++ b/yql/essentials/utils/log/tls_backend_ut.cpp
@@ -0,0 +1,121 @@
+#include "tls_backend.h"
+#include "log.h"
+#include <yql/essentials/utils/log/ut/log_parser.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/logger/stream.h>
+#include <library/cpp/logger/null.h>
+
+#include <util/system/thread.h>
+#include <util/string/split.h>
+
+#include <thread>
+#include <chrono>
+
+
+using namespace NYql;
+using namespace NLog;
+
+// Test worker meant to run on its own thread: logs Name_ Count_ times
+// while a scoped stream backend writing into Logs_ is alive.
+class TRunnable {
+public:
+ TRunnable(TStringBuf name, int count)
+ : Name_(name)
+ , Count_(count)
+ {
+ }
+
+ void operator()() {
+ using namespace std::chrono_literals;
+
+ // Logged before the scoped backend exists, so it is not expected in Logs_.
+ YQL_LOG(INFO) << "this message will be missed";
+ {
+ TScopedBackend<TStreamLogBackend> logBackend(&Logs_);
+ for (int i = 0; i < Count_; i++) {
+ YQL_LOG(INFO) << Name_;
+ std::this_thread::sleep_for(20ms);
+ }
+ }
+ // The scoped backend has already been destroyed at this point.
+ YQL_LOG(INFO) << "this message will be missed";
+ }
+
+ // Returns the text accumulated in Logs_.
+ const TString& GetLogs() const {
+ return Logs_.Str();
+ }
+
+private:
+ TString Name_;
+ int Count_;
+ TStringStream Logs_;
+};
+
+// Checks that two threads, each with its own TScopedBackend, capture only
+// their own messages while the process-wide default is a null backend.
+Y_UNIT_TEST_SUITE(TTlsLogBackendTest)
+{
+ Y_UNIT_TEST(CaptureOutputs) {
+ YqlLoggerScope logger(new TTlsLogBackend(new TNullLogBackend));
+
+ YQL_LOG(INFO) << "this message will be missed";
+
+ // std::ref keeps r1/r2 as the objects the threads write to, so their
+ // logs can be read after join().
+ TRunnable r1("t1", 3);
+ std::thread t1(std::ref(r1));
+
+ TRunnable r2("t2", 2);
+ std::thread t2(std::ref(r2));
+
+ t1.join();
+ t2.join();
+
+// Cout << "--[t1 logs]-----------------\n" << r1.GetLogs() << Endl;
+// Cout << "--[t2 logs]-----------------\n" << r2.GetLogs() << Endl;
+
+ { // t1
+ // Three rows are expected; `_` receives whatever follows the last newline.
+ TString row1Str, row2Str, row3Str, _;
+ Split(r1.GetLogs(), '\n', row1Str, row2Str, row3Str, _);
+
+ // Thread id of the first row; later rows must report the same id.
+ ui64 threadId = 0;
+ {
+ TLogRow logRow = ParseLogRow(row1Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT(logRow.ThreadId > 0);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "t1");
+ threadId = logRow.ThreadId;
+ }
+ {
+ TLogRow logRow = ParseLogRow(row2Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_EQUAL(logRow.ThreadId, threadId);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "t1");
+ }
+ {
+ TLogRow logRow = ParseLogRow(row3Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_EQUAL(logRow.ThreadId, threadId);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "t1");
+ }
+ }
+
+ { // t2
+ // Two rows are expected for the second worker.
+ TString row1Str, row2Str, _;
+ Split(r2.GetLogs(), '\n', row1Str, row2Str, _);
+
+ ui64 threadId = 0;
+ {
+ TLogRow logRow = ParseLogRow(row1Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT(logRow.ThreadId > 0);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "t2");
+ threadId = logRow.ThreadId;
+ }
+ {
+ TLogRow logRow = ParseLogRow(row2Str);
+ UNIT_ASSERT_EQUAL(logRow.Level, ELevel::INFO);
+ UNIT_ASSERT_EQUAL(logRow.Component, EComponent::Default);
+ UNIT_ASSERT_EQUAL(logRow.ThreadId, threadId);
+ UNIT_ASSERT_STRINGS_EQUAL(logRow.Message, "t2");
+ }
+ }
+ }
+}
diff --git a/yql/essentials/utils/log/ut/log_parser.h b/yql/essentials/utils/log/ut/log_parser.h
new file mode 100644
index 0000000000..87cf52ed8b
--- /dev/null
+++ b/yql/essentials/utils/log/ut/log_parser.h
@@ -0,0 +1,62 @@
+#pragma once
+
+#include <yql/essentials/utils/log/log.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/datetime/base.h>
+
+#include <regex>
+
+
+namespace NYql {
+namespace NLog {
+
+// One parsed log line; filled field by field by ParseLogRow().
+// NOTE(review): the scalar members have no default initializers, so a
+// default-constructed TLogRow holds indeterminate values in them.
+struct TLogRow {
+ TInstant Time; // regex group 1
+ ELevel Level; // regex group 2
+ TString ProcName; // regex group 3
+ pid_t ProcId; // regex group 4
+ ui64 ThreadId; // regex group 5, decimal or 0x-prefixed hex
+ EComponent Component; // regex group 6
+ TString FileName; // regex group 7
+ ui32 LineNumber; // regex group 8
+ TString Message; // regex group 9
+};
+
+/**
+ * Parses one formatted log line into a TLogRow.
+ *
+ * The line must match the pattern below completely (an optional trailing
+ * '\n' is accepted); otherwise a unit-test assertion fails.
+ *
+ * Declared `inline` rather than `static`: this is a header, and internal
+ * linkage would give every including translation unit its own copy of the
+ * function and of the compiled regex.
+ *
+ * @param str - a single log line
+ * @return the parsed row
+ */
+inline TLogRow ParseLogRow(const TString& str) {
+    static const std::regex rowRe(
+        "^([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\\.[0-9]{3}) " // (1) time
+        "([A-Z ]{5}) " // (2) level
+        "([a-zA-Z0-9_\\.-]+)" // (3) process name
+        ".pid=([0-9]+)," // (4) process id
+        " tid=(0?x?[0-9a-fA-F]+). " // (5) thread id
+        ".([a-zA-Z0-9_\\. ]+). " // (6) component name
+        "([^:]+):" // (7) file name
+        "([0-9]+): " // (8) line number
+        "([^\n]*)\n?$" // (9) message
+        , std::regex_constants::extended);
+
+    std::cmatch match;
+    bool isMatch = std::regex_match(str.c_str(), match, rowRe);
+
+    UNIT_ASSERT_C(isMatch, "log row does not match format: '" << str << '\'');
+    UNIT_ASSERT_EQUAL_C(match.size(), 10, "expected 10 groups in log row: '" << str << '\'');
+
+    // Materialise the thread-id capture once; it is inspected for the
+    // "0x" prefix and then converted with the matching radix.
+    const auto threadIdStr = match[5].str();
+
+    TLogRow logRow;
+    // NOTE(review): the fixed 4-hour shift is kept from the original code;
+    // its rationale is not visible here - confirm it is still wanted.
+    logRow.Time = TInstant::ParseIso8601(match[1].str()) - TDuration::Hours(4);
+    logRow.Level = ELevelHelpers::FromString(match[2].str());
+    logRow.ProcName = match[3].str();
+    logRow.ProcId = FromString<pid_t>(match[4].str());
+    logRow.ThreadId = threadIdStr.substr(0, 2) == "0x" ?
+        IntFromString<ui64, 16, TStringBuf>(threadIdStr.substr(2)) :
+        IntFromString<ui64, 10, TStringBuf>(threadIdStr);
+    logRow.Component = EComponentHelpers::FromString(match[6].str());
+    logRow.FileName = match[7].str();
+    logRow.LineNumber = FromString<ui32>(match[8].str());
+    logRow.Message = match[9].str();
+    return logRow;
+}
+
+} // namespace NLog
+} // namespace NYql
diff --git a/yql/essentials/utils/log/ut/ya.make b/yql/essentials/utils/log/ut/ya.make
new file mode 100644
index 0000000000..15b6d07d08
--- /dev/null
+++ b/yql/essentials/utils/log/ut/ya.make
@@ -0,0 +1,8 @@
+UNITTEST_FOR(yql/essentials/utils/log)
+
+SRCS(
+ log_ut.cpp
+ tls_backend_ut.cpp
+)
+
+END()
diff --git a/yql/essentials/utils/log/ya.make b/yql/essentials/utils/log/ya.make
new file mode 100644
index 0000000000..b4c97af67c
--- /dev/null
+++ b/yql/essentials/utils/log/ya.make
@@ -0,0 +1,22 @@
+LIBRARY()
+
+SRCS(
+ context.cpp
+ log.cpp
+ profile.cpp
+ tls_backend.cpp
+)
+
+PEERDIR(
+ contrib/libs/protobuf
+ library/cpp/logger
+ library/cpp/logger/global
+ library/cpp/deprecated/atomic
+ yql/essentials/utils/log/proto
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/yql/essentials/utils/md5_stream.cpp b/yql/essentials/utils/md5_stream.cpp
new file mode 100644
index 0000000000..8a1c219164
--- /dev/null
+++ b/yql/essentials/utils/md5_stream.cpp
@@ -0,0 +1,20 @@
+#include "md5_stream.h"
+
+namespace NYql {
+
+// Binds the stream to `delegatee`; only a reference is stored, so the
+// delegate must outlive this object.
+TMd5OutputStream::TMd5OutputStream(IOutputStream& delegatee)
+ : Delegatee_(delegatee)
+{
+}
+
+// Finishes the running MD5 computation and returns the digest text that
+// Accumulator_.End() writes into the supplied buffer.
+TString TMd5OutputStream::Finalize() {
+    // Zero-filled scratch buffer handed to MD5::End().
+    char hexDigest[33] = { 0 };
+    const char* const digest = Accumulator_.End(hexDigest);
+    return TString(digest);
+}
+
+// Forwards the bytes to the delegate first and only then feeds them to the
+// MD5 accumulator, so data the delegate rejected by throwing is not hashed.
+void TMd5OutputStream::DoWrite(const void* buf, size_t len) {
+ Delegatee_.Write(buf, len);
+ Accumulator_.Update(buf, len);
+}
+
+}
diff --git a/yql/essentials/utils/md5_stream.h b/yql/essentials/utils/md5_stream.h
new file mode 100644
index 0000000000..ca8b1c4bb9
--- /dev/null
+++ b/yql/essentials/utils/md5_stream.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <util/stream/output.h>
+#include <library/cpp/digest/md5/md5.h>
+
+namespace NYql {
+// Output stream that passes everything written to it on to another stream
+// while accumulating an MD5 digest of the same bytes.
+class TMd5OutputStream : public IOutputStream {
+public:
+ // `delegatee` is held by reference and must outlive this stream.
+ explicit TMd5OutputStream(IOutputStream& delegatee);
+ // Returns the digest of the data written so far.
+ // NOTE(review): whether the accumulator may be reused afterwards depends
+ // on MD5::End(), which is not visible here.
+ TString Finalize();
+
+private:
+ void DoWrite(const void* buf, size_t len) override;
+
+private:
+ IOutputStream& Delegatee_;
+ MD5 Accumulator_;
+};
+}
diff --git a/yql/essentials/utils/md5_stream_ut.cpp b/yql/essentials/utils/md5_stream_ut.cpp
new file mode 100644
index 0000000000..1d04c632d4
--- /dev/null
+++ b/yql/essentials/utils/md5_stream_ut.cpp
@@ -0,0 +1,47 @@
+#include "md5_stream.h"
+#include <util/stream/input.h>
+#include <util/stream/str.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NYql;
+
+namespace {
+// Pumps `input` through a TMd5OutputStream layered over a string sink,
+// asserts that all bytes arrived unchanged, and returns the digest.
+TString Consume(const TString& input) {
+ TStringInput s(input);
+
+ TString output;
+ TStringOutput outputStream(output);
+ TMd5OutputStream md5Stream(outputStream);
+
+ // TransferData() reports how many bytes were copied.
+ UNIT_ASSERT_VALUES_EQUAL(input.size(), TransferData(&s, &md5Stream));
+ UNIT_ASSERT_VALUES_EQUAL(input, output);
+ return md5Stream.Finalize();
+}
+}
+
+// Digest checks for TMd5OutputStream against known MD5 values.
+Y_UNIT_TEST_SUITE(TStreamMd5Tests) {
+    Y_UNIT_TEST(Empty) {
+        const auto md5 = Consume("");
+        const TString emptyStringMd5 = "d41d8cd98f00b204e9800998ecf8427e";
+        UNIT_ASSERT_VALUES_EQUAL(md5, emptyStringMd5);
+    }
+
+    Y_UNIT_TEST(ShortText) {
+        const auto md5 = Consume("hello from Y!");
+        const TString expectedMd5 = "abf59ed7b0daa71085e76e461a737cc2";
+        UNIT_ASSERT_VALUES_EQUAL(md5, expectedMd5);
+    }
+
+    Y_UNIT_TEST(BigText) {
+        // TransferData uses TempBuf of 64K
+        const TString s(1000000, 'A');
+        // Pass the string itself: going through c_str() would force a
+        // strlen() scan and a second 1 MB temporary for no benefit.
+        const auto md5 = Consume(s);
+        /*
+        $ for i in {1..1000000};do echo -n A >> 1M.txt;done
+        $ md5sum 1M.txt
+        48fcdb8b87ce8ef779774199a856091d 1M.txt
+        */
+        const TString expectedMd5 = "48fcdb8b87ce8ef779774199a856091d";
+        UNIT_ASSERT_VALUES_EQUAL(md5, expectedMd5);
+    }
+}
diff --git a/yql/essentials/utils/method_index.cpp b/yql/essentials/utils/method_index.cpp
new file mode 100644
index 0000000000..030dbc3111
--- /dev/null
+++ b/yql/essentials/utils/method_index.cpp
@@ -0,0 +1,44 @@
+#include "method_index.h"
+#include <util/generic/yexception.h>
+#include <util/string/hex.h>
+
+namespace NYql {
+
+// Converts the raw leading word of a method pointer into an index.
+//
+// Windows build: `ptr` is treated as the address of machine code. The
+// leading bytes are compared with three known instruction sequences, the
+// displacement operand (1 or 4 bytes wide, chosen by the byte that follows
+// the matched prefix) is read, and `offset / 8 + 1` is returned.
+// Unrecognised code throws yexception with a hex dump of the bytes.
+// NOTE(review): the sequences look like x86-64 virtual-call thunks
+// (mov rax,[rcx] followed by jmp/mov through [rax+disp]) - confirm
+// against the compiler actually in use.
+//
+// Other platforms: returns `ptr >> 3`.
+// NOTE(review): presumably relies on the ABI's encoding of virtual member
+// function pointers - confirm.
+size_t GetMethodPtrIndex(uintptr_t ptr) {
+#ifdef _win_
+ size_t offset;
+ // Pattern 1: 48 8B 01 FF, then 60 <disp8> or A0 <disp32>.
+ if (memcmp((void*)ptr, "\x48\x8B\x01\xFF", 4) == 0) {
+ if (*(ui8*)(ptr + 4) == 0x60) {
+ offset = *(ui8*)(ptr + 5);
+ } else if (*(ui8*)(ptr + 4) == 0xa0) {
+ offset = *(ui32*)(ptr + 5);
+ } else {
+ ythrow yexception() << "Unsupported code: " << HexEncode((char*)ptr + 4, 1);
+ }
+ // Pattern 2: a 14-byte prologue, then 40 <disp8> or 80 <disp32>.
+ } else if (memcmp((void*)ptr, "\x50\x48\x89\x0c\x24\x48\x8b\x0c\x24\x48\x8b\x01\x48\x8b", 14) == 0) {
+ if (*(ui8*)(ptr + 14) == 0x40) {
+ offset = *(ui8*)(ptr + 15);
+ } else if (*(ui8*)(ptr + 14) == 0x80) {
+ offset = *(ui32*)(ptr + 15);
+ } else {
+ ythrow yexception() << "Unsupported code: " << HexEncode((char*)ptr + 14, 1);
+ }
+ // Pattern 3: 48 8B 01 48 8B, then 40 <disp8> or 80 <disp32>.
+ } else if (memcmp((void*)ptr, "\x48\x8b\x01\x48\x8b", 5) == 0) {
+ if (*(ui8*)(ptr + 5) == 0x40) {
+ offset = *(ui8*)(ptr + 6);
+ } else if (*(ui8*)(ptr + 5) == 0x80) {
+ offset = *(ui32*)(ptr + 6);
+ } else {
+ ythrow yexception() << "Unsupported code: " << HexEncode((char*)ptr + 5, 1);
+ }
+ } else {
+ ythrow yexception() << "Unsupported code: " << HexEncode((char*)ptr, 16);
+ }
+
+ // The byte displacement is divided by 8 and shifted to a 1-based index.
+ return offset / 8 + 1;
+#else
+ return ptr >> 3;
+#endif
+}
+
+}
diff --git a/yql/essentials/utils/method_index.h b/yql/essentials/utils/method_index.h
new file mode 100644
index 0000000000..6539fd3a3d
--- /dev/null
+++ b/yql/essentials/utils/method_index.h
@@ -0,0 +1,24 @@
+#pragma once
+#include <util/system/platform.h>
+#include <cstring>
+#include <cstdint>
+
+namespace NYql {
+
+size_t GetMethodPtrIndex(uintptr_t ptr);
+
+// Returns GetMethodPtrIndex() applied to the first sizeof(uintptr_t) bytes
+// of `method`'s object representation.
+//
+// The static_assert rejects argument types smaller than uintptr_t, for
+// which the memcpy below would read past the end of `method`.
+template<typename Method>
+inline size_t GetMethodIndex(Method method) {
+    static_assert(sizeof(Method) >= sizeof(uintptr_t),
+        "Method must be at least as large as uintptr_t");
+    uintptr_t ptr;
+    // memcpy rather than a cast: member function pointers cannot be
+    // converted to integers with reinterpret_cast.
+    std::memcpy(&ptr, &method, sizeof(uintptr_t));
+    return GetMethodPtrIndex(ptr);
+}
+
+// Returns the first sizeof(uintptr_t) bytes of `method`'s object
+// representation as an integer.
+//
+// The static_assert rejects argument types smaller than uintptr_t, for
+// which the memcpy below would read past the end of `method`.
+template<typename Method>
+inline uintptr_t GetMethodPtr(Method method) {
+    static_assert(sizeof(Method) >= sizeof(uintptr_t),
+        "Method must be at least as large as uintptr_t");
+    uintptr_t ptr;
+    // memcpy rather than a cast: member function pointers cannot be
+    // converted to integers with reinterpret_cast.
+    std::memcpy(&ptr, &method, sizeof(uintptr_t));
+    return ptr;
+}
+
+} \ No newline at end of file
diff --git a/yql/essentials/utils/multi_resource_lock.cpp b/yql/essentials/utils/multi_resource_lock.cpp
new file mode 100644
index 0000000000..84b095a207
--- /dev/null
+++ b/yql/essentials/utils/multi_resource_lock.cpp
@@ -0,0 +1,42 @@
+#include "multi_resource_lock.h"
+
+using namespace NYql;
+
+// Locks the mutex associated with `resourceId` and returns a guard that
+// releases it on destruction.
+TMultiResourceLock::TResourceLock TMultiResourceLock::Acquire(TString resourceId) {
+    // Only the registry lookup happens under Guard_. The per-resource mutex
+    // is taken later, inside the TResourceLock constructor, so a long wait
+    // on one resource never blocks the registry.
+    auto resourceMutex = ProvideResourceLock(resourceId);
+    return TResourceLock(*this, std::move(resourceMutex), std::move(resourceId));
+}
+
+// Aborts the process if any resource lock is still registered when the
+// owner is destroyed.
+TMultiResourceLock::~TMultiResourceLock() {
+    with_lock(Guard_) {
+        // size() returns size_t: %zu is the matching conversion, whereas
+        // %lu is wrong wherever unsigned long is narrower than size_t.
+        Y_ABORT_UNLESS(Locks_.empty(), "~TMultiResourceLock: we still have %zu unreleased locks", Locks_.size());
+    }
+}
+
+// Returns the lock object registered for `resourceId`, creating and
+// registering a new one when none exists yet.
+TMultiResourceLock::TLock::TPtr TMultiResourceLock::ProvideResourceLock(const TString& resourceId) {
+    with_lock(Guard_) {
+        // The returned TPtr copy is made while Guard_ is held, so the
+        // reference count is already raised when the registry lock is
+        // dropped. TryCleanup therefore cannot erase this entry between
+        // leaving this method and the caller's Mutex_.Acquire().
+        const auto existing = Locks_.find(resourceId);
+        if (existing != Locks_.end()) {
+            return existing->second;
+        }
+
+        return Locks_.emplace(resourceId, MakeIntrusive<TLock>()).first->second;
+    }
+}
+
+// Drops the registry entry for `resourceId` when the registry holds the
+// only remaining reference to its lock object; otherwise does nothing.
+void TMultiResourceLock::TryCleanup(const TString& resourceId) {
+    with_lock(Guard_) {
+        const auto entry = Locks_.find(resourceId);
+        const bool removable = entry != Locks_.end() && entry->second->IsUnique();
+        if (removable) {
+            Locks_.erase(entry);
+        }
+    }
+}
diff --git a/yql/essentials/utils/multi_resource_lock.h b/yql/essentials/utils/multi_resource_lock.h
new file mode 100644
index 0000000000..61804fa9a4
--- /dev/null
+++ b/yql/essentials/utils/multi_resource_lock.h
@@ -0,0 +1,81 @@
+#pragma once
+
+#include "yql_panic.h"
+
+#include <util/generic/map.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+#include <util/system/mutex.h>
+
+namespace NYql {
+
+// Registry of mutexes keyed by resource id. A mutex is created the first
+// time its id is requested and removed again once no guard refers to it.
+class TMultiResourceLock : private TNonCopyable {
+private:
+ // Ref-counted holder of one resource's mutex.
+ struct TLock : public TThrRefBase {
+ typedef TIntrusivePtr<TLock> TPtr;
+
+ // True when exactly one reference to this object exists.
+ bool IsUnique() const {
+ return RefCount() == 1;
+ }
+
+ TMutex Mutex_;
+ };
+
+public:
+ // Movable guard: the constructor acquires the resource mutex, the
+ // destructor releases it and asks the owner to clean up.
+ struct TResourceLock : private TNonCopyable {
+ TResourceLock(TMultiResourceLock& owner, TLock::TPtr lock, TString resourceId)
+ : Owner_(owner)
+ , Lock_(std::move(lock))
+ , ResourceId_(std::move(resourceId))
+ {
+ Y_ENSURE(Lock_);
+ // Blocks until the resource mutex is available.
+ Lock_->Mutex_.Acquire();
+ }
+
+ // Transfers the held mutex; `other` keeps a null Lock_ and its
+ // destructor becomes a no-op.
+ TResourceLock(TResourceLock&& other)
+ : Owner_(other.Owner_)
+ , Lock_(std::move(other.Lock_))
+ , ResourceId_(std::move(other.ResourceId_))
+ {
+
+ }
+
+ TResourceLock& operator=(TResourceLock&&) = delete;
+
+ ~TResourceLock() {
+ // Moved-from guard: nothing to release.
+ if (!Lock_) {
+ return;
+ }
+
+ Lock_->Mutex_.Release();
+ // decrement ref count before TryCleanup
+ Lock_ = nullptr;
+ Owner_.TryCleanup(ResourceId_);
+ }
+
+ private:
+ TMultiResourceLock& Owner_;
+ TLock::TPtr Lock_;
+ TString ResourceId_;
+ };
+
+ // Locks the mutex of `resourceId` and returns the guard holding it.
+ TResourceLock Acquire(TString resourceId);
+
+ // Calls `f` while the mutex of `resourceId` is held and returns its result.
+ template <typename F>
+ auto RunWithLock(TString resourceId, const F& f) -> decltype(f()) {
+ auto lock = Acquire(std::move(resourceId));
+ return f();
+ }
+
+ // Aborts if any lock is still registered.
+ ~TMultiResourceLock();
+
+private:
+ TLock::TPtr ProvideResourceLock(const TString& resourceId);
+ void TryCleanup(const TString& resourceId);
+
+private:
+ // Protects Locks_.
+ TMutex Guard_;
+ TMap<TString, TLock::TPtr> Locks_;
+};
+
+}
diff --git a/yql/essentials/utils/multi_resource_lock_ut.cpp b/yql/essentials/utils/multi_resource_lock_ut.cpp
new file mode 100644
index 0000000000..0af9cea3ff
--- /dev/null
+++ b/yql/essentials/utils/multi_resource_lock_ut.cpp
@@ -0,0 +1,54 @@
+#include "multi_resource_lock.h"
+#include <util/generic/xrange.h>
+#include <library/cpp/threading/future/async.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NYql {
+using namespace NThreading;
+
+// Three workers each take their own resource id 1000 times and append to
+// their own vector; afterwards every vector must hold 0..999 in order.
+Y_UNIT_TEST_SUITE(TMultiResourceLock) {
+ Y_UNIT_TEST(ManyResources) {
+ TMultiResourceLock multiLock;
+ const int workersCount = 3;
+ TVector<TVector<int>> workersData;
+ workersData.resize(workersCount);
+
+ TAdaptiveThreadPool queue;
+ queue.Start(0);
+
+ TVector<NThreading::TFuture<void>> workers;
+ workers.reserve(workersCount);
+ // Holds all workers back until every one of them has been scheduled.
+ TManualEvent startEvent;
+
+ for (int i = 0; i < workersCount; ++i) {
+ TString resourceId = ToString(i);
+ TVector<int>& data = workersData.at(i);
+ // NOTE(review): `data` is a reference declared inside this loop body
+ // and is captured through the default [&] capture - confirm the
+ // capture stays valid once the iteration has ended.
+ NThreading::TFuture<void> f = NThreading::Async([&, resourceId]() {
+ startEvent.Wait();
+
+ for (int j = 0; j < 1000; ++j) {
+ // Binding to a const reference keeps the guard alive until
+ // the end of the loop body.
+ const auto& l = multiLock.Acquire(resourceId);
+ Y_UNUSED(l);
+ data.push_back(j);
+ }
+ }, queue);
+
+ workers.push_back(std::move(f));
+ }
+
+ startEvent.Signal();
+
+ NThreading::TFuture<void> all = NThreading::WaitExceptionOrAll(workers);
+ all.GetValueSync();
+ queue.Stop();
+
+ // analyze workersData:
+ auto range0_999 = xrange(0, 1000);
+ for (auto& w : workersData) {
+ UNIT_ASSERT_VALUES_EQUAL(w.size(), 1000);
+ UNIT_ASSERT(std::equal(range0_999.begin(), range0_999.end(), w.begin()));
+ }
+ }
+}
+
+}
diff --git a/yql/essentials/utils/network/bind_in_range.cpp b/yql/essentials/utils/network/bind_in_range.cpp
new file mode 100644
index 0000000000..88cf641ce3
--- /dev/null
+++ b/yql/essentials/utils/network/bind_in_range.cpp
@@ -0,0 +1,27 @@
+#include "bind_in_range.h"
+
+#include <yql/essentials/utils/log/log.h>
+#include <util/datetime/base.h>
+
+namespace NYql {
+
+// Tries to bind on a port taken from `portWalker`, walking the whole range
+// up to three times with a pause between passes.
+//
+// @param portWalker - supplies candidate ports; its position advances with
+//                     every attempt
+// @return the bind results for the first port that could be bound
+// @throws yexception when no port could be bound in any pass
+TVector<NBus::TBindResult> BindInRange(TRangeWalker<int>& portWalker) {
+    const int cyclesLimit = 3;
+    const int rangeSize = portWalker.GetRangeSize();
+    const auto cycleDelay = TDuration::Seconds(3);
+
+    for (int cycle = 0; cycle < cyclesLimit; ++cycle) {
+        if (cycle > 0) {
+            // Wait only between passes: sleeping after the final pass
+            // would merely delay the failure report below.
+            Sleep(cycleDelay);
+        }
+
+        for (int i = 0; i < rangeSize; ++i) {
+            try {
+                return NBus::BindOnPort(portWalker.MoveToNext(), false).second;
+            } catch (const TSystemError&) {
+                // The port is unavailable; log and move on to the next one.
+                YQL_LOG(DEBUG) << CurrentExceptionMessage();
+            }
+        }
+    }
+
+    ythrow yexception() << "Unable to bind within port range [" << portWalker.GetStart() << ", " << portWalker.GetFinish() << "]";
+}
+}
diff --git a/yql/essentials/utils/network/bind_in_range.h b/yql/essentials/utils/network/bind_in_range.h
new file mode 100644
index 0000000000..5621529dd5
--- /dev/null
+++ b/yql/essentials/utils/network/bind_in_range.h
@@ -0,0 +1,8 @@
+#pragma once
+
+#include <yql/essentials/utils/range_walker.h>
+#include <library/cpp/messagebus/network.h>
+
+namespace NYql {
+TVector<NBus::TBindResult> BindInRange(TRangeWalker<int>& portWalker);
+}
diff --git a/yql/essentials/utils/network/ya.make b/yql/essentials/utils/network/ya.make
new file mode 100644
index 0000000000..282e1502b3
--- /dev/null
+++ b/yql/essentials/utils/network/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+
+SRCS(
+ bind_in_range.cpp
+ bind_in_range.h
+)
+
+PEERDIR(
+ library/cpp/messagebus
+)
+
+END()
diff --git a/yql/essentials/utils/parse_double.cpp b/yql/essentials/utils/parse_double.cpp
new file mode 100644
index 0000000000..90923160c5
--- /dev/null
+++ b/yql/essentials/utils/parse_double.cpp
@@ -0,0 +1,76 @@
+#include "parse_double.h"
+
+#include <util/string/ascii.h>
+#include <util/string/cast.h>
+
+namespace NYql {
+
+namespace {
+// Parses `buf` as a floating point number of type T.
+//
+// Ordinary numbers are handled by TryFromString(). When that fails, the
+// text is additionally accepted if it is an optional '+'/'-' sign followed
+// by exactly "nan" or "inf" in any letter case.
+//
+// @param buf   - text to parse; an empty buffer is rejected
+// @param value - receives the result; set to 0 before parsing starts
+// @return true when `buf` was parsed
+template <typename T>
+bool GenericTryFloatFromString(TStringBuf buf, T& value) {
+    value = 0;
+    if (!buf.size()) {
+        return false;
+    }
+
+    if (TryFromString(buf.data(), buf.size(), value)) {
+        return true;
+    }
+
+    const char* ptr = buf.data();
+    // size_t, not ui32: a narrower type would truncate the length of a
+    // very large buffer and could let it pass the "== 3" check below.
+    size_t size = buf.size();
+    char sign = '+';
+    if (*ptr == '+' || *ptr == '-') {
+        sign = *ptr;
+        ++ptr;
+        --size;
+    }
+
+    if (size != 3) {
+        return false;
+    }
+
+    // NaN or Inf (ignoring case)
+    if (AsciiToUpper(ptr[0]) == 'N' && AsciiToUpper(ptr[1]) == 'A' && AsciiToUpper(ptr[2]) == 'N') {
+        value = std::numeric_limits<T>::quiet_NaN();
+    } else if (AsciiToUpper(ptr[0]) == 'I' && AsciiToUpper(ptr[1]) == 'N' && AsciiToUpper(ptr[2]) == 'F') {
+        value = std::numeric_limits<T>::infinity();
+    } else {
+        return false;
+    }
+
+    if (sign == '-') {
+        value = -value;
+    }
+
+    return true;
+}
+}
+
+// Parses `buf` with TryFloatFromString() and returns the value; throws
+// yexception when the text cannot be parsed.
+float FloatFromString(TStringBuf buf) {
+    float parsed = 0;
+    if (TryFloatFromString(buf, parsed)) {
+        return parsed;
+    }
+
+    throw yexception() << "unable to parse float from '" << buf << "'";
+}
+
+// Parses `buf` with TryDoubleFromString() and returns the value; throws
+// yexception when the text cannot be parsed.
+double DoubleFromString(TStringBuf buf) {
+    double parsed = 0;
+    if (TryDoubleFromString(buf, parsed)) {
+        return parsed;
+    }
+
+    throw yexception() << "unable to parse double from '" << buf << "'";
+}
+
+// float instantiation of GenericTryFloatFromString(); returns true when
+// `buf` was parsed into `value`.
+bool TryFloatFromString(TStringBuf buf, float& value) {
+ return GenericTryFloatFromString(buf, value);
+}
+
+// double instantiation of GenericTryFloatFromString(); returns true when
+// `buf` was parsed into `value`.
+bool TryDoubleFromString(TStringBuf buf, double& value) {
+ return GenericTryFloatFromString(buf, value);
+}
+
+}
diff --git a/yql/essentials/utils/parse_double.h b/yql/essentials/utils/parse_double.h
new file mode 100644
index 0000000000..61d1d940c9
--- /dev/null
+++ b/yql/essentials/utils/parse_double.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <util/generic/strbuf.h>
+
+namespace NYql {
+
+/*
+These parse functions can understand nan, inf, -inf case-insensitively
+They do not parse empty string to zero
+*/
+
+float FloatFromString(TStringBuf buf);
+double DoubleFromString(TStringBuf buf);
+
+bool TryFloatFromString(TStringBuf buf, float& value);
+bool TryDoubleFromString(TStringBuf buf, double& value);
+
+}
diff --git a/yql/essentials/utils/parse_double_ut.cpp b/yql/essentials/utils/parse_double_ut.cpp
new file mode 100644
index 0000000000..4aecf64f88
--- /dev/null
+++ b/yql/essentials/utils/parse_double_ut.cpp
@@ -0,0 +1,55 @@
+#include "parse_double.h"
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NYql {
+
+// Tests for the nan/inf-aware float and double parsers.
+Y_UNIT_TEST_SUITE(TParseDouble) {
+
+    // Parses `buf` with `f` and compares the result with `expected`.
+    template <typename T, typename F>
+    void ParseAndCheck(TStringBuf buf, F f, T expected) {
+        T result = 0;
+        UNIT_ASSERT(f(buf, result));
+        UNIT_ASSERT_DOUBLES_EQUAL(expected, result, 1e-6);
+    }
+
+    Y_UNIT_TEST(ExactValues) {
+        ParseAndCheck(TStringBuf("nan"), TryFloatFromString, std::numeric_limits<float>::quiet_NaN());
+        ParseAndCheck(TStringBuf("nAn"), TryDoubleFromString, std::numeric_limits<double>::quiet_NaN());
+
+        ParseAndCheck(TStringBuf("+nan"), TryFloatFromString, std::numeric_limits<float>::quiet_NaN());
+        ParseAndCheck(TStringBuf("+NAN"), TryDoubleFromString, std::numeric_limits<double>::quiet_NaN());
+
+        ParseAndCheck(TStringBuf("-nan"), TryFloatFromString, std::numeric_limits<float>::quiet_NaN());
+        ParseAndCheck(TStringBuf("-NaN"), TryDoubleFromString, std::numeric_limits<double>::quiet_NaN());
+
+        ParseAndCheck(TStringBuf("inf"), TryFloatFromString, std::numeric_limits<float>::infinity());
+        ParseAndCheck(TStringBuf("iNf"), TryDoubleFromString, std::numeric_limits<double>::infinity());
+
+        ParseAndCheck(TStringBuf("+inf"), TryFloatFromString, std::numeric_limits<float>::infinity());
+        ParseAndCheck(TStringBuf("+INF"), TryDoubleFromString, std::numeric_limits<double>::infinity());
+
+        ParseAndCheck(TStringBuf("-inf"), TryFloatFromString, -std::numeric_limits<float>::infinity());
+        ParseAndCheck(TStringBuf("-InF"), TryDoubleFromString, -std::numeric_limits<double>::infinity());
+
+        ParseAndCheck<float>(TStringBuf("-12.3456"), TryFloatFromString, -12.3456);
+        ParseAndCheck(TStringBuf("-12.3456"), TryDoubleFromString, -12.3456);
+
+        ParseAndCheck<float>(TStringBuf("1.23e-2"), TryFloatFromString, 0.0123);
+        ParseAndCheck(TStringBuf("1.23e-2"), TryDoubleFromString, 0.0123);
+
+        UNIT_ASSERT_EQUAL(FloatFromString(TStringBuf("iNf")), std::numeric_limits<float>::infinity());
+        // Compare the double parser against the double infinity, matching
+        // the type under test (the float constant was a copy-paste slip).
+        UNIT_ASSERT_EQUAL(DoubleFromString(TStringBuf("iNf")), std::numeric_limits<double>::infinity());
+    }
+
+    Y_UNIT_TEST(Errors) {
+        UNIT_ASSERT_EXCEPTION_CONTAINS(FloatFromString(TStringBuf("")), std::exception, "unable to parse float from ''");
+        UNIT_ASSERT_EXCEPTION_CONTAINS(DoubleFromString(TStringBuf("")), std::exception, "unable to parse double from ''");
+
+        UNIT_ASSERT_EXCEPTION_CONTAINS(FloatFromString(TStringBuf("info")), std::exception, "unable to parse float from 'info'");
+        UNIT_ASSERT_EXCEPTION_CONTAINS(DoubleFromString(TStringBuf("-nana")), std::exception, "unable to parse double from '-nana'");
+
+        UNIT_ASSERT_EXCEPTION_CONTAINS(FloatFromString(TStringBuf(nullptr)), std::exception, "unable to parse float from ''");
+        UNIT_ASSERT_EXCEPTION_CONTAINS(DoubleFromString(TStringBuf(nullptr)), std::exception, "unable to parse double from ''");
+    }
+}
+}
diff --git a/yql/essentials/utils/prefetch.h b/yql/essentials/utils/prefetch.h
new file mode 100644
index 0000000000..50eea7df5c
--- /dev/null
+++ b/yql/essentials/utils/prefetch.h
@@ -0,0 +1,13 @@
+#pragma once
+
+namespace NYql {
+
+// Hints that the memory at `ptr` is about to be read.
+// Passes (0, 3) to __builtin_prefetch. On compilers that lack the builtin
+// the call is a no-op, so the header still compiles there.
+inline void PrefetchForRead(const void* ptr) {
+#if defined(__GNUC__) || defined(__clang__)
+    __builtin_prefetch(ptr, 0, 3);
+#else
+    (void)ptr;
+#endif
+}
+
+// Hints that the memory at `ptr` is about to be written.
+// Passes (1, 3) to __builtin_prefetch. On compilers that lack the builtin
+// the call is a no-op, so the header still compiles there.
+inline void PrefetchForWrite(void* ptr) {
+#if defined(__GNUC__) || defined(__clang__)
+    __builtin_prefetch(ptr, 1, 3);
+#else
+    (void)ptr;
+#endif
+}
+
+} // namespace NYql
diff --git a/yql/essentials/utils/proc_alive.cpp b/yql/essentials/utils/proc_alive.cpp
new file mode 100644
index 0000000000..7efb2584bc
--- /dev/null
+++ b/yql/essentials/utils/proc_alive.cpp
@@ -0,0 +1,37 @@
+#include "proc_alive.h"
+
+#include <util/system/platform.h>
+#include <util/system/compat.h>
+#include <util/system/error.h>
+#include <util/system/winint.h>
+
+#include <errno.h>
+
+
+namespace NYql {
+
+// Reports whether a process with the given id currently exists.
+//
+// Unix: probes the process with the null signal.
+// Windows: opens the process and checks that it has not been signalled.
+// Other platforms: always returns false.
+bool IsProcessAlive(TProcessId pid) {
+#ifdef _unix_
+    // A null signal performs only the existence and permission checks.
+    // errno is meaningful only when kill() fails, so the return value must
+    // be tested first: a stale ESRCH left by an earlier call would
+    // otherwise make a live process look dead.
+    if (kill(pid, 0) == 0) {
+        return true;
+    }
+
+    // ESRCH: no such process. Any other failure (notably EPERM) means the
+    // process exists but we are not allowed to signal it.
+    return errno != ESRCH;
+#elif defined(_win_)
+    HANDLE process = OpenProcess(SYNCHRONIZE, FALSE, pid);
+    if (process == NULL) {
+        return false;
+    }
+
+    // A zero-timeout wait times out while the process is still running.
+    DWORD ret = WaitForSingleObject(process, 0);
+    CloseHandle(process);
+    return ret == WAIT_TIMEOUT;
+#else
+    Y_UNUSED(pid);
+    return false;
+#endif
+}
+
+} // NYql
+
diff --git a/yql/essentials/utils/proc_alive.h b/yql/essentials/utils/proc_alive.h
new file mode 100644
index 0000000000..c4b798b4ca
--- /dev/null
+++ b/yql/essentials/utils/proc_alive.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <util/system/getpid.h>
+
+namespace NYql {
+
+bool IsProcessAlive(TProcessId pid);
+
+}
diff --git a/yql/essentials/utils/rand_guid.cpp b/yql/essentials/utils/rand_guid.cpp
new file mode 100644
index 0000000000..d89eefbd3b
--- /dev/null
+++ b/yql/essentials/utils/rand_guid.cpp
@@ -0,0 +1,32 @@
+#include "rand_guid.h"
+
+#include <util/system/datetime.h>
+#include <util/system/getpid.h>
+#include <util/system/unaligned_mem.h>
+#include <util/generic/guid.h>
+
+namespace NYql {
+
+TAtomic TRandGuid::Counter = 0;
+
+// Constructs the generator inside Rnd_ by seeding it via ResetSeed().
+TRandGuid::TRandGuid() {
+ ResetSeed();
+}
+
+// Constructs a fresh TMersenne<ui64> in the Rnd_ storage, seeded with the
+// sum of the cycle counter, the current time in microseconds and the
+// process id.
+// NOTE(review): the object previously living in Rnd_ is overwritten
+// without a destructor call - fine only if TMersenne<ui64> is trivially
+// destructible; confirm.
+void TRandGuid::ResetSeed() {
+ new (&Rnd_) TMersenne<ui64>(GetCycleCount() + MicroSeconds() + GetPID());
+}
+
+// Builds a GUID string from two generator outputs plus the shared counter:
+// dw[0..1] receive one 64-bit random value, dw[2] the low 32 bits of a
+// second one, and dw[3] the incremented value of Counter.
+TString TRandGuid::GenGuid() {
+    TMersenne<ui64>& rnd = GetRnd();
+
+    TGUID guid = {};
+    const ui64 firstRandom = rnd.GenRand();
+    WriteUnaligned<ui64>(guid.dw, firstRandom);
+    const ui64 secondRandom = rnd.GenRand();
+    guid.dw[2] = (ui32)secondRandom;
+    guid.dw[3] = AtomicIncrement(Counter);
+
+    return GetGuidAsString(guid);
+}
+
+// Returns the next 64-bit value from the generator stored in Rnd_.
+ui64 TRandGuid::GenNumber() {
+ return GetRnd().GenRand();
+}
+}
diff --git a/yql/essentials/utils/rand_guid.h b/yql/essentials/utils/rand_guid.h
new file mode 100644
index 0000000000..30496bdd5f
--- /dev/null
+++ b/yql/essentials/utils/rand_guid.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include <library/cpp/deprecated/atomic/atomic.h>
+#include <util/random/mersenne.h>
+
+#include <type_traits>
+
+namespace NYql {
+// Generator of GUID strings and random 64-bit numbers backed by a
+// TMersenne<ui64> kept in raw aligned storage.
+class TRandGuid {
+public:
+ TRandGuid();
+ // NOTE(review): the defaulted moves copy the raw Rnd_ storage - valid
+ // only if TMersenne<ui64> is trivially copyable; confirm.
+ TRandGuid(TRandGuid&&) = default;
+ TRandGuid& operator=(TRandGuid&&) = default;
+
+ // Re-creates the generator with a fresh seed.
+ void ResetSeed();
+
+ TString GenGuid();
+ ui64 GenNumber();
+
+private:
+ // Views the raw storage as the generator constructed by ResetSeed().
+ TMersenne<ui64>& GetRnd() {
+ return reinterpret_cast<TMersenne<ui64>&>(Rnd_);
+ }
+
+private:
+ // Storage for the generator; populated with placement new in ResetSeed().
+ std::aligned_storage<sizeof(TMersenne<ui64>) ,alignof(TMersenne<ui64>)>::type Rnd_;
+
+ // Shared by all instances; incremented atomically for every GUID.
+ static TAtomic Counter;
+};
diff --git a/yql/essentials/utils/range_walker.h b/yql/essentials/utils/range_walker.h
new file mode 100644
index 0000000000..268fe36472
--- /dev/null
+++ b/yql/essentials/utils/range_walker.h
@@ -0,0 +1,47 @@
+#pragma once
+
+#include <util/generic/yexception.h>
+
+namespace NYql {
+
+// Cyclic cursor over the closed range [start, finish].
+//
+// MoveToNext() returns the current value and advances the cursor, wrapping
+// back to `start` after `finish` has been returned.
+template <typename T>
+class TRangeWalker {
+private:
+    const T Start_;
+    const T Finish_;
+    T Current_;
+
+public:
+    // Throws yexception when start > finish.
+    TRangeWalker(T start, T finish)
+        : Start_(start)
+        , Finish_(finish)
+        , Current_(start)
+    {
+        if (Start_ > Finish_) {
+            ythrow yexception() << "Invalid range for walker";
+        }
+    }
+
+    T GetStart() const {
+        return Start_;
+    }
+
+    T GetFinish() const {
+        return Finish_;
+    }
+
+    // Number of values in the range.
+    // NOTE(review): the expression overflows when the range spans the
+    // whole domain of T; callers are assumed not to build such ranges.
+    T GetRangeSize() const {
+        return Finish_ - Start_ + 1;
+    }
+
+    // Returns the current value and moves the cursor to the next one.
+    T MoveToNext() {
+        T result = Current_;
+
+        // Step forward only while strictly below Finish_. Incrementing at
+        // Finish_ would overflow when Finish_ is the largest value of T.
+        if (Current_ < Finish_) {
+            ++Current_;
+            if (Current_ > Finish_) {
+                Current_ = Start_;
+            }
+        } else {
+            Current_ = Start_;
+        }
+
+        return result;
+    }
+};
diff --git a/yql/essentials/utils/range_walker_ut.cpp b/yql/essentials/utils/range_walker_ut.cpp
new file mode 100644
index 0000000000..d6a86cc804
--- /dev/null
+++ b/yql/essentials/utils/range_walker_ut.cpp
@@ -0,0 +1,35 @@
+#include "range_walker.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NYql;
+
+// Tests for TRangeWalker: construction checks and cyclic iteration.
+Y_UNIT_TEST_SUITE(TRangeWalkerTests) {
+ // start > finish must be rejected by the constructor.
+ Y_UNIT_TEST(InvalidRange) {
+ UNIT_ASSERT_EXCEPTION_CONTAINS(TRangeWalker<int>(2, 1), yexception, "Invalid range for walker");
+ }
+
+ // A one-element range keeps returning that element.
+ Y_UNIT_TEST(SingleValueRange) {
+ TRangeWalker<int> w(5, 5);
+ UNIT_ASSERT_EQUAL(5, w.GetStart());
+ UNIT_ASSERT_EQUAL(5, w.GetFinish());
+ UNIT_ASSERT_EQUAL(1, w.GetRangeSize());
+
+ for (int i = 0; i < 10; ++i) {
+ UNIT_ASSERT_EQUAL(5, w.MoveToNext());
+ }
+ }
+
+ // A three-element range is walked in order and wraps around.
+ Y_UNIT_TEST(ManyValuesRange) {
+ TRangeWalker<int> w(5, 7);
+ UNIT_ASSERT_EQUAL(5, w.GetStart());
+ UNIT_ASSERT_EQUAL(7, w.GetFinish());
+ UNIT_ASSERT_EQUAL(3, w.GetRangeSize());
+
+ for (int i = 0; i < 10; ++i) {
+ UNIT_ASSERT_EQUAL(5, w.MoveToNext());
+ UNIT_ASSERT_EQUAL(6, w.MoveToNext());
+ UNIT_ASSERT_EQUAL(7, w.MoveToNext());
+ }
+ }
+}
diff --git a/yql/essentials/utils/resetable_setting.h b/yql/essentials/utils/resetable_setting.h
new file mode 100644
index 0000000000..0112105a3b
--- /dev/null
+++ b/yql/essentials/utils/resetable_setting.h
@@ -0,0 +1,67 @@
+#pragma once
+
+#include "yql_panic.h"
+
+#include <util/generic/maybe.h>
+#include <util/generic/variant.h>
+
+namespace NYql {
+
+// A setting that is undefined, "set" to a value of type S, or "reset" to
+// a value of type R.
+//
+// The state lives in an optional two-alternative variant: alternative 0
+// is the "set" value, alternative 1 the "reset" value. Alternatives are
+// addressed by index rather than by type, so the class also works when S
+// and R are the same type or convertible into one another.
+template <typename S, typename R>
+class TResetableSettingBase {
+protected:
+    using TSet = S;
+    using TReset = R;
+
+public:
+    // Puts the setting into the "set" state holding `value`.
+    void Set(const TSet& value) {
+        Value.ConstructInPlace(std::in_place_index<0>, value);
+    }
+
+    // Puts the setting into the "reset" state holding `value`.
+    void Reset(const TReset& value) {
+        Value.ConstructInPlace(std::in_place_index<1>, value);
+    }
+
+    // True once Set() or Reset() has been called.
+    bool Defined() const {
+        return Value.Defined();
+    }
+
+    explicit operator bool() const {
+        return Defined();
+    }
+
+    // True in the "set" state, false in the "reset" state.
+    // Fails YQL_ENSURE when the setting is undefined.
+    bool IsSet() const {
+        YQL_ENSURE(Defined());
+        return Value->index() == 0;
+    }
+
+    // Value given to Set(); fails YQL_ENSURE unless in the "set" state.
+    const TSet& GetValueSet() const {
+        YQL_ENSURE(IsSet());
+        return std::get<0>(*Value);
+    }
+
+    // Value given to Reset(); fails YQL_ENSURE unless in the "reset" state.
+    const TReset& GetValueReset() const {
+        YQL_ENSURE(!IsSet());
+        return std::get<1>(*Value);
+    }
+
+private:
+    TMaybe<std::variant<TSet, TReset>> Value;
+};
+
+// General case: exposes the TResetableSettingBase<S, R> interface as is.
+template <typename S, typename R>
+class TResetableSetting: public TResetableSettingBase<S, R> {
+};
+
+// Specialization for a reset state that carries no value: the base is
+// instantiated with TNothing as the reset type.
+template <typename S>
+class TResetableSetting<S, void>: public TResetableSettingBase<S, TNothing> {
+private:
+ // Declared private and left without a definition here, which hides the
+ // base-class accessor: there is no reset value to return.
+ const TNothing& GetValueReset() const;
+
+public:
+ // Puts the setting into the "reset" state.
+ void Reset() {
+ TResetableSettingBase<S, TNothing>::Reset(Nothing());
+ }
+};
+
+}
diff --git a/yql/essentials/utils/retry.cpp b/yql/essentials/utils/retry.cpp
new file mode 100644
index 0000000000..ac40d79b12
--- /dev/null
+++ b/yql/essentials/utils/retry.cpp
@@ -0,0 +1 @@
+#include "retry.h"
diff --git a/yql/essentials/utils/retry.h b/yql/essentials/utils/retry.h
new file mode 100644
index 0000000000..aa8391fa48
--- /dev/null
+++ b/yql/essentials/utils/retry.h
@@ -0,0 +1,17 @@
+#pragma once
+
+namespace NYql {
+
+// Calls `a` until it succeeds, making at most `attempts` calls (and always
+// at least one).
+//
+// Every call except the last runs under a handler for TRetriableException:
+// when it throws that type, `exceptionHandler(exception, attemptNumber,
+// attempts)` is invoked with a 1-based attempt number and the next call is
+// made. Other exception types, and anything thrown by the last call,
+// propagate to the caller.
+//
+// @return whatever the successful call to `a` returned
+template <typename TRetriableException, typename TAction, typename TExceptionHandler>
+auto WithRetry(int attempts, TAction&& a, TExceptionHandler&& exceptionHandler) {
+    int attempt = 1;
+    while (attempt < attempts) {
+        try {
+            return a();
+        } catch (const TRetriableException& error) {
+            exceptionHandler(error, attempt, attempts);
+        }
+        ++attempt;
+    }
+
+    // Final (or only) attempt: failures are no longer intercepted.
+    return a();
+}
+}
diff --git a/yql/essentials/utils/retry_ut.cpp b/yql/essentials/utils/retry_ut.cpp
new file mode 100644
index 0000000000..47cb35fd73
--- /dev/null
+++ b/yql/essentials/utils/retry_ut.cpp
@@ -0,0 +1,73 @@
+#include "retry.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NYql;
+
+namespace {
+
+class TMyError : public yexception {
+};
+
+}
+
+// Tests for WithRetry(): attempt counting, handler invocation and
+// propagation of exceptions.
+Y_UNIT_TEST_SUITE(TRetryTests) {
+ // Even with zero attempts requested the action runs once.
+ Y_UNIT_TEST(ZeroAttempts) {
+ auto r = WithRetry<TMyError>(0,
+ []() { return TString("abc"); },
+ [](auto, auto, auto) { UNIT_FAIL("Exception handler invoked"); });
+
+ UNIT_ASSERT_VALUES_EQUAL("abc", r);
+ }
+
+ // A successful first call never reaches the handler.
+ Y_UNIT_TEST(NoRetries) {
+ auto r = WithRetry<TMyError>(5,
+ []() { return TString("abc"); },
+ [](auto, auto, auto) { UNIT_FAIL("Exception handler invoked"); });
+
+ UNIT_ASSERT_VALUES_EQUAL("abc", r);
+ }
+
+ // An exception that is not TMyError is not retried and propagates.
+ Y_UNIT_TEST(NoRetriesButException) {
+ UNIT_ASSERT_EXCEPTION_CONTAINS(WithRetry<TMyError>(5,
+ []() { throw yexception() << "xxxx"; },
+ [](auto, auto, auto) { UNIT_FAIL("Exception handler invoked"); }), yexception, "xxxx");
+ }
+
+ // Two failures followed by a success on the third and final attempt.
+ Y_UNIT_TEST(FewRetries) {
+ int counter = 0;
+ int exceptions = 0;
+ auto r = WithRetry<TMyError>(3, [&]() {
+ if (counter++ < 2) {
+ throw TMyError() << "yyyy";
+ }
+
+ return counter;
+ }, [&](const auto& e, int attempt, int attemptCount) {
+ ++exceptions;
+ UNIT_ASSERT_VALUES_EQUAL(e.what(), "yyyy");
+ UNIT_ASSERT_VALUES_EQUAL(attempt, counter);
+ UNIT_ASSERT_VALUES_EQUAL(attemptCount, 3);
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(2, exceptions);
+ UNIT_ASSERT_VALUES_EQUAL(3, r);
+ UNIT_ASSERT_VALUES_EQUAL(3, counter);
+ }
+
+ // Every attempt fails: the handler sees the first two failures and the
+ // third exception ("yyyy2") reaches the caller.
+ Y_UNIT_TEST(ManyRetries) {
+ int counter = 0;
+ int exceptions = 0;
+ UNIT_ASSERT_EXCEPTION_CONTAINS(WithRetry<TMyError>(3, [&]() {
+ throw TMyError() << "yyyy" << counter++;
+ }, [&](const auto& e, int attempt, int attemptCount) {
+ ++exceptions;
+ UNIT_ASSERT_STRING_CONTAINS(e.what(), "yyyy");
+ UNIT_ASSERT_VALUES_EQUAL(attempt, counter);
+ UNIT_ASSERT_VALUES_EQUAL(attemptCount, 3);
+ }), TMyError, "yyyy2");
+
+ UNIT_ASSERT_VALUES_EQUAL(2, exceptions);
+ UNIT_ASSERT_VALUES_EQUAL(3, counter);
+ }
+}
diff --git a/yql/essentials/utils/rope/rope_over_buffer.cpp b/yql/essentials/utils/rope/rope_over_buffer.cpp
new file mode 100644
index 0000000000..e8074372fd
--- /dev/null
+++ b/yql/essentials/utils/rope/rope_over_buffer.cpp
@@ -0,0 +1,43 @@
+#include "rope_over_buffer.h"
+
+#include <yql/essentials/utils/yql_panic.h>
+
+namespace NYql {
+
+namespace {
+
+// Read-only IContiguousChunk that exposes an externally provided span and stores a
+// shared_ptr "owner" next to it for as long as the chunk exists.
+class TContigousChunkOverBuf : public IContiguousChunk {
+public:
+ // owner: stored as-is and never read by this class (presumably it keeps the memory
+ // behind `span` alive - confirm with callers).
+ // span: the bytes reported by GetData().
+ TContigousChunkOverBuf(const std::shared_ptr<const void>& owner, TContiguousSpan span)
+ : Owner_(owner)
+ , Span_(span)
+ {
+ }
+private:
+ // Returns the span passed to the constructor.
+ TContiguousSpan GetData() const override {
+ return Span_;
+ }
+
+ // Mutation is not supported: the YQL_ENSURE check fails unconditionally and
+ // the function has no return statement.
+ TMutableContiguousSpan GetDataMut() override {
+ YQL_ENSURE(false, "Payload mutation is not supported");
+ }
+
+ // Reports the size of the wrapped span.
+ size_t GetOccupiedMemorySize() const override {
+ return Span_.GetSize();
+ }
+
+ const std::shared_ptr<const void> Owner_;
+ const TContiguousSpan Span_;
+};
+
+} // namespace
+
+
+// Wraps [data, data + size) into a single-chunk rope without copying the bytes.
+// `owner` is stored inside the chunk; a zero `size` yields an empty rope and
+// neither `owner` nor `data` is used.
+TRope MakeReadOnlyRope(const std::shared_ptr<const void>& owner, const char* data, size_t size) {
+    if (size != 0) {
+        const TContiguousSpan payload{data, size};
+        return TRope(new TContigousChunkOverBuf(owner, payload));
+    }
+    return TRope();
+}
+
+} // namespace NYql
diff --git a/yql/essentials/utils/rope/rope_over_buffer.h b/yql/essentials/utils/rope/rope_over_buffer.h
new file mode 100644
index 0000000000..9d18e52263
--- /dev/null
+++ b/yql/essentials/utils/rope/rope_over_buffer.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include <contrib/ydb/library/actors/util/rope.h>
+
+#include <memory>
+
+namespace NYql {
+
+TRope MakeReadOnlyRope(const std::shared_ptr<const void>& owner, const char* data, size_t size);
+
+} \ No newline at end of file
diff --git a/yql/essentials/utils/rope/ya.make b/yql/essentials/utils/rope/ya.make
new file mode 100644
index 0000000000..9b92a50c48
--- /dev/null
+++ b/yql/essentials/utils/rope/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ rope_over_buffer.cpp
+)
+
+PEERDIR(
+ contrib/ydb/library/actors/util
+ yql/essentials/utils
+)
+
+END()
+
diff --git a/yql/essentials/utils/signals/signals.cpp b/yql/essentials/utils/signals/signals.cpp
new file mode 100644
index 0000000000..9ebe667120
--- /dev/null
+++ b/yql/essentials/utils/signals/signals.cpp
@@ -0,0 +1,300 @@
+#include "signals.h"
+#include "utils.h"
+
+#include <yql/essentials/utils/log/log.h>
+#include <yql/essentials/utils/backtrace/backtrace.h>
+#include <contrib/ydb/library/yql/providers/yt/lib/log/yt_logger.h>
+
+#include <util/stream/output.h>
+#include <util/generic/yexception.h>
+#include <util/datetime/base.h>
+#include <util/network/socket.h>
+#include <util/system/getpid.h>
+
+#ifdef _linux_
+# include <sys/prctl.h>
+#endif
+
+#include <string.h>
+#include <signal.h>
+#include <errno.h>
+#include <stdlib.h>
+
+
+namespace NYql {
+
+volatile sig_atomic_t NeedTerminate = 0;
+volatile sig_atomic_t NeedQuit = 0;
+volatile sig_atomic_t NeedReconfigure = 0;
+volatile sig_atomic_t NeedReopenLog = 0;
+volatile sig_atomic_t NeedReapZombies = 0;
+volatile sig_atomic_t NeedInterrupt = 0;
+
+volatile sig_atomic_t CatchInterrupt = 0;
+
+TPipe SignalPipeW;
+TPipe SignalPipeR;
+
+namespace {
+
+// Signal handler that records the delivered signal in the matching Need* flag.
+// SIGINT is special: it is recorded only while CatchInterrupt is set; otherwise the
+// handler prints a message, restores the default disposition and re-raises the signal.
+// Signals without a case are ignored.
+void SignalHandler(int signo)
+{
+ switch (signo) {
+ case SIGTERM:
+ NeedTerminate = 1;
+ break;
+
+ case SIGQUIT:
+ NeedQuit = 1;
+ break;
+
+#ifdef _unix_
+ case SIGHUP:
+ NeedReconfigure = 1;
+ break;
+
+ case SIGUSR1:
+ NeedReopenLog = 1;
+ break;
+
+ case SIGCHLD:
+ NeedReapZombies = 1;
+ break;
+#endif
+
+ case SIGINT:
+ if (CatchInterrupt) {
+ NeedInterrupt = 1;
+ } else {
+ // NOTE(review): fprintf() is not on the POSIX list of async-signal-safe
+ // functions - confirm this is acceptable here.
+ fprintf(stderr, "%s (pid=%d) captured SIGINT\n",
+ GetProcTitle(), getpid());
+ // Let the default action handle the re-raised signal.
+ signal(signo, SIG_DFL);
+ raise(signo);
+ }
+ break;
+
+ default:
+ break;
+ }
+}
+
+// Same as SignalHandler(), and additionally writes one byte to SignalPipeW so that a
+// reader of SignalPipeR can notice the signal.
+void SignalHandlerWithSelfPipe(int signo)
+{
+ SignalHandler(signo);
+
+ // write() may change errno; keep the interrupted code's value intact.
+ int savedErrno = errno;
+ // EAGAIN is tolerated; any other write failure is fatal.
+ if (write(SignalPipeW.GetHandle(), "x", 1) == -1 && errno != EAGAIN) {
+ static TStringBuf msg("cannot write to signal pipe");
+#ifndef STDERR_FILENO
+#define STDERR_FILENO 2
+#endif
+ write(STDERR_FILENO, msg.data(), msg.size());
+ abort();
+ }
+ errno = savedErrno;
+}
+
+
+#ifndef _unix_
+// Replacement for strsignal(), compiled only when _unix_ is not defined.
+// Knows SIGTERM, SIGINT and SIGQUIT; every other number maps to "UNKNOWN".
+const char* strsignal(int signo)
+{
+ switch (signo) {
+ case SIGTERM: return "SIGTERM";
+ case SIGINT: return "SIGINT";
+ case SIGQUIT: return "SIGQUIT";
+ default:
+ return "UNKNOWN";
+ }
+}
+#endif
+
+namespace {
+
+// Output stream that collects text in a fixed in-object buffer and hands it to the
+// YQL logger with TLOG_EMERG priority on Flush(). DoWrite() itself performs no allocation.
+class TEmergencyLogOutput: public IOutputStream {
+public:
+ TEmergencyLogOutput()
+ : Current_(Buf_)
+ , End_(Y_ARRAY_END(Buf_))
+ {
+ }
+
+ ~TEmergencyLogOutput() {
+ }
+
+private:
+ // Number of bytes still free in Buf_.
+ inline size_t Avail() const noexcept {
+ return End_ - Current_;
+ }
+
+ // Sends the accumulated bytes (if any) to the logger and empties the buffer.
+ void DoFlush() override {
+ if (Current_ != Buf_) {
+ NYql::NLog::YqlLogger().Write(TLOG_EMERG, Buf_, Current_ - Buf_);
+ Current_ = Buf_;
+ }
+ }
+
+ // Appends up to Avail() bytes; anything that does not fit is silently dropped.
+ void DoWrite(const void* buf, size_t len) override {
+ len = Min(len, Avail());
+ if (len) {
+ char* end = Current_ + len;
+ memcpy(Current_, buf, len);
+ Current_ = end;
+ }
+ }
+
+private:
+ char Buf_[1 << 20]; // 1 MiB of storage
+ char* Current_; // next write position inside Buf_
+ char* const End_; // one past the last byte of Buf_
+
+};
+
+TEmergencyLogOutput EMERGENCY_LOG_OUT;
+
+}
+
+// Writes the signal name, pid and a backtrace to the emergency log (only when the YQL
+// logger is initialized), flushes the YT debug log, and raises the signal again.
+void LogBacktraceOnSignal(int signum)
+{
+ if (NYql::NLog::IsYqlLoggerInitialized()) {
+ EMERGENCY_LOG_OUT << strsignal(signum) << TStringBuf(" (pid=") << GetPID() << TStringBuf("): ");
+ NYql::NBacktrace::KikimrBackTraceFormatImpl(&EMERGENCY_LOG_OUT);
+ EMERGENCY_LOG_OUT.Flush();
+ }
+ NYql::FlushYtDebugLog();
+ /* Now re-raise the signal. Re-raising (instead of calling exit or abort)
+ lets the process terminate with the status that corresponds to the signal.
+ NOTE(review): this function does not restore the default disposition itself;
+ presumably the code that invokes this callback has already done so - confirm. */
+ raise(signum);
+}
+
+
+#ifdef _unix_
+// Installs `handler` for `signo` via sigaction() with SA_RESTART and an empty mask
+// of additionally blocked signals.
+// Returns 0 on success, -1 on failure (errno is set by sigaction()).
+int SetSignalHandler(int signo, void (*handler)(int))
+{
+    // Value-initialize so that members we do not set explicitly are not passed
+    // to sigaction() with indeterminate contents.
+    struct sigaction sa = {};
+    sa.sa_flags = SA_RESTART;
+    sa.sa_handler = handler;
+    sigemptyset(&sa.sa_mask);
+
+    return sigaction(signo, &sa, nullptr);
+}
+
+#else
+// Installs `handler` for `signo` via signal().
+// Returns 0 on success, -1 when signal() reports SIG_ERR.
+int SetSignalHandler(int signo, void (*handler)(int))
+{
+    return (signal(signo, handler) == SIG_ERR) ? -1 : 0;
+}
+
+#endif
+
+// One row of a handler table consumed by SetSignalHandlers().
+// A row with signo == -1 terminates the table.
+struct TSignalHandlerDesc
+{
+ int signo; // signal number, or -1 for the terminating row
+ void (*handler)(int); // handler to install (SIG_IGN is used for ignored signals)
+};
+
+// Installs every handler from `handlerDescs` (a table terminated by signo == -1),
+// then blocks all of those signals in the calling thread's mask and registers the
+// fatal-signal backtrace logging.
+// Throws TSystemError when a handler cannot be installed or the mask cannot be changed.
+void SetSignalHandlers(const TSignalHandlerDesc* handlerDescs)
+{
+ sigset_t interestedSignals;
+ SigEmptySet(&interestedSignals);
+
+ for (int i = 0; handlerDescs[i].signo != -1; i++) {
+ int signo = handlerDescs[i].signo;
+ SigAddSet(&interestedSignals, signo);
+
+ if (SetSignalHandler(signo, handlerDescs[i].handler) == -1) {
+ ythrow TSystemError() << "Cannot set handler for signal "
+ << strsignal(signo);
+ }
+ }
+
+ // The handled signals stay blocked after this call; presumably they are meant to be
+ // delivered only while the caller waits in SigSuspend() or after AllowAnySignals()
+ // - confirm against callers.
+ if (SigProcMask(SIG_BLOCK, &interestedSignals, NULL) == -1) {
+ ythrow TSystemError() << "Cannot set sigprocmask";
+ }
+
+ NYql::NBacktrace::AddAfterFatalCallback([](int signo){ LogBacktraceOnSignal(signo); });
+ NYql::NBacktrace::RegisterKikimrFatalActions();
+}
+
+} // namespace
+
+
+// Installs SignalHandler for SIGTERM, SIGINT and SIGQUIT; when _unix_ is defined it
+// also handles SIGHUP, SIGUSR1 and SIGCHLD and ignores SIGPIPE.
+// Errors are reported by SetSignalHandlers() (TSystemError).
+void InitSignals()
+{
+ TSignalHandlerDesc handlerDescs[] = {
+ { SIGTERM, SignalHandler },
+ { SIGINT, SignalHandler },
+ { SIGQUIT, SignalHandler },
+#ifdef _unix_
+ { SIGPIPE, SIG_IGN },
+ { SIGHUP, SignalHandler },
+ { SIGUSR1, SignalHandler },
+ { SIGCHLD, SignalHandler },
+#endif
+ { -1, nullptr } // table terminator
+ };
+
+ SetSignalHandlers(handlerDescs);
+}
+
+// Like InitSignals(), but installs SignalHandlerWithSelfPipe, which also writes a byte
+// to SignalPipeW for every handled signal. The pipe (SignalPipeR / SignalPipeW) is
+// created here and both ends are switched to non-blocking mode before the handlers
+// are installed.
+void InitSignalsWithSelfPipe()
+{
+ TSignalHandlerDesc handlerDescs[] = {
+ { SIGTERM, SignalHandlerWithSelfPipe },
+ { SIGINT, SignalHandlerWithSelfPipe },
+ { SIGQUIT, SignalHandlerWithSelfPipe },
+#ifdef _unix_
+ { SIGPIPE, SIG_IGN },
+ { SIGHUP, SignalHandlerWithSelfPipe },
+ { SIGUSR1, SignalHandlerWithSelfPipe },
+ { SIGCHLD, SignalHandlerWithSelfPipe },
+#endif
+ { -1, nullptr } // table terminator
+ };
+
+ TPipe::Pipe(SignalPipeR, SignalPipeW);
+ SetNonBlock(SignalPipeR.GetHandle());
+ SetNonBlock(SignalPipeW.GetHandle());
+
+ SetSignalHandlers(handlerDescs);
+}
+
+void CatchInterruptSignal(bool doCatch) {
+ CatchInterrupt = doCatch;
+}
+
+// Waits for a signal using sigsuspend(mask) when _unix_ is defined.
+// Otherwise `mask` is ignored and the call just sleeps for one second.
+void SigSuspend(const sigset_t* mask)
+{
+#ifdef _unix_
+ sigsuspend(mask);
+#else
+ Y_UNUSED(mask);
+ Sleep(TDuration::Seconds(1));
+#endif
+}
+
+// Replaces the current signal mask with an empty one, i.e. unblocks every signal.
+// Throws TSystemError when the mask cannot be set.
+void AllowAnySignals()
+{
+ sigset_t blockMask;
+ SigEmptySet(&blockMask);
+
+ if (SigProcMask(SIG_SETMASK, &blockMask, NULL) == -1) {
+ ythrow TSystemError() << "Cannot set sigprocmask";
+ }
+}
+
+// Reports whether SIGQUIT or SIGTERM is currently pending (raised but not yet
+// delivered). Always false when _unix_ is not defined.
+// Throws TSystemError if sigpending() fails.
+bool HasPendingQuitOrTerm() {
+#ifndef _unix_
+    return false;
+#else
+    sigset_t pending;
+    SigEmptySet(&pending);
+    if (sigpending(&pending) != 0) {
+        ythrow TSystemError() << "Error in sigpending";
+    }
+
+    if (SigIsMember(&pending, SIGQUIT) == 1) {
+        return true;
+    }
+    return SigIsMember(&pending, SIGTERM) == 1;
+#endif
+}
+} // namespace NYql
diff --git a/yql/essentials/utils/signals/signals.h b/yql/essentials/utils/signals/signals.h
new file mode 100644
index 0000000000..612f906207
--- /dev/null
+++ b/yql/essentials/utils/signals/signals.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include <util/system/defaults.h>
+#include <util/system/sigset.h>
+#include <util/system/pipe.h>
+
+#include <signal.h>
+
+
+namespace NYql {
+
+#ifdef _win_
+using sig_atomic_t = int;
+#endif
+
+extern volatile sig_atomic_t NeedTerminate;
+extern volatile sig_atomic_t NeedQuit;
+extern volatile sig_atomic_t NeedReconfigure;
+extern volatile sig_atomic_t NeedReopenLog;
+extern volatile sig_atomic_t NeedReapZombies;
+extern volatile sig_atomic_t NeedInterrupt;
+
+extern TPipe SignalPipeW;
+extern TPipe SignalPipeR;
+
+void InitSignals();
+void InitSignalsWithSelfPipe();
+void CatchInterruptSignal(bool doCatch);
+
+void SigSuspend(const sigset_t* mask);
+void AllowAnySignals();
+bool HasPendingQuitOrTerm();
+
+} // namespace NYql
diff --git a/yql/essentials/utils/signals/utils.cpp b/yql/essentials/utils/signals/utils.cpp
new file mode 100644
index 0000000000..b1de131b30
--- /dev/null
+++ b/yql/essentials/utils/signals/utils.cpp
@@ -0,0 +1,122 @@
+#include "utils.h"
+
+#include <util/generic/yexception.h>
+#include <util/string/subst.h>
+
+#include <google/protobuf/text_format.h>
+
+#include <library/cpp/protobuf/json/proto2json.h>
+#include <library/cpp/json/yson/json2yson.h>
+
+#include <string.h>
+
+extern char** environ;
+
+namespace NYql {
+
+static char** g_OriginalArgv = nullptr;
+static char* g_OriginalArgvLast = nullptr;
+
+/*
+ * To change the process title on Linux and Darwin we have to set argv[1]
+ * to NULL and copy the title to the place argv[0] points to.
+ * However, argv[0] may be too small to hold the new title. Fortunately, Linux
+ * and Darwin store the argv[] and environ[] strings one after another. So we
+ * check that they really form one contiguous memory block, relocate the
+ * environ[] strings that belong to this block into newly allocated memory,
+ * and can then use the whole block starting at argv[0] for the process title.
+ *
+ *               contiguous memory block for process title
+ *  ________________________________/\____________________________________
+ * /                                                                      \
+ * +---------+---------+-----+------+------------+------------+-----+------+
+ * | argv[0] | argv[1] | ... | NULL | environ[0] | environ[1] | ... | NULL |
+ * +---------+---------+-----+------+------------+------------+-----+------+
+ *                                   \_________________ _________________/
+ *                                                     \/
+ *                                         must be relocated elsewhere
+ */
+// Remembers argv and computes the end of the memory block usable for the title.
+// Must be called at most once (aborts otherwise). `argc` is unused.
+// If `argv` or `argv[0]` is null nothing is recorded, so SetProcTitle() and
+// AddProcTitleSuffix() stay no-ops and GetProcTitle() keeps returning "UNKNOWN".
+void ProcTitleInit(int argc, const char* argv[])
+{
+    Y_UNUSED(argc);
+    Y_ABORT_UNLESS(!g_OriginalArgv, "ProcTitleInit() was already called");
+
+    // Without a program name there is no buffer to put a title into.
+    if (!argv || !argv[0]) {
+        return;
+    }
+
+    g_OriginalArgv = const_cast<char**>(argv);
+
+    // Upper bound for the relocated strings: the whole environment.
+    // `environ` itself may be null (e.g. after clearenv()).
+    size_t environSize = 0;
+    for (int i = 0; environ && environ[i]; i++) {
+        environSize += strlen(environ[i]) + 1;
+    }
+
+    // Never freed here: relocated environ[] entries point into this buffer.
+    char* newEnviron = new char[environSize];
+    g_OriginalArgvLast = g_OriginalArgv[0];
+
+    // Extend the block over every argv string that directly follows the previous one.
+    for (int i = 0; g_OriginalArgv[i]; i++) {
+        if (g_OriginalArgvLast == g_OriginalArgv[i]) {
+            g_OriginalArgvLast = g_OriginalArgv[i] + strlen(g_OriginalArgv[i]) + 1;
+        }
+    }
+
+    // Do the same for environ strings, moving each absorbed string out of the block.
+    for (int i = 0; environ && environ[i]; i++) {
+        if (g_OriginalArgvLast == environ[i]) {
+            const size_t entrySize = strlen(environ[i]) + 1; // including the NUL
+            g_OriginalArgvLast = environ[i] + entrySize;
+
+            memcpy(newEnviron, environ[i], entrySize);
+            environ[i] = newEnviron;
+            newEnviron += entrySize;
+        }
+    }
+
+    // Step back onto the last byte of the block, which stays reserved for the final NUL.
+    g_OriginalArgvLast--;
+}
+
+// Overwrites the process title with "yqlworker: " followed by `title`, truncating
+// the text to the space reserved by ProcTitleInit(), zero-fills the rest of that
+// space and cuts the argument list after argv[0].
+// Does nothing if ProcTitleInit() has not recorded argv.
+void SetProcTitle(const char* title)
+{
+    if (!g_OriginalArgv) return;
+
+    char* p = g_OriginalArgv[0];
+    char* const end = g_OriginalArgvLast;
+
+    // strlcpy() returns the length of the *source*, which exceeds the number of
+    // bytes written when the text is truncated. Advance by what was really
+    // written so that `p` never passes `end` and the remaining size never
+    // becomes negative (i.e. a huge size_t).
+    auto append = [&p, end](const char* text) {
+        if (p >= end) {
+            return;
+        }
+        const size_t room = end - p; // bytes available, including the NUL
+        const size_t wanted = strlcpy(p, text, room);
+        p += (wanted < room) ? wanted : room - 1;
+    };
+
+    append("yqlworker: ");
+    append(title);
+
+    // Clear leftovers of the previous title / argument strings.
+    if (end > p) {
+        memset(p, 0, end - p);
+    }
+
+    g_OriginalArgv[1] = nullptr;
+}
+
+// Appends " " and `suffix` to the current process title, truncating to the space
+// reserved by ProcTitleInit(). Does nothing if ProcTitleInit() has not recorded argv.
+void AddProcTitleSuffix(const char* suffix)
+{
+    if (!g_OriginalArgv) return;
+
+    char* const title = g_OriginalArgv[0];
+    if (title >= g_OriginalArgvLast) {
+        return;
+    }
+
+    // Both appends use the full buffer size. strlcat() returns the length it
+    // *tried* to create, so using that value to advance a pointer (and to derive
+    // the remaining size) would run past the buffer end on truncation.
+    const size_t capacity = g_OriginalArgvLast - title;
+    strlcat(title, " ", capacity);
+    strlcat(title, suffix, capacity);
+}
+
+const char* GetProcTitle()
+{
+ return g_OriginalArgv ? g_OriginalArgv[0] : "UNKNOWN";
+}
+
+// Renders a protobuf message in text format on a single line.
+// The result of PrintToString() is not checked.
+TString PbMessageToStr(const google::protobuf::Message& msg)
+{
+ TString str;
+ ::google::protobuf::TextFormat::Printer printer;
+ printer.SetSingleLineMode(true);
+ printer.PrintToString(msg, &str);
+ return str;
+}
+
+// Converts a protobuf message to YSON text in two steps: protobuf -> JSON value
+// (Proto2Json), then JSON value -> YSON (SerializeJsonValueAsYson).
+TString Proto2Yson(const google::protobuf::Message& proto) {
+ NJson::TJsonValue json;
+ NProtobufJson::Proto2Json(proto, json);
+
+ TString ysonResult;
+ TStringOutput stream(ysonResult);
+ NJson2Yson::SerializeJsonValueAsYson(json, &stream);
+ return ysonResult;
+}
+
+} // namespace NYql
diff --git a/yql/essentials/utils/signals/utils.h b/yql/essentials/utils/signals/utils.h
new file mode 100644
index 0000000000..75c55244fa
--- /dev/null
+++ b/yql/essentials/utils/signals/utils.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <util/generic/fwd.h>
+
+namespace google {
+namespace protobuf {
+ class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace NYql {
+
+void ProcTitleInit(int argc, const char* argv[]);
+void SetProcTitle(const char* title);
+void AddProcTitleSuffix(const char* suffix);
+const char* GetProcTitle();
+
+TString PbMessageToStr(const google::protobuf::Message& msg);
+TString Proto2Yson(const google::protobuf::Message& proto);
+
+} // namespace NYql
diff --git a/yql/essentials/utils/signals/ya.make b/yql/essentials/utils/signals/ya.make
new file mode 100644
index 0000000000..3c72783dbf
--- /dev/null
+++ b/yql/essentials/utils/signals/ya.make
@@ -0,0 +1,20 @@
+LIBRARY()
+
+SRCS(
+ signals.cpp
+ signals.h
+ utils.cpp
+ utils.h
+)
+
+PEERDIR(
+ contrib/libs/protobuf
+ library/cpp/logger/global
+ library/cpp/protobuf/json
+ library/cpp/json/yson
+ yql/essentials/utils/log
+ yql/essentials/utils/backtrace
+ contrib/ydb/library/yql/providers/yt/lib/log
+)
+
+END()
diff --git a/yql/essentials/utils/sort.cpp b/yql/essentials/utils/sort.cpp
new file mode 100644
index 0000000000..67992aff39
--- /dev/null
+++ b/yql/essentials/utils/sort.cpp
@@ -0,0 +1 @@
+#include "sort.h" \ No newline at end of file
diff --git a/yql/essentials/utils/sort.h b/yql/essentials/utils/sort.h
new file mode 100644
index 0000000000..4adb7c9225
--- /dev/null
+++ b/yql/essentials/utils/sort.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include <contrib/libs/miniselect/include/miniselect/floyd_rivest_select.h>
+
+namespace NYql {
+
+// Thin wrappers that forward to miniselect's Floyd-Rivest routines. The argument
+// order (first, middle, last[, compare]) is passed through unchanged.
+
+// Forwards to ::miniselect::floyd_rivest_select using the default ordering.
+template <class RandomIt>
+void FastNthElement(RandomIt first, RandomIt middle, RandomIt last) {
+ ::miniselect::floyd_rivest_select(first, middle, last);
+}
+
+// Same as above with a caller-supplied comparator.
+template <class RandomIt, class Compare>
+void FastNthElement(RandomIt first, RandomIt middle, RandomIt last, Compare compare) {
+ ::miniselect::floyd_rivest_select(first, middle, last, compare);
+}
+
+// Forwards to ::miniselect::floyd_rivest_partial_sort using the default ordering.
+template <class RandomIt>
+void FastPartialSort(RandomIt first, RandomIt middle, RandomIt last) {
+ ::miniselect::floyd_rivest_partial_sort(first, middle, last);
+}
+
+// Same as above with a caller-supplied comparator.
+template <class RandomIt, class Compare>
+void FastPartialSort(RandomIt first, RandomIt middle, RandomIt last, Compare compare) {
+ ::miniselect::floyd_rivest_partial_sort(first, middle, last, compare);
+}
+
+}
diff --git a/yql/essentials/utils/swap_bytes.cpp b/yql/essentials/utils/swap_bytes.cpp
new file mode 100644
index 0000000000..c37aebcb6d
--- /dev/null
+++ b/yql/essentials/utils/swap_bytes.cpp
@@ -0,0 +1 @@
+#include "swap_bytes.h"
diff --git a/yql/essentials/utils/swap_bytes.h b/yql/essentials/utils/swap_bytes.h
new file mode 100644
index 0000000000..bf09bb321a
--- /dev/null
+++ b/yql/essentials/utils/swap_bytes.h
@@ -0,0 +1,20 @@
+#pragma once
+#include <util/system/compiler.h>
+#include <util/system/defaults.h>
+
+namespace NYql {
+
+// Returns `value` with the order of its bytes reversed.
+// The object representation is copied byte by byte, last byte first.
+// clang generates bswap for ui32 and ui64
+template <typename TUnsigned>
+Y_FORCE_INLINE
+TUnsigned SwapBytes(TUnsigned value) {
+ TUnsigned result;
+ auto* from = (ui8*)&value + sizeof(TUnsigned) - 1; // last byte of the input
+ auto* to = (ui8*)&result; // first byte of the output
+ for (size_t i = 0; i < sizeof(TUnsigned); ++i) {
+ *to++ = *from--;
+ }
+ return result;
+}
+
+}
diff --git a/yql/essentials/utils/sys/become_user.cpp b/yql/essentials/utils/sys/become_user.cpp
new file mode 100644
index 0000000000..bbb6b5735c
--- /dev/null
+++ b/yql/essentials/utils/sys/become_user.cpp
@@ -0,0 +1,188 @@
+#include "become_user.h"
+
+#ifdef _linux_
+#include <yql/essentials/utils/sys/linux_version.h>
+
+#include <util/generic/yexception.h>
+#include <util/system/user.h>
+
+#include <memory>
+#include <vector>
+#include <errno.h>
+
+#include <grp.h>
+#include <pwd.h>
+#include <unistd.h>
+
+#include <sys/prctl.h>
+#include <contrib/libs/libcap/include/sys/capability.h>
+#include <contrib/libs/libcap/include/sys/securebits.h>
+
+// strange, but sometimes we have to specify values manually
+#define PR_CAP_AMBIENT 47
+#define PR_CAP_AMBIENT_IS_SET 1
+#define PR_CAP_AMBIENT_RAISE 2
+#define PR_CAP_AMBIENT_LOWER 3
+#define PR_CAP_AMBIENT_CLEAR_ALL 4
+
+namespace NYql {
+
+namespace {
+
+// Sets (CAP_SET) one capability `value` in the `flag` set of `caps`.
+// Throws TSystemError if cap_set_flag() fails.
+void SetCapFlag(cap_t caps, cap_flag_t flag, cap_value_t value) {
+ if (cap_set_flag(caps, flag, 1, &value, CAP_SET) < 0) {
+ throw TSystemError() << "cap_set_flag() failed, flag = " << static_cast<int>(flag) << ", value = " << value;
+ }
+}
+
+// Sets capability `value` in the effective, permitted and inheritable sets of `caps`.
+void SetCapFlags(cap_t caps, cap_value_t value) {
+ SetCapFlag(caps, CAP_EFFECTIVE, value);
+ SetCapFlag(caps, CAP_PERMITTED, value);
+ SetCapFlag(caps, CAP_INHERITABLE, value);
+}
+
+// Removes all ambient capabilities of the calling thread.
+// Silently does nothing on kernels older than 4.3; throws TSystemError if prctl() fails.
+void ClearAmbientCapFlags() {
+ // from man: PR_CAP_AMBIENT (since Linux 4.3)
+ if (IsLinuxKernelBelow4_3()) {
+ return;
+ }
+
+ if (prctl(PR_CAP_AMBIENT, PR_CAP_AMBIENT_CLEAR_ALL, 0, 0, 0) < 0) {
+ throw TSystemError() << "prctl(PR_CAP_AMBIENT, PR_CAP_AMBIENT_CLEAR_ALL, ....) failed";
+ }
+}
+
+// Raises capability `value` in the ambient set.
+// Silently does nothing on kernels older than 4.3; throws TSystemError if prctl() fails.
+void SetAmbientCapFlag(cap_value_t value) {
+ if (IsLinuxKernelBelow4_3()) {
+ return;
+ }
+
+ if (prctl(PR_CAP_AMBIENT, PR_CAP_AMBIENT_RAISE, value, 0, 0) < 0) {
+ throw TSystemError() << "prctl(PR_CAP_AMBIENT, PR_CAP_AMBIENT_RAISE, ....) failed, value = " << value;
+ }
+}
+
+// Makes `flags` the complete capability set of the process: the effective, permitted
+// and inheritable sets are replaced by exactly these flags, and the ambient set is
+// cleared and then re-raised for each of them.
+// Throws TSystemError if any libcap or prctl call fails.
+void SetCapFlagsVector(const std::vector<cap_value_t>& flags) {
+ cap_t caps = cap_init();
+ // Releases `caps` with cap_free() on every exit path (a null pointer is not passed to the deleter).
+ std::unique_ptr<std::remove_reference_t<decltype(*caps)>, decltype(&cap_free)> capsHolder(caps, &cap_free);
+
+ if (!caps) {
+ throw TSystemError() << "cap_init() failed";
+ }
+
+ cap_clear(caps);
+
+ for (auto f : flags) {
+ SetCapFlags(caps, f);
+ }
+
+ if (cap_set_proc(caps) < 0) {
+ throw TSystemError() << "cap_set_proc() failed";
+ }
+
+ ClearAmbientCapFlags();
+ for (auto f : flags) {
+ SetAmbientCapFlag(f);
+ }
+}
+
+// Verifies that none of `flags` can be acquired any more: tries to raise each flag
+// on its own and expects the attempt to fail with TSystemError.
+// Throws yexception as soon as one of the attempts succeeds.
+void EnsureCapFlagsVectorCannotBeRaised(const std::vector<cap_value_t>& flags) {
+    for (const auto flag : flags) {
+        bool raised = true;
+        try {
+            // one-by-one
+            SetCapFlagsVector({ flag });
+        } catch (const TSystemError&) {
+            raised = false;
+        }
+
+        if (raised) {
+            throw yexception() << "Cap flag " << flag << " raised unexpectedly";
+        }
+    }
+}
+
+// Switches the process to user `username` and group `groupname`.
+// A null or empty `groupname` means "group with the same name as the user".
+// Steps, in this order: setgid(), initgroups(), setuid(), then the dumpable flag is
+// switched on via prctl(PR_SET_DUMPABLE).
+// Throws yexception for an unknown user/group and TSystemError when a system call fails.
+void DoBecomeUser(const char* username, const char* groupname) {
+ // errno is reset so that "no such user" (errno stays 0) can be told apart from a lookup error.
+ errno = 0;
+ passwd* pw = getpwnam(username);
+ if (pw == nullptr) {
+ if (errno == 0) {
+ ythrow yexception() << "unknown user: " << username;
+ } else {
+ ythrow TSystemError() << "can't get user info";
+ }
+ }
+
+ if (groupname == nullptr || strlen(groupname) == 0) {
+ groupname = username;
+ }
+
+ // Same errno convention as for the user lookup.
+ errno = 0;
+ group* gr = getgrnam(groupname);
+ if (gr == nullptr) {
+ if (errno == 0) {
+ ythrow yexception() << "unknown group: " << groupname;
+ } else {
+ ythrow TSystemError() << "can't get group info";
+ }
+ }
+
+ // Group identity is changed before the user id (presumably because the process may
+ // no longer be allowed to change groups once setuid() has succeeded - confirm).
+ if (setgid(gr->gr_gid) == -1) {
+ ythrow TSystemError() << "can't change process group";
+ }
+
+ if (initgroups(username, gr->gr_gid) == -1) {
+ ythrow TSystemError() << "can't initgroups";
+ }
+
+ if (setuid(pw->pw_uid) == -1) {
+ ythrow TSystemError() << "can't change process user";
+ }
+
+ if (prctl(PR_SET_DUMPABLE, 1, 0, 0, 0) == -1) {
+ ythrow TSystemError() << "can't set dumpable flag for a process";
+ }
+}
+
+}
+
+void BecomeUser(const TString& username, const TString& groupname) {
+ DoBecomeUser(username.data(), groupname.data());
+}
+
+// Restricts the process capabilities to CAP_SETUID, CAP_SETGID, CAP_SETPCAP and
+// CAP_KILL (also raised as ambient capabilities) and then sets and locks the
+// SECBIT_NO_SETUID_FIXUP secure bit.
+// Throws TSystemError on failure.
+void TurnOnBecomeUserAmbientCaps() {
+ SetCapFlagsVector({ CAP_SETUID, CAP_SETGID, CAP_SETPCAP, CAP_KILL });
+ if (prctl(PR_SET_SECUREBITS, SECBIT_NO_SETUID_FIXUP | SECBIT_NO_SETUID_FIXUP_LOCKED, 0, 0, 0) == -1) {
+ ythrow TSystemError() << "can't set secure bits for a process";
+ }
+}
+
+// Drops every capability (including ambient ones), checks that CAP_SETUID, CAP_SETGID,
+// CAP_SETPCAP and CAP_KILL cannot be re-acquired, and finally checks that setuid(0)
+// is refused.
+void TurnOffBecomeUserAbility() {
+ ClearAmbientCapFlags();
+ SetCapFlagsVector({});
+ EnsureCapFlagsVectorCannotBeRaised({ CAP_SETUID, CAP_SETGID, CAP_SETPCAP, CAP_KILL });
+
+ // ensure we cannot get root access back
+ // NOTE(review): the exception below is thrown when setuid(0) *succeeded*, so the
+ // system error attached by TSystemError is unrelated to this condition.
+ if (setuid(0) != -1) {
+ ythrow TSystemError() << "unexpected switch to root in TurnOffBecomeUserAbility";
+ }
+}
+
+// Prints `title`, the current user name and the textual form of the process
+// capabilities to Cerr.
+// NOTE(review): the results of cap_get_proc() and cap_to_text() are used without a
+// null check.
+void DumpCaps(const TString& title) {
+ cap_t caps = cap_get_proc();
+ // Releases `caps` with cap_free() when the function returns.
+ std::unique_ptr<std::remove_reference_t<decltype(*caps)>, decltype(&cap_free)> capsHolder(caps, &cap_free);
+
+ ssize_t size; // filled by cap_to_text(); not used afterwards
+ char* capsText = cap_to_text(caps, &size);
+ Cerr << title << ": current user: " << GetUsername() << ", proc caps: " << capsText << Endl;
+
+ cap_free(capsText);
+}
+
+// Asks the kernel (prctl(PR_SET_PDEATHSIG)) to send `signo` to the calling process
+// when its parent dies. Throws TSystemError if prctl() fails.
+void SendSignalOnParentThreadExit(int signo)
+{
+ if (::prctl(PR_SET_PDEATHSIG, signo) == -1) {
+ ythrow TSystemError() << "Cannot set signal " << strsignal(signo) << " for parent death using prctl";
+ }
+}
+
+}
+
+#endif
diff --git a/yql/essentials/utils/sys/become_user.h b/yql/essentials/utils/sys/become_user.h
new file mode 100644
index 0000000000..c5c2025d8b
--- /dev/null
+++ b/yql/essentials/utils/sys/become_user.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include <util/generic/string.h>
+
+namespace NYql {
+
+// These functions do real work on Linux only; on other platforms they are no-ops.
+
+// Switches the process to the given user and group (an empty group name means the
+// group named like the user).
+// Assumes we have enough capabilities to do so: CAP_SETUID, CAP_SETGID.
+void BecomeUser(const TString& username, const TString& groupname);
+
+// Should be called by root (more specifically, the required capability is CAP_SETPCAP).
+// Sets up the ambient capabilities CAP_SETUID, CAP_SETGID, CAP_SETPCAP and CAP_KILL;
+// they will be preserved by fork and exec*.
+void TurnOnBecomeUserAmbientCaps();
+
+// Forgets the ambient capabilities and ensures we cannot setuid to root.
+void TurnOffBecomeUserAbility();
+
+// Dumps the current security context to stderr: the given title, the current user
+// name and the process capabilities.
+void DumpCaps(const TString& title);
+
+// Subscribes the calling (child) process to receive `signo` when its parent dies
+// (in particular, when the parent thread exits).
+void SendSignalOnParentThreadExit(int signo);
+
+}
diff --git a/yql/essentials/utils/sys/become_user_dummy.cpp b/yql/essentials/utils/sys/become_user_dummy.cpp
new file mode 100644
index 0000000000..897d9c3977
--- /dev/null
+++ b/yql/essentials/utils/sys/become_user_dummy.cpp
@@ -0,0 +1,26 @@
+#include "become_user.h"
+#ifndef _linux_
+// Stand-ins used when _linux_ is not defined: every function ignores its arguments
+// and does nothing.
+namespace NYql {
+
+void BecomeUser(const TString& username, const TString& groupname) {
+ Y_UNUSED(username);
+ Y_UNUSED(groupname);
+}
+
+void TurnOnBecomeUserAmbientCaps() {
+}
+
+void TurnOffBecomeUserAbility() {
+}
+
+void DumpCaps(const TString& title) {
+ Y_UNUSED(title);
+}
+
+void SendSignalOnParentThreadExit(int signo)
+{
+ Y_UNUSED(signo);
+}
+
+}
+#endif
diff --git a/yql/essentials/utils/sys/linux_version.cpp b/yql/essentials/utils/sys/linux_version.cpp
new file mode 100644
index 0000000000..5d10af8294
--- /dev/null
+++ b/yql/essentials/utils/sys/linux_version.cpp
@@ -0,0 +1,46 @@
+#include "linux_version.h"
+
+#include <util/generic/yexception.h>
+#include <util/system/platform.h>
+
+#ifdef _linux_
+# include <sys/utsname.h>
+#endif
+
+namespace NYql {
+ // Returns (version, patch level, sublevel) parsed from the leading "a.b.c" of
+ // uname()'s release string; anything after the third number is ignored.
+ // Throws TSystemError if uname() fails and yexception if the release string does
+ // not start with three dot-separated integers.
+ // When _linux_ is not defined the result is value-initialized, i.e. (0, 0, 0).
+ std::tuple<int, int, int> DetectLinuxKernelVersion3() {
+#ifdef _linux_
+ // see https://github.com/torvalds/linux/blob/master/Makefile
+ // version is composed as follows:
+ // VERSION = 4
+ // PATCHLEVEL = 18
+ // SUBLEVEL = 0
+ // EXTRAVERSION = -rc4
+ // KERNELVERSION = $(VERSION)$(if $(PATCHLEVEL),.$(PATCHLEVEL)$(if $(SUBLEVEL),.$(SUBLEVEL)))$(EXTRAVERSION)
+
+ utsname buf = {};
+ if (uname(&buf)) {
+ ythrow TSystemError() << "uname call failed";
+ }
+
+ int v = 0;
+ int p = 0;
+ int s = 0;
+ // All three components are required.
+ if (sscanf(buf.release, "%d.%d.%d", &v, &p, &s) != 3) {
+ ythrow yexception() << "Failed to parse linux kernel version " << buf.release;
+ }
+ return std::make_tuple(v, p, s);
+#else
+ return {};
+#endif
+ }
+
+ // Returns the first two components (version, patch level) of DetectLinuxKernelVersion3().
+ std::pair<int, int> DetectLinuxKernelVersion2() {
+ auto v = DetectLinuxKernelVersion3();
+ return std::make_pair(std::get<0>(v), std::get<1>(v));
+ }
+
+ // True when (version, patch level) compares lexicographically below (4, 3).
+ // The kernel version is queried again on every call.
+ bool IsLinuxKernelBelow4_3() {
+ return DetectLinuxKernelVersion2() < std::make_pair(4, 3);
+ }
+}
diff --git a/yql/essentials/utils/sys/linux_version.h b/yql/essentials/utils/sys/linux_version.h
new file mode 100644
index 0000000000..c8c32da125
--- /dev/null
+++ b/yql/essentials/utils/sys/linux_version.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include <tuple>
+
+namespace NYql {
+ // returns version, patch level, sublevel, e.g. (4, 4, 114) for `uname -r` == "4.4.114-50"
+ std::tuple<int, int, int> DetectLinuxKernelVersion3();
+
+ // returns version, patch level
+ std::pair<int, int> DetectLinuxKernelVersion2();
+
+ bool IsLinuxKernelBelow4_3();
+}
diff --git a/yql/essentials/utils/sys/ya.make b/yql/essentials/utils/sys/ya.make
new file mode 100644
index 0000000000..698aeb8ba8
--- /dev/null
+++ b/yql/essentials/utils/sys/ya.make
@@ -0,0 +1,20 @@
+LIBRARY()
+
+SRCS(
+ become_user.h
+ become_user_dummy.cpp
+ linux_version.cpp
+ linux_version.h
+)
+
+IF (OS_LINUX)
+ PEERDIR(
+ contrib/libs/libcap
+ )
+
+ SRCS(
+ become_user.cpp
+ )
+ENDIF()
+
+END()
diff --git a/yql/essentials/utils/test_http_server/test_http_server.cpp b/yql/essentials/utils/test_http_server/test_http_server.cpp
new file mode 100644
index 0000000000..5d55230c41
--- /dev/null
+++ b/yql/essentials/utils/test_http_server/test_http_server.cpp
@@ -0,0 +1,151 @@
+#include "test_http_server.h"
+
+#include <library/cpp/http/misc/httpcodes.h>
+#include <library/cpp/http/server/http_ex.h>
+
+#include <util/generic/yexception.h>
+
+namespace NYql {
+
+// Implementation of TTestHttpServer: owns the THttpServer, creates one TRequestProcessor
+// per client request and delegates the decision about the reply to the user-supplied
+// request handler.
+class TTestHttpServer::TImpl : public THttpServer::ICallBack {
+ // Handles a single HTTP request: extracts the headers of interest into a TRequest,
+ // asks the parent for a TReply and writes it out.
+ class TRequestProcessor : public THttpClientRequestEx {
+ public:
+ explicit TRequestProcessor(TImpl* parent)
+ : Parent_(parent)
+ {
+ Y_UNUSED(Parent_);
+ }
+
+ // Always returns true. Nothing is written when the headers cannot be processed,
+ // when the request is not a GET, or when the reply code is not one of
+ // HTTP_OK / HTTP_NOT_MODIFIED / HTTP_FORBIDDEN.
+ bool Reply(void* /*tsr*/) override {
+ if (!ProcessHeaders()) {
+ return true;
+ }
+
+ if (!RequestString.StartsWith("GET ")) {
+ return true;
+ }
+
+ // Header names are compared exactly as spelled below.
+ TRequest r;
+ for (auto& p : ParsedHeaders) {
+ if (p.first == "Authorization" && p.second.StartsWith("OAuth ")) {
+ // Keep only the token that follows the "OAuth " prefix.
+ r.OAuthToken = p.second.substr(strlen("OAuth "));
+ continue;
+ }
+
+ if (p.first == "If-None-Match") {
+ r.IfNoneMatch = p.second;
+ continue;
+ }
+
+ if (p.first == "If-Modified-Since") {
+ r.IfModifiedSince = p.second;
+ continue;
+ }
+ }
+
+ auto reply = Parent_->ProcessNextRequest(r);
+
+ switch (reply.Code) {
+ case HTTP_OK:
+ Output() << "HTTP/1.1 200 Ok\r\n";
+ break;
+
+ case HTTP_NOT_MODIFIED:
+ Output() << "HTTP/1.1 304 Not modified\r\n";
+ break;
+
+ case HTTP_FORBIDDEN:
+ Output() << "HTTP/1.1 403 Forbidden\r\n";
+ break;
+
+ default:
+ return true;
+ }
+
+ if (reply.ETag) {
+ Output() << "ETag: " + reply.ETag + "\r\n";
+ }
+
+ if (reply.LastModified) {
+ Output() << "Last-Modified: " + reply.LastModified + "\r\n";
+ }
+
+ // An explicit ContentLength overrides the real body size, so a handler can
+ // announce a length that differs from the content actually sent.
+ if (reply.Content || reply.ContentLength) {
+ const int length = reply.ContentLength.GetOrElse(reply.Content.length());
+ Output() << "Content-Length: " << length << "\r\n";
+ }
+
+ // Blank line separating headers from the body.
+ Output() << "\r\n";
+ if (reply.Content) {
+ Output() << reply.Content;
+ }
+
+ Output().Finish();
+
+ return true;
+ }
+
+ private:
+ TImpl* Parent_ = nullptr;
+ };
+
+public:
+ // Prepares a server for `port`; listening starts in Start().
+ explicit TImpl(int port)
+ : HttpServer_(this, THttpServer::TOptions(port))
+ , Port_(port)
+ {
+
+ }
+
+ // THttpServer callback: creates the processor for a new client request.
+ TClientRequest* CreateClient() override {
+ return new TRequestProcessor(this);
+ }
+
+ // Starts the server; Y_ENSURE fails if THttpServer::Start() returns false.
+ void Start() {
+ Y_ENSURE(HttpServer_.Start());
+ }
+
+ void Stop() {
+ HttpServer_.Stop();
+ }
+
+ // URL of the server on localhost with the configured port.
+ TString GetUrl() const {
+ return "http://localhost:" + ToString(Port_);
+ }
+
+ // Replaces the handler consulted for every subsequent request.
+ void SetRequestHandler(TRequestHandler handler) {
+ RequestHandler_ = std::move(handler);
+ }
+
+private:
+ // Invokes the current handler. NOTE(review): no check is made that a handler has
+ // been set before the first request arrives.
+ TReply ProcessNextRequest(const TRequest& request) {
+ return RequestHandler_(request);
+ }
+
+private:
+ THttpServer HttpServer_;
+ const int Port_;
+ TRequestHandler RequestHandler_;
+};
+
+TTestHttpServer::TTestHttpServer(int port)
+ : Impl_(new TImpl(port)) {
+}
+
+TTestHttpServer::~TTestHttpServer() {
+ Impl_->Stop();
+}
+
+void TTestHttpServer::Start() {
+ Impl_->Start();
+}
+
+TString TTestHttpServer::GetUrl() const {
+ return Impl_->GetUrl();
+}
+
+void TTestHttpServer::SetRequestHandler(TRequestHandler handler) {
+ return Impl_->SetRequestHandler(std::move(handler));
+}
+
+}
diff --git a/yql/essentials/utils/test_http_server/test_http_server.h b/yql/essentials/utils/test_http_server/test_http_server.h
new file mode 100644
index 0000000000..385cfaf970
--- /dev/null
+++ b/yql/essentials/utils/test_http_server/test_http_server.h
@@ -0,0 +1,77 @@
+#pragma once
+
+#include <library/cpp/http/misc/httpcodes.h>
+
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+
+#include <functional>
+
+namespace NYql {
+
+// Small HTTP server for tests: every request is turned into a TRequest, passed to a
+// user-supplied handler, and the returned TReply is sent back to the client.
+class TTestHttpServer {
+public:
+ // Description of the response to send.
+ struct TReply {
+ int Code = 0; // HTTP status code (HTTP_OK, HTTP_NOT_MODIFIED, HTTP_FORBIDDEN, ...)
+ TString ETag; // value of the ETag header; empty = no header
+ TString LastModified; // value of the Last-Modified header; empty = no header
+ TString Content; // response body
+ TMaybe<int> ContentLength; // explicit Content-Length; when unset the body length is used
+
+ // 200 reply with the given body and an optional explicit Content-Length.
+ static TReply Ok(const TString& content, TMaybe<int> contentLength = {}) {
+ TReply r;
+ r.Code = HTTP_OK;
+ r.Content = content;
+ r.ContentLength = contentLength;
+ return r;
+ }
+
+ // Ok() plus an ETag header.
+ static TReply OkETag(const TString& content, const TString& etag, TMaybe<int> contentLength = {}) {
+ TReply r = Ok(content, contentLength);
+ r.ETag = etag;
+ return r;
+ }
+
+ // Ok() plus a Last-Modified header.
+ static TReply OkLastModified(const TString& content, const TString& lastModified, TMaybe<int> contentLength = {}) {
+ TReply r = Ok(content, contentLength);
+ r.LastModified = lastModified;
+ return r;
+ }
+
+ // 304 reply without a body; ETag / Last-Modified are optional.
+ static TReply NotModified(const TString& etag = {}, const TString& lastModified = {}) {
+ TReply r;
+ r.Code = HTTP_NOT_MODIFIED;
+ r.ETag = etag;
+ r.LastModified = lastModified;
+ return r;
+ }
+
+ // 403 reply without headers or body.
+ static TReply Forbidden() {
+ TReply r;
+ r.Code = HTTP_FORBIDDEN;
+ return r;
+ }
+ };
+
+ // The parts of an incoming request that are exposed to the handler.
+ struct TRequest {
+ TString OAuthToken; // token from "Authorization: OAuth <token>"
+ TString IfNoneMatch; // value of the If-None-Match header
+ TString IfModifiedSince; // value of the If-Modified-Since header
+ };
+
+ typedef std::function<TReply(const TRequest& request)> TRequestHandler;
+
+public:
+ explicit TTestHttpServer(int port);
+ // Stops the server.
+ ~TTestHttpServer();
+ void Start();
+ // Returns "http://localhost:<port>".
+ TString GetUrl() const;
+ // Sets the handler that produces the reply for each request.
+ void SetRequestHandler(TRequestHandler handler);
+
+private:
+ class TImpl;
+ THolder<TImpl> Impl_;
+};
+
+}
diff --git a/yql/essentials/utils/test_http_server/ya.make b/yql/essentials/utils/test_http_server/ya.make
new file mode 100644
index 0000000000..41c6710c5f
--- /dev/null
+++ b/yql/essentials/utils/test_http_server/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+
+SRCS(
+ test_http_server.cpp
+)
+
+PEERDIR(
+ library/cpp/http/server
+ library/cpp/http/misc
+)
+
+END()
diff --git a/yql/essentials/utils/threading/async_queue.cpp b/yql/essentials/utils/threading/async_queue.cpp
new file mode 100644
index 0000000000..4021636b55
--- /dev/null
+++ b/yql/essentials/utils/threading/async_queue.cpp
@@ -0,0 +1,18 @@
+#include "async_queue.h"
+
+namespace NYql {
+
+// Creates and starts the underlying pool: a TFakeThreadPool when exactly one thread is
+// requested, otherwise a TSimpleThreadPool named `poolName`.
+// Start() receives `numThreads` in both cases.
+TAsyncQueue::TAsyncQueue(size_t numThreads, const TString& poolName) {
+ if (1 == numThreads) {
+ MtpQueue_.Reset(new TFakeThreadPool());
+ } else {
+ MtpQueue_.Reset(new TSimpleThreadPool(TThreadPoolParams{poolName}));
+ }
+ MtpQueue_->Start(numThreads);
+}
+
+TAsyncQueue::TPtr TAsyncQueue::Make(size_t numThreads, const TString& poolName) {
+ return new TAsyncQueue(numThreads, poolName);
+}
+
+}
diff --git a/yql/essentials/utils/threading/async_queue.h b/yql/essentials/utils/threading/async_queue.h
new file mode 100644
index 0000000000..3dd75b9e08
--- /dev/null
+++ b/yql/essentials/utils/threading/async_queue.h
@@ -0,0 +1,51 @@
+#pragma once
+
+#include <library/cpp/threading/future/future.h>
+#include <library/cpp/threading/future/async.h>
+
+#include <util/thread/pool.h>
+#include <util/generic/ptr.h>
+#include <util/generic/function.h>
+#include <util/system/guard.h>
+#include <util/system/rwlock.h>
+
+#include <exception>
+
+namespace NYql {
+
+// Reference-counted wrapper around a thread pool that can be stopped while
+// other threads may still be submitting work. Instances are created via Make().
+class TAsyncQueue: public TThrRefBase {
+public:
+    using TPtr = TIntrusivePtr<TAsyncQueue>;
+
+    // Creates a queue backed by a pool started with numThreads threads.
+    static TPtr Make(size_t numThreads, const TString& poolName);
+
+    // Stops and destroys the underlying pool while holding the write guard.
+    // A repeated call does nothing because MtpQueue_ is empty after the first one.
+    void Stop() {
+        auto guard = TWriteGuard(Lock_);
+        if (MtpQueue_) {
+            MtpQueue_->Stop();
+            MtpQueue_.Destroy();
+        }
+    }
+
+    // Schedules func on the pool and returns a future for its result.
+    // Once Stop() has destroyed the pool, an error future holding a yexception
+    // ("Thread pool is already stopped") is returned instead.
+    template <typename TCallable>
+    [[nodiscard]]
+    ::NThreading::TFuture<::NThreading::TFutureType<::TFunctionResult<TCallable>>> Async(TCallable&& func) {
+        {
+            // Taken as a read guard so that it only excludes Stop(), which takes the write guard.
+            auto guard = TReadGuard(Lock_);
+            if (MtpQueue_) {
+                // func is a forwarding reference: forward it, so that a callable passed
+                // as an lvalue is copied instead of being silently moved-from.
+                return ::NThreading::Async(std::forward<TCallable>(func), *MtpQueue_);
+            }
+        }
+
+        return ::NThreading::MakeErrorFuture<::NThreading::TFutureType<::TFunctionResult<TCallable>>>(std::make_exception_ptr(yexception() << "Thread pool is already stopped"));
+    }
+
+private:
+    TAsyncQueue(size_t numThreads, const TString& poolName);
+
+private:
+    TRWMutex Lock_;                  // guards MtpQueue_
+    THolder<IThreadPool> MtpQueue_;  // empty after Stop()
+};
+
+} // NYql
diff --git a/yql/essentials/utils/threading/ya.make b/yql/essentials/utils/threading/ya.make
new file mode 100644
index 0000000000..d1cd6291a7
--- /dev/null
+++ b/yql/essentials/utils/threading/ya.make
@@ -0,0 +1,7 @@
+LIBRARY()
+
+SRCS(
+    async_queue.cpp
+)
+
+# async_queue.h includes library/cpp/threading/future/{future,async}.h,
+# so the dependency is declared explicitly instead of being inherited from users.
+PEERDIR(
+    library/cpp/threading/future
+)
+
+END()
diff --git a/yql/essentials/utils/url_builder.cpp b/yql/essentials/utils/url_builder.cpp
new file mode 100644
index 0000000000..31344fbb22
--- /dev/null
+++ b/yql/essentials/utils/url_builder.cpp
@@ -0,0 +1,50 @@
+#include "url_builder.h"
+#include <library/cpp/string_utils/quote/quote.h>
+#include <util/generic/yexception.h>
+
+namespace NYql {
+
+TUrlBuilder::TUrlBuilder(const TString& uri) // Stores uri as the base; path components and parameters are added by later calls.
+ : MainUri(uri)
+{
+}
+
+TUrlBuilder& TUrlBuilder::AddUrlParam(const TString& name, const TString& value) {
+    // The pair is stored exactly as given; nothing is escaped at this point.
+    TParam param{name, value};
+    Params.push_back(std::move(param));
+    return *this;
+}
+
+TUrlBuilder& TUrlBuilder::AddPathComponent(const TString& value) {
+    // Reject an empty component before any state is touched, so a failed
+    // call leaves the builder unchanged.
+    if (value.empty()) {
+        throw yexception() << "Empty path component is not allowed";
+    }
+
+    // The extended URI is assembled aside and committed only at the end.
+    // A '/' separator is inserted unless the current URI already ends with one.
+    TStringBuilder extended;
+    extended << MainUri << (MainUri.EndsWith('/') ? "" : "/") << UrlEscapeRet(value, true);
+    MainUri = std::move(extended);
+    return *this;
+}
+
+TString TUrlBuilder::Build() const {
+    // Without parameters the URI is returned as is, with no trailing '?'.
+    if (Params.empty()) {
+        return MainUri;
+    }
+
+    TStringBuilder url;
+    url << MainUri << "?";
+    for (size_t i = 0; i < Params.size(); ++i) {
+        const TParam& param = Params[i];
+        if (i > 0) {
+            url << "&";
+        }
+        // Names are written verbatim; only values go through CGIEscapeRet.
+        url << param.Name;
+        if (!param.Value.empty()) {
+            url << "=" << CGIEscapeRet(param.Value);
+        }
+        // A parameter with an empty value is emitted as a bare name, without '='.
+    }
+    // The local is a TStringBuilder while the function returns TString, hence the explicit move.
+    return std::move(url);
+}
+
+} // NYql
diff --git a/yql/essentials/utils/url_builder.h b/yql/essentials/utils/url_builder.h
new file mode 100644
index 0000000000..774bc3bcd8
--- /dev/null
+++ b/yql/essentials/utils/url_builder.h
@@ -0,0 +1,26 @@
+#pragma once
+
+#include <vector>
+#include <util/string/builder.h>
+
+namespace NYql {
+
+class TUrlBuilder { // Incrementally assembles a URL from a base URI, escaped path components and query parameters.
+ struct TParam {
+ TString Name;
+ TString Value;
+ };
+public:
+ explicit TUrlBuilder(const TString& uri);
+
+ // The name is assumed to be already escaped and is emitted verbatim: do not pass strings from user input as names.
+ TUrlBuilder& AddUrlParam(const TString& name, const TString& value = ""); // An empty value yields a bare name without '='.
+ TUrlBuilder& AddPathComponent(const TString& value); // Appends an escaped component; throws yexception if value is empty.
+
+ TString Build() const; // The URI, followed by '?' and '&'-separated parameters when any were added.
+private:
+ std::vector<TParam> Params;
+ TString MainUri;
+};
+
+} // NYql
diff --git a/yql/essentials/utils/url_builder_ut.cpp b/yql/essentials/utils/url_builder_ut.cpp
new file mode 100644
index 0000000000..ad15a91698
--- /dev/null
+++ b/yql/essentials/utils/url_builder_ut.cpp
@@ -0,0 +1,57 @@
+#include "url_builder.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NYql;
+
+Y_UNIT_TEST_SUITE(TUrlBuilder) {
+ Y_UNIT_TEST(UriOnly) { // No parameters: the URI is returned as is, without '?'.
+ TUrlBuilder builder("https://localhost/abc");
+ UNIT_ASSERT_VALUES_EQUAL(builder.Build(), "https://localhost/abc");
+ }
+
+ Y_UNIT_TEST(Basic) { // Parameters are joined with '&' in insertion order.
+ TUrlBuilder builder("https://localhost/abc");
+ builder.AddUrlParam("param1", "val1");
+ builder.AddUrlParam("param2", "val2");
+
+ UNIT_ASSERT_VALUES_EQUAL(builder.Build(), "https://localhost/abc?param1=val1&param2=val2");
+ }
+
+ Y_UNIT_TEST(BasicWithEncoding) { // Parameter values are escaped; a space becomes '+'.
+ auto url = TUrlBuilder("https://localhost/abc")
+ .AddUrlParam("param1", "=!@#$%^&*(){}[]\" ")
+ .AddUrlParam("param2", "val2")
+ .Build();
+
+ UNIT_ASSERT_VALUES_EQUAL(url, "https://localhost/abc?param1=%3D!@%23$%25%5E%26*%28%29%7B%7D%5B%5D%22+&param2=val2");
+ }
+
+ Y_UNIT_TEST(EmptyPathComponent) { // An empty component throws and must not modify the builder.
+ TUrlBuilder builder("https://localhost/abc");
+ UNIT_ASSERT_EXCEPTION_CONTAINS(builder.AddPathComponent(""), std::exception, "Empty path component is not allowed");
+ auto url = builder.Build();
+ // the failed call left the URI unchanged
+ UNIT_ASSERT_VALUES_EQUAL(url, "https://localhost/abc");
+ }
+
+ Y_UNIT_TEST(SeveralPathComponents) { // A space is "%20" in a path component but '+' in a parameter value.
+ auto url = TUrlBuilder("https://localhost/abc")
+ .AddPathComponent("oops")
+ .AddPathComponent("long oops")
+ .AddUrlParam("param1", "val1")
+ .AddUrlParam("param1", "long param")
+ .Build();
+ UNIT_ASSERT_VALUES_EQUAL(url, "https://localhost/abc/oops/long%20oops?param1=val1&param1=long+param");
+ }
+
+ Y_UNIT_TEST(SeveralPathComponentsWithSlashInBaseUri) { // '%' is escaped in a path component while '&' and '=' are kept.
+ // the base URI already ends with '/', so no extra separator is expected after it
+ auto url = TUrlBuilder("https://localhost/abc/")
+ .AddPathComponent("oops%1234")
+ .AddPathComponent("long&oops=xxx")
+ .AddUrlParam("param1", "a&b=cdef")
+ .Build();
+ UNIT_ASSERT_VALUES_EQUAL(url, "https://localhost/abc/oops%251234/long&oops=xxx?param1=a%26b%3Dcdef");
+ }
+}
diff --git a/yql/essentials/utils/ut/ya.make b/yql/essentials/utils/ut/ya.make
new file mode 100644
index 0000000000..0f7163ef79
--- /dev/null
+++ b/yql/essentials/utils/ut/ya.make
@@ -0,0 +1,14 @@
+UNITTEST_FOR(yql/essentials/utils)
+
+SRCS(
+ fp_bits_ut.cpp
+ md5_stream_ut.cpp
+ multi_resource_lock_ut.cpp
+ parse_double_ut.cpp
+ range_walker_ut.cpp
+ retry_ut.cpp
+ url_builder_ut.cpp
+ utf8_ut.cpp
+)
+
+END()
diff --git a/yql/essentials/utils/utf8.cpp b/yql/essentials/utils/utf8.cpp
new file mode 100644
index 0000000000..af284849a8
--- /dev/null
+++ b/yql/essentials/utils/utf8.cpp
@@ -0,0 +1,260 @@
+#include "utf8.h"
+
+#include <util/charset/wide.h>
+
+#include <ctype.h>
+#include <vector>
+
+namespace NYql {
+
+namespace {
+
+unsigned char GetRange(unsigned char c) { // Returns the character class of byte c, looked up in the 256-entry table below.
+ // The classes follow the DFA described at http://bjoern.hoehrmann.de/utf-8/decoder/dfa/
+ // Classes 1, 7 and 9 are remapped to 0x10, 0x20 and 0x40, so that a single AND with a mask can test for several classes at once.
+ static const unsigned char type[] = {
+ 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
+ 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
+ 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
+ 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0, 0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,
+ 0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,0x10,
+ 0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,0x40,
+ 0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,
+ 0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,0x20,
+ 8,8,2,2,2,2,2,2,2,2,2,2,2,2,2,2, 2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,2,
+ 10,3,3,3,3,3,3,3,3,3,3,3,3,4,3,3, 11,6,6,6,5,8,8,8,8,8,8,8,8,8,8,8,
+ }; // rows: 0x00-0x7f -> 0; 0x80-0x8f -> 0x10; 0x90-0x9f -> 0x40; 0xa0-0xbf -> 0x20; 0xc0-0xff -> lead-byte classes
+ return type[c];
+}
+
+struct TByteRange { // Inclusive interval [First, Last] of values allowed for one byte of a sequence.
+ ui8 First = 0;
+ ui8 Last = 0;
+};
+
+struct TUtf8Ranges { // Allowed byte intervals for one family of UTF-8 sequences.
+ size_t BytesCount = 0; // number of leading entries of Bytes that are in use
+ TByteRange Bytes[4] = {}; // Bytes[0] describes the lead byte, the following entries the continuation bytes
+};
+
+// Well-formed UTF-8 byte ranges, one row per lead-byte interval; see https://lemire.me/blog/2018/05/09/how-quickly-can-you-check-that-a-string-is-valid-unicode-utf-8
+inline static const std::vector<TUtf8Ranges> Utf8Ranges = { // rows are sorted by lead byte; the rounding code scans them in this order
+ { 1, { {0x00, 0x7f}, {0x00, 0x00}, {0x00, 0x00}, {0x00, 0x00}, } },
+ { 2, { {0xc2, 0xdf}, {0x80, 0xbf}, {0x00, 0x00}, {0x00, 0x00}, } },
+ { 3, { {0xe0, 0xe0}, {0xa0, 0xbf}, {0x80, 0xbf}, {0x00, 0x00}, } },
+ { 3, { {0xe1, 0xec}, {0x80, 0xbf}, {0x80, 0xbf}, {0x00, 0x00}, } },
+ { 3, { {0xed, 0xed}, {0x80, 0x9f}, {0x80, 0xbf}, {0x00, 0x00}, } },
+ { 3, { {0xee, 0xef}, {0x80, 0xbf}, {0x80, 0xbf}, {0x00, 0x00}, } },
+ { 4, { {0xf0, 0xf0}, {0x90, 0xbf}, {0x80, 0xbf}, {0x80, 0xbf}, } },
+ { 4, { {0xf1, 0xf3}, {0x80, 0xbf}, {0x80, 0xbf}, {0x80, 0xbf}, } },
+ { 4, { {0xf4, 0xf4}, {0x80, 0x8f}, {0x80, 0xbf}, {0x80, 0xbf}, } },
+};
+
+// Builds the result of rounding for an input whose first malformed (or truncated)
+// sequence starts at inputString[pos].
+//
+// range       index of the Utf8Ranges row the sequence is compared against (must be > 0)
+// inputString the whole input; bytes before pos are kept verbatim
+// pos         offset of the offending sequence
+// roundDown   true to produce a sequence below the input, false for one above it
+//
+// Returns prefix + replacement sequence; everything after the offending sequence is
+// discarded. When rounding up and no byte of the sequence can be raised, the result
+// of NextValidUtf8(prefix) is returned, which may be an empty optional.
+std::optional<std::string> RoundBadUtf8(size_t range, std::string_view inputString, size_t pos,
+    bool roundDown)
+{
+    Y_ENSURE(range > 0);
+    Y_ENSURE(range < Utf8Ranges.size());
+
+    const TUtf8Ranges& row = Utf8Ranges[range];
+    const std::string prefix{inputString.substr(0, pos)};
+    // May be shorter than row.BytesCount when the input ends in the middle of a sequence.
+    const std::string_view suffix = inputString.substr(pos, row.BytesCount);
+
+    // Only the bytes up to and including the first out-of-range one decide the outcome.
+    // Bytes after it must be ignored: scanning them could select a position behind the
+    // offending byte, and that byte would then be copied into the result unchanged,
+    // producing an invalid sequence (e.g. "\x80\x85" rounded up to "\x80\x86").
+    bool allInRange = true;
+    size_t scanEnd = suffix.size();
+    for (size_t i = 0; i < suffix.size(); ++i) {
+        const ui8 curr = ui8(suffix[i]);
+        if (curr < row.Bytes[i].First || curr > row.Bytes[i].Last) {
+            allInRange = false;
+            scanEnd = i + 1;
+            break;
+        }
+    }
+
+    std::string newSuffix;
+    if (roundDown) {
+        // Last position whose byte can still be lowered within this row.
+        std::optional<size_t> lastNonMin;
+        for (size_t i = 0; i < scanEnd; ++i) {
+            if (row.Bytes[i].First < ui8(suffix[i])) {
+                lastNonMin = i;
+            }
+        }
+
+        if (!lastNonMin) {
+            // Nothing can be lowered: fall back to the greatest sequence of the previous row.
+            const TUtf8Ranges& prev = Utf8Ranges[range - 1];
+            for (size_t i = 0; i < prev.BytesCount; ++i) {
+                newSuffix.push_back(prev.Bytes[i].Last);
+            }
+        } else {
+            // Keep the bytes before the position, lower (or clamp) the byte at it,
+            // and fill the rest with the maximum allowed values.
+            for (size_t i = 0; i < row.BytesCount; ++i) {
+                if (i < *lastNonMin) {
+                    ui8 c = suffix[i];
+                    newSuffix.push_back(c);
+                } else if (i == *lastNonMin) {
+                    ui8 c = suffix[i];
+                    newSuffix.push_back(std::min<ui8>(c - 1, row.Bytes[i].Last));
+                } else {
+                    newSuffix.push_back(row.Bytes[i].Last);
+                }
+            }
+        }
+    } else {
+        // Last position whose byte can still be raised within this row.
+        std::optional<size_t> lastNonMax;
+        for (size_t i = 0; i < scanEnd; ++i) {
+            if (ui8(suffix[i]) < row.Bytes[i].Last) {
+                lastNonMax = i;
+            }
+        }
+
+        if (allInRange) {
+            // The sequence is merely truncated: complete it with the minimum allowed bytes.
+            newSuffix = suffix;
+            for (size_t i = suffix.size(); i < row.BytesCount; ++i) {
+                newSuffix.push_back(row.Bytes[i].First);
+            }
+        } else if (!lastNonMax) {
+            // No byte of this sequence can be raised: advance the valid prefix instead.
+            return NextValidUtf8(prefix);
+        } else {
+            // Keep the bytes before the position, raise (or clamp) the byte at it,
+            // and fill the rest with the minimum allowed values.
+            for (size_t i = 0; i < row.BytesCount; ++i) {
+                if (i < *lastNonMax) {
+                    ui8 c = suffix[i];
+                    newSuffix.push_back(c);
+                } else if (i == *lastNonMax) {
+                    ui8 c = suffix[i];
+                    newSuffix.push_back(std::max<ui8>(c + 1, row.Bytes[i].First));
+                } else {
+                    newSuffix.push_back(row.Bytes[i].First);
+                }
+            }
+        }
+    }
+    return prefix + newSuffix;
+}
+
+}
+
+bool IsUtf8(const std::string_view& str) { // Returns true when every sequence in str passes the class checks below.
+ for (auto it = str.cbegin(); str.cend() != it;) { // one iteration consumes one whole sequence
+#define COPY() if (str.cend() != it) { c = *it++; } else { return false; }
+#define TRANS(mask) result &= ((GetRange(static_cast<unsigned char>(c)) & mask) != 0)
+#define TAIL() COPY(); TRANS(0x70)
+ auto c = *it++;
+ if (!(c & 0x80)) // high bit clear: single-byte (ASCII) character
+ continue;
+
+ bool result = true; // cleared by TRANS when a following byte has an unexpected class
+ switch (GetRange(static_cast<unsigned char>(c))) { // dispatch on the class of the lead byte
+ case 2: TAIL(); break; // one following byte of any continuation class (0x10|0x20|0x40)
+ case 3: TAIL(); TAIL(); break;
+ case 4: COPY(); TRANS(0x50); TAIL(); break; // second byte restricted to classes 0x10|0x40, i.e. 0x80..0x9f
+ case 5: COPY(); TRANS(0x10); TAIL(); TAIL(); break; // second byte restricted to class 0x10, i.e. 0x80..0x8f
+ case 6: TAIL(); TAIL(); TAIL(); break;
+ case 10: COPY(); TRANS(0x20); TAIL(); break; // second byte restricted to class 0x20, i.e. 0xa0..0xbf
+ case 11: COPY(); TRANS(0x60); TAIL(); TAIL(); break; // second byte restricted to classes 0x20|0x40, i.e. 0x90..0xbf
+ default: return false; // the byte cannot start a sequence
+ }
+
+ if (!result) return false;
+#undef COPY
+#undef TRANS
+#undef TAIL
+ }
+ return true;
+}
+
+unsigned char WideCharSize(char head) {
+    // Translates the class of the lead byte into the length of the sequence it
+    // introduces; classes that cannot start a sequence give 0.
+    switch (GetRange(static_cast<unsigned char>(head))) {
+        case 0:
+            return 1;
+        case 2:
+            return 2;
+        case 3:
+        case 4:
+        case 10:
+            return 3;
+        case 5:
+        case 6:
+        case 11:
+            return 4;
+        default:
+            return 0;
+    }
+}
+
+std::optional<std::string> RoundToNearestValidUtf8(const std::string_view& str, bool roundDown) { // Returns str itself if it validates, otherwise the result of rounding at the first malformed sequence.
+ const size_t ss = str.size();
+ for (size_t pos = 0; pos < ss; ) { // pos always points at the start of a sequence
+ ui8 c = str[pos];
+
+ for (size_t i = 0; i < Utf8Ranges.size(); ++i) { // find the row whose lead-byte interval contains c
+ auto& range = Utf8Ranges[i];
+
+ if (c < range.Bytes[0].First) { // c lies in a gap below row i, so it is not a valid lead byte
+ return RoundBadUtf8(i, str, pos, roundDown);
+ }
+
+ if (c <= range.Bytes[0].Last) {
+ // c is a valid lead byte of row i; now check the bytes that must follow it
+ for (size_t j = 1; j < range.BytesCount; ++j) {
+ if (pos + j >= ss) { // the input ends in the middle of the sequence
+ return RoundBadUtf8(i, str, pos, roundDown);
+ }
+ ui8 cur = str[pos + j];
+ if (!(cur >= range.Bytes[j].First && cur <= range.Bytes[j].Last)) {
+ return RoundBadUtf8(i, str, pos, roundDown);
+ }
+ }
+
+ pos += range.BytesCount; // the whole sequence is valid: move on to the next one
+ break;
+ } else if (i + 1 == Utf8Ranges.size()) { // c is above the lead bytes of the last row
+ if (!roundDown) {
+ return NextValidUtf8(str.substr(0, pos)); // no sequence can start with a greater byte, so advance the valid prefix
+ }
+ return RoundBadUtf8(i, str, pos, roundDown);
+ }
+ }
+ }
+ return std::string(str);
+}
+
+std::optional<std::string> NextValidUtf8(const std::string_view& str) { // Increments the last code point below 0x10ffff and drops the code points after it.
+ Y_ENSURE(IsUtf8(str)); // the input is required to be valid UTF-8 already
+ TUtf32String wide = UTF8ToUTF32<false>(str);
+ bool incremented = false;
+ size_t toDrop = 0; // number of trailing code points equal to 0x10ffff
+ for (auto it = wide.rbegin(); it != wide.rend(); ++it) { // walk the code points from the end
+ auto& c = *it;
+ if (c < 0x10ffff) {
+ c = (c == 0xd7ff) ? 0xe000 : (c + 1); // jump from 0xd7ff to 0xe000, skipping the values in between
+ incremented = true;
+ break;
+ } else {
+ ++toDrop;
+ }
+ }
+
+ if (!incremented) { // empty input, or every code point is already 0x10ffff
+ return {};
+ }
+
+ Y_ENSURE(toDrop < wide.size());
+ wide.resize(wide.size() - toDrop);
+
+ TString result = WideToUTF8(wide);
+ return std::string(result.data(), result.size());
+}
+
+std::optional<std::string> NextLexicographicString(const std::string_view& str) {
+    // Trailing 0xff bytes cannot be incremented and are dropped; the byte in
+    // front of them is the one that gets bumped.
+    const size_t lastIncrementable = str.find_last_not_of('\xff');
+    if (lastIncrementable == std::string_view::npos) {
+        // Empty input, or nothing but 0xff bytes: there is no byte to increment.
+        return {};
+    }
+
+    std::string result{str.substr(0, lastIncrementable + 1)};
+    ++result.back();
+    return result;
+}
+
+}
diff --git a/yql/essentials/utils/utf8.h b/yql/essentials/utils/utf8.h
new file mode 100644
index 0000000000..5c28353416
--- /dev/null
+++ b/yql/essentials/utils/utf8.h
@@ -0,0 +1,16 @@
+#pragma once
+
+#include <optional>
+#include <string_view>
+
+namespace NYql {
+
+bool IsUtf8(const std::string_view& str); // True when str consists solely of well-formed UTF-8 sequences (an empty string qualifies).
+
+unsigned char WideCharSize(char head); // Byte length of the sequence introduced by head, or 0 if head cannot start a sequence.
+
+std::optional<std::string> RoundToNearestValidUtf8(const std::string_view& str, bool roundDown); // Valid input is returned as is; otherwise the first malformed sequence is replaced and the rest dropped. May be empty when rounding up.
+std::optional<std::string> NextValidUtf8(const std::string_view& str); // Requires valid UTF-8; increments the last code point below 0x10ffff and drops those after it; empty optional if there is none.
+std::optional<std::string> NextLexicographicString(const std::string_view& str); // Byte-wise analogue: increments the last byte below 0xff and drops the 0xff bytes after it; empty optional if there is none.
+
+}
diff --git a/yql/essentials/utils/utf8_ut.cpp b/yql/essentials/utils/utf8_ut.cpp
new file mode 100644
index 0000000000..7479acd7a1
--- /dev/null
+++ b/yql/essentials/utils/utf8_ut.cpp
@@ -0,0 +1,99 @@
+#include "utf8.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+Y_UNIT_TEST_SUITE(TUtf8Tests) {
+ Y_UNIT_TEST(Simple) { // Accepts well-formed input, rejects malformed sequences.
+ UNIT_ASSERT(NYql::IsUtf8(""));
+ UNIT_ASSERT(NYql::IsUtf8("\x01_ASCII_\x7F"));
+ UNIT_ASSERT(NYql::IsUtf8("Привет!"));
+ UNIT_ASSERT(NYql::IsUtf8("\xF0\x9F\x94\xA2"));
+
+ UNIT_ASSERT(!NYql::IsUtf8("\xf5\x80\x80\x80")); // lead byte above 0xf4
+ UNIT_ASSERT(!NYql::IsUtf8("\xed\xa6\x80")); // 0xed followed by a byte above 0x9f
+ UNIT_ASSERT(!NYql::IsUtf8("\xF0\x9F\x94")); // truncated four-byte sequence
+ UNIT_ASSERT(!NYql::IsUtf8("\xE3\x85\xB6\xE7\x9C\xB0\xE3\x9C\xBA\xE2\xAA\x96\xEE\xA2\x8C\xEC\xAF\xB8\xE1\xB2\xBB\xEC\xA3\x9C\xE3\xAB\x8B\xEC\x95\x92\xE1\x8A\xBF\xE2\x8E\x86\xEC\x9B\x8D\xE2\x8E\xAE\xE3\x8A\xA3\xE0\xAC\xBC\xED\xB6\x85")); // valid up to the final \xED\xB6\x85
+ UNIT_ASSERT(!NYql::IsUtf8("\xc0\xbe\xd0\xb1\xd0\xbd\xd0\xbe\xd0\xb2\xd0\xbb\xd0\xb5\xd0\xbd\xd0\xb8\xd1\x8e")); // starts with 0xc0, which cannot begin a sequence
+ }
+
+ Y_UNIT_TEST(CharSize) { // Sequence length derived from the lead byte; 0 for bytes that cannot start one.
+ UNIT_ASSERT_VALUES_EQUAL(NYql::WideCharSize(' '), 1);
+ UNIT_ASSERT_VALUES_EQUAL(NYql::WideCharSize('\x00'), 1);
+ UNIT_ASSERT_VALUES_EQUAL(NYql::WideCharSize('\x7F'), 1);
+ UNIT_ASSERT_VALUES_EQUAL(NYql::WideCharSize('\xD1'), 2);
+ UNIT_ASSERT_VALUES_EQUAL(NYql::WideCharSize('\xF0'), 4);
+ UNIT_ASSERT_VALUES_EQUAL(NYql::WideCharSize('\xFF'), 0);
+ }
+
+ Y_UNIT_TEST(RoundingDown) { // The result must be valid UTF-8 and not greater than the input.
+ auto checkDown = [](std::string_view in, std::string_view out) {
+ auto res = NYql::RoundToNearestValidUtf8(in, true);
+ UNIT_ASSERT(res);
+ UNIT_ASSERT(NYql::IsUtf8(*res));
+ UNIT_ASSERT_VALUES_EQUAL(*res, out);
+ UNIT_ASSERT(*res <= in);
+ };
+ checkDown("привет", "привет");
+ checkDown("тест\x80", "тест\x7f");
+ checkDown("привет\xf5", "привет\xf4\x8f\xbf\xbf");
+ checkDown("тест2\xee\x80\x7f", "тест2\xed\x9f\xbf");
+ checkDown("ага\xf0\xaa\xaa\xff", "ага\xf0\xaa\xaa\xbf");
+ }
+
+ Y_UNIT_TEST(RoundingUp) { // The result must be valid UTF-8 and not less than the input, or absent.
+ auto checkUp = [](std::string_view in, std::string_view out) {
+ auto res = NYql::RoundToNearestValidUtf8(in, false);
+ UNIT_ASSERT(res);
+ UNIT_ASSERT(NYql::IsUtf8(*res));
+ UNIT_ASSERT_VALUES_EQUAL(*res, out);
+ UNIT_ASSERT(*res >= in);
+ };
+
+ checkUp("", "");
+ checkUp("привет", "привет");
+ checkUp("а\xf6", "б");
+ checkUp("\xf4\x8f\xbf\xbfа\xf4\x8f\xbf\xbf\xf5", "\xf4\x8f\xbf\xbfб");
+ UNIT_ASSERT(!NYql::RoundToNearestValidUtf8("\xf4\x8f\xbf\xbf\xf5", false)); // the valid prefix cannot be advanced either
+ UNIT_ASSERT(!NYql::RoundToNearestValidUtf8("\xf5", false));
+ checkUp("тест\x80", "тест\xc2\x80");
+ checkUp("тест\xdf", "тест\xdf\x80");
+ checkUp("тест\xf0\x90\xff", "тест\xf0\x91\x80\x80");
+ checkUp("ааа\xff", "ааб");
+ }
+
+ Y_UNIT_TEST(NextValid) { // The result must be valid UTF-8 and strictly greater than the input.
+ auto checkNext = [](std::string_view in, std::string_view out) {
+ auto res = NYql::NextValidUtf8(in);
+ UNIT_ASSERT(res);
+ UNIT_ASSERT(NYql::IsUtf8(*res));
+ UNIT_ASSERT_VALUES_EQUAL(*res, out);
+ UNIT_ASSERT(*res > in);
+ };
+
+ UNIT_ASSERT(!NYql::NextValidUtf8(""));
+ checkNext("привет", "привеу");
+ checkNext("а", "б");
+ checkNext(std::string_view("\x00", 1), "\x01");
+ checkNext("\xf4\x8f\xbf\xbfа\xf4\x8f\xbf\xbf", "\xf4\x8f\xbf\xbfб");
+ UNIT_ASSERT(!NYql::NextValidUtf8("\xf4\x8f\xbf\xbf"));
+ UNIT_ASSERT(!NYql::NextValidUtf8("\xf4\x8f\xbf\xbf\xf4\x8f\xbf\xbf"));
+ }
+
+ Y_UNIT_TEST(NextValidString) { // Exercises NextLexicographicString, which works on raw bytes.
+ auto checkNext = [](std::string_view in, std::string_view out) {
+ auto res = NYql::NextLexicographicString(in);
+ UNIT_ASSERT(res);
+ UNIT_ASSERT_VALUES_EQUAL(*res, out);
+ UNIT_ASSERT(*res > in);
+ };
+
+ UNIT_ASSERT(!NYql::NextLexicographicString(""));
+ checkNext("привет", "привеу");
+ checkNext("а", "б");
+ checkNext(std::string_view("\x00", 1), "\x01");
+ checkNext("\xf4\x8f\xbf\xbfа\xf4\x8f\xbf\xbf", "\xf4\x8f\xbf\xbfа\xf4\x8f\xbf\xc0");
+ UNIT_ASSERT(!NYql::NextLexicographicString("\xff"));
+ UNIT_ASSERT(!NYql::NextLexicographicString("\xff\xff"));
+ checkNext(std::string_view("x\x00\xff\xff", 4), "x\x01"); // trailing 0xff bytes are dropped
+ }
+}
diff --git a/yql/essentials/utils/ya.make b/yql/essentials/utils/ya.make
new file mode 100644
index 0000000000..8d18004e2b
--- /dev/null
+++ b/yql/essentials/utils/ya.make
@@ -0,0 +1,67 @@
+LIBRARY()
+
+SRCS(
+ cast.h
+ debug_info.cpp
+ debug_info.h
+ exceptions.cpp
+ exceptions.h
+ future_action.cpp
+ future_action.h
+ hash.cpp
+ hash.h
+ limiting_allocator.cpp
+ md5_stream.cpp
+ md5_stream.h
+ method_index.cpp
+ method_index.h
+ multi_resource_lock.cpp
+ multi_resource_lock.h
+ parse_double.cpp
+ parse_double.h
+ proc_alive.cpp
+ proc_alive.h
+ rand_guid.cpp
+ rand_guid.h
+ resetable_setting.h
+ retry.cpp
+ retry.h
+ sort.cpp
+ sort.h
+ swap_bytes.cpp
+ swap_bytes.h
+ url_builder.cpp
+ utf8.cpp
+ yql_panic.cpp
+ yql_panic.h
+ yql_paths.cpp
+ yql_paths.h
+)
+
+PEERDIR(
+ library/cpp/digest/md5
+ library/cpp/string_utils/quote
+ library/cpp/threading/future
+ library/cpp/deprecated/atomic
+ contrib/libs/miniselect
+)
+
+END()
+
+RECURSE(
+ backtrace
+ failure_injector
+ fetch
+ log
+ network
+ rope
+ signals
+ sys
+ test_http_server
+ threading
+)
+
+RECURSE_FOR_TESTS(
+ ut
+)
+
diff --git a/yql/essentials/utils/yql_panic.cpp b/yql/essentials/utils/yql_panic.cpp
new file mode 100644
index 0000000000..83c6acc21f
--- /dev/null
+++ b/yql/essentials/utils/yql_panic.cpp
@@ -0,0 +1,18 @@
+#include "yql_panic.h"
+
+namespace NYql {
+namespace NDetail {
+
+void YqlPanic(const ::NPrivate::TStaticBuf& file, int line, const char* function,
+    const TStringBuf& condition, const TStringBuf& message) {
+    // Message layout:
+    //   <file>:<line> <function>(): requirement <condition> failed[, message: <message>]
+    TYqlPanic panic;
+    panic << file.As<TStringBuf>() << ":" << line << " " << function
+          << "(): requirement " << condition << " failed";
+    if (message.empty()) {
+        throw panic;
+    }
+
+    // The optional part is appended only when the caller supplied a message.
+    panic << ", message: " << message;
+    throw panic;
+}
+
+} // namespace NDetail
+} // namespace NYql
diff --git a/yql/essentials/utils/yql_panic.h b/yql/essentials/utils/yql_panic.h
new file mode 100644
index 0000000000..e7e559b2c5
--- /dev/null
+++ b/yql/essentials/utils/yql_panic.h
@@ -0,0 +1,22 @@
+#pragma once
+#include <util/generic/yexception.h>
+#include <util/string/builder.h>
+#include <util/system/src_root.h>
+
+namespace NYql {
+
+class TYqlPanic : public yexception // Exception type thrown by NDetail::YqlPanic, i.e. by a failed YQL_ENSURE.
+{};
+
+namespace NDetail { // Implementation detail of YQL_ENSURE; not meant to be called directly.
+ [[noreturn]] void YqlPanic(const ::NPrivate::TStaticBuf& file, int line, const char* function, const TStringBuf& condition, const TStringBuf& message); // Always throws TYqlPanic describing the failed condition.
+}
+
+#define YQL_ENSURE(CONDITION, ...) /* Throws TYqlPanic via NDetail::YqlPanic when CONDITION is false; the optional extra arguments form the message. */ \
+ do { \
+ if (Y_UNLIKELY(!(CONDITION))) { \
+ ::NYql::NDetail::YqlPanic(__SOURCE_FILE_IMPL__, __LINE__, __FUNCTION__, #CONDITION, TStringBuilder() << "" __VA_ARGS__); \
+ } \
+ } while (0) // the "" before __VA_ARGS__ keeps the expression well-formed when no message is given
+
+} // namespace NYql
diff --git a/yql/essentials/utils/yql_paths.cpp b/yql/essentials/utils/yql_paths.cpp
new file mode 100644
index 0000000000..cc7dd29bfb
--- /dev/null
+++ b/yql/essentials/utils/yql_paths.cpp
@@ -0,0 +1,23 @@
+#include "yql_paths.h"
+
+#include <util/folder/pathsplit.h>
+
+namespace NYql {
+
+TString BuildTablePath(TStringBuf prefixPath, TStringBuf path) {
+    // The prefix is applied only when it is non-empty and path is relative;
+    // in every other case path is returned unchanged.
+    if (!prefixPath.empty()) {
+        TPathSplitUnix relative(path);
+        if (!relative.IsAbsolute) {
+            // A leading "//" of the prefix is dropped before it is split.
+            prefixPath.SkipPrefix("//");
+            TPathSplitUnix combined(prefixPath);
+            return combined.AppendMany(relative.begin(), relative.end()).Reconstruct();
+        }
+    }
+    return TString(path);
+}
+
+}
diff --git a/yql/essentials/utils/yql_paths.h b/yql/essentials/utils/yql_paths.h
new file mode 100644
index 0000000000..3ec05e7df8
--- /dev/null
+++ b/yql/essentials/utils/yql_paths.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <util/generic/strbuf.h>
+
+namespace NYql {
+
+TString BuildTablePath(TStringBuf prefixPath, TStringBuf path); // Returns path as is when prefixPath is empty or path is absolute; otherwise joins prefixPath (minus a leading "//") with path.
+
+}
diff --git a/yql/essentials/ya.make b/yql/essentials/ya.make
index 57841ee7e2..9eb5692b54 100644
--- a/yql/essentials/ya.make
+++ b/yql/essentials/ya.make
@@ -2,5 +2,6 @@ SUBSCRIBER(g:yql)
RECURSE(
public
+ utils
)