aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-02-09 12:40:11 +0300
committerhor911 <hor911@ydb.tech>2023-02-09 12:40:11 +0300
commit24689527cd888aa8a640ecb5077e656b3520d373 (patch)
treea613ff4cd9567b7113e8376a17f8b85897a42790
parent8642d3642932f03663ba7d2d9670707c192207fd (diff)
downloadydb-24689527cd888aa8a640ecb5077e656b3520d373.tar.gz
Log backend move
-rw-r--r--library/cpp/CMakeLists.darwin.txt1
-rw-r--r--library/cpp/CMakeLists.linux-aarch64.txt1
-rw-r--r--library/cpp/CMakeLists.linux.txt1
-rw-r--r--library/cpp/unified_agent_client/CMakeLists.darwin.txt63
-rw-r--r--library/cpp/unified_agent_client/CMakeLists.linux-aarch64.txt65
-rw-r--r--library/cpp/unified_agent_client/CMakeLists.linux.txt65
-rw-r--r--library/cpp/unified_agent_client/CMakeLists.txt15
-rw-r--r--library/cpp/unified_agent_client/async_joiner.h42
-rw-r--r--library/cpp/unified_agent_client/backend.cpp112
-rw-r--r--library/cpp/unified_agent_client/backend.h27
-rw-r--r--library/cpp/unified_agent_client/backend_creator.cpp63
-rw-r--r--library/cpp/unified_agent_client/backend_creator.h25
-rw-r--r--library/cpp/unified_agent_client/client.h256
-rw-r--r--library/cpp/unified_agent_client/client_impl.cpp1274
-rw-r--r--library/cpp/unified_agent_client/client_impl.h364
-rw-r--r--library/cpp/unified_agent_client/client_proto_weighing.h75
-rw-r--r--library/cpp/unified_agent_client/clock.cpp48
-rw-r--r--library/cpp/unified_agent_client/clock.h37
-rw-r--r--library/cpp/unified_agent_client/counters.cpp36
-rw-r--r--library/cpp/unified_agent_client/counters.h38
-rw-r--r--library/cpp/unified_agent_client/duration_counter.cpp41
-rw-r--r--library/cpp/unified_agent_client/duration_counter.h43
-rw-r--r--library/cpp/unified_agent_client/dynamic_counters_wrapper.h34
-rw-r--r--library/cpp/unified_agent_client/enum.h30
-rw-r--r--library/cpp/unified_agent_client/examples/ua_grpc_client/main.cpp122
-rw-r--r--library/cpp/unified_agent_client/f_maybe.h23
-rw-r--r--library/cpp/unified_agent_client/grpc_io.cpp161
-rw-r--r--library/cpp/unified_agent_client/grpc_io.h141
-rw-r--r--library/cpp/unified_agent_client/grpc_status_code.cpp56
-rw-r--r--library/cpp/unified_agent_client/helpers.cpp65
-rw-r--r--library/cpp/unified_agent_client/helpers.h17
-rw-r--r--library/cpp/unified_agent_client/logger.cpp131
-rw-r--r--library/cpp/unified_agent_client/logger.h157
-rw-r--r--library/cpp/unified_agent_client/proto/CMakeLists.darwin.txt45
-rw-r--r--library/cpp/unified_agent_client/proto/CMakeLists.linux-aarch64.txt46
-rw-r--r--library/cpp/unified_agent_client/proto/CMakeLists.linux.txt46
-rw-r--r--library/cpp/unified_agent_client/proto/CMakeLists.txt15
-rw-r--r--library/cpp/unified_agent_client/proto/unified_agent.proto101
-rw-r--r--library/cpp/unified_agent_client/proto_weighing.cpp99
-rw-r--r--library/cpp/unified_agent_client/proto_weighing.h138
-rw-r--r--library/cpp/unified_agent_client/registrar.cpp8
-rw-r--r--library/cpp/unified_agent_client/throttling.cpp67
-rw-r--r--library/cpp/unified_agent_client/throttling.h30
-rw-r--r--library/cpp/unified_agent_client/variant.h18
-rw-r--r--ydb/apps/ydbd/CMakeLists.darwin.txt1
-rw-r--r--ydb/apps/ydbd/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/apps/ydbd/CMakeLists.linux.txt1
-rw-r--r--ydb/apps/ydbd/log_backend.cpp31
-rw-r--r--ydb/apps/ydbd/log_backend.h13
-rw-r--r--ydb/apps/ydbd/main.cpp2
-rw-r--r--ydb/core/CMakeLists.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.darwin.txt2
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.linux.txt2
-rw-r--r--ydb/core/driver_lib/run/factories.h5
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp9
-rw-r--r--ydb/core/driver_lib/run/log_backend.cpp32
-rw-r--r--ydb/core/driver_lib/run/log_backend.h29
-rw-r--r--ydb/core/driver_lib/run/run.cpp10
-rw-r--r--ydb/core/log_backend/CMakeLists.darwin.txt23
-rw-r--r--ydb/core/log_backend/CMakeLists.linux-aarch64.txt24
-rw-r--r--ydb/core/log_backend/CMakeLists.linux.txt24
-rw-r--r--ydb/core/log_backend/CMakeLists.txt15
-rw-r--r--ydb/core/log_backend/log_backend.cpp66
-rw-r--r--ydb/core/log_backend/log_backend.h18
-rw-r--r--ydb/core/log_backend/log_backend_build.cpp51
-rw-r--r--ydb/core/log_backend/log_backend_build.h21
67 files changed, 4495 insertions, 130 deletions
diff --git a/library/cpp/CMakeLists.darwin.txt b/library/cpp/CMakeLists.darwin.txt
index c583b41c41..2e5648a15c 100644
--- a/library/cpp/CMakeLists.darwin.txt
+++ b/library/cpp/CMakeLists.darwin.txt
@@ -82,6 +82,7 @@ add_subdirectory(time_provider)
add_subdirectory(timezone_conversion)
add_subdirectory(tld)
add_subdirectory(unicode)
+add_subdirectory(unified_agent_client)
add_subdirectory(uri)
add_subdirectory(xml)
add_subdirectory(yaml)
diff --git a/library/cpp/CMakeLists.linux-aarch64.txt b/library/cpp/CMakeLists.linux-aarch64.txt
index 45ca41d349..09cf4909bb 100644
--- a/library/cpp/CMakeLists.linux-aarch64.txt
+++ b/library/cpp/CMakeLists.linux-aarch64.txt
@@ -81,6 +81,7 @@ add_subdirectory(time_provider)
add_subdirectory(timezone_conversion)
add_subdirectory(tld)
add_subdirectory(unicode)
+add_subdirectory(unified_agent_client)
add_subdirectory(uri)
add_subdirectory(xml)
add_subdirectory(yaml)
diff --git a/library/cpp/CMakeLists.linux.txt b/library/cpp/CMakeLists.linux.txt
index c583b41c41..2e5648a15c 100644
--- a/library/cpp/CMakeLists.linux.txt
+++ b/library/cpp/CMakeLists.linux.txt
@@ -82,6 +82,7 @@ add_subdirectory(time_provider)
add_subdirectory(timezone_conversion)
add_subdirectory(tld)
add_subdirectory(unicode)
+add_subdirectory(unified_agent_client)
add_subdirectory(uri)
add_subdirectory(xml)
add_subdirectory(yaml)
diff --git a/library/cpp/unified_agent_client/CMakeLists.darwin.txt b/library/cpp/unified_agent_client/CMakeLists.darwin.txt
new file mode 100644
index 0000000000..ac4481155f
--- /dev/null
+++ b/library/cpp/unified_agent_client/CMakeLists.darwin.txt
@@ -0,0 +1,63 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(proto)
+
+add_library(library-cpp-unified_agent_client)
+target_include_directories(library-cpp-unified_agent_client PRIVATE
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include
+)
+target_link_libraries(library-cpp-unified_agent_client PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ cpp-logger-global
+ cpp-threading-future
+ cpp-monlib-dynamic_counters
+ cpp-unified_agent_client-proto
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(library-cpp-unified_agent_client PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend_creator.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/client_impl.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/counters.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/helpers.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_status_code.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/clock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/duration_counter.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/logger.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/throttling.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto_weighing.cpp
+)
+generate_enum_serilization(library-cpp-unified_agent_client
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.h
+ INCLUDE_HEADERS
+ library/cpp/unified_agent_client/grpc_io.h
+)
+
+add_global_library_for(library-cpp-unified_agent_client.global library-cpp-unified_agent_client)
+target_include_directories(library-cpp-unified_agent_client.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include
+)
+target_link_libraries(library-cpp-unified_agent_client.global PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ cpp-logger-global
+ cpp-threading-future
+ cpp-monlib-dynamic_counters
+ cpp-unified_agent_client-proto
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(library-cpp-unified_agent_client.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/registrar.cpp
+)
diff --git a/library/cpp/unified_agent_client/CMakeLists.linux-aarch64.txt b/library/cpp/unified_agent_client/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..01a8bd743d
--- /dev/null
+++ b/library/cpp/unified_agent_client/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,65 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(proto)
+
+add_library(library-cpp-unified_agent_client)
+target_include_directories(library-cpp-unified_agent_client PRIVATE
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include
+)
+target_link_libraries(library-cpp-unified_agent_client PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ cpp-logger-global
+ cpp-threading-future
+ cpp-monlib-dynamic_counters
+ cpp-unified_agent_client-proto
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(library-cpp-unified_agent_client PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend_creator.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/client_impl.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/counters.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/helpers.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_status_code.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/clock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/duration_counter.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/logger.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/throttling.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto_weighing.cpp
+)
+generate_enum_serilization(library-cpp-unified_agent_client
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.h
+ INCLUDE_HEADERS
+ library/cpp/unified_agent_client/grpc_io.h
+)
+
+add_global_library_for(library-cpp-unified_agent_client.global library-cpp-unified_agent_client)
+target_include_directories(library-cpp-unified_agent_client.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include
+)
+target_link_libraries(library-cpp-unified_agent_client.global PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ cpp-logger-global
+ cpp-threading-future
+ cpp-monlib-dynamic_counters
+ cpp-unified_agent_client-proto
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(library-cpp-unified_agent_client.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/registrar.cpp
+)
diff --git a/library/cpp/unified_agent_client/CMakeLists.linux.txt b/library/cpp/unified_agent_client/CMakeLists.linux.txt
new file mode 100644
index 0000000000..01a8bd743d
--- /dev/null
+++ b/library/cpp/unified_agent_client/CMakeLists.linux.txt
@@ -0,0 +1,65 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(proto)
+
+add_library(library-cpp-unified_agent_client)
+target_include_directories(library-cpp-unified_agent_client PRIVATE
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include
+)
+target_link_libraries(library-cpp-unified_agent_client PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ cpp-logger-global
+ cpp-threading-future
+ cpp-monlib-dynamic_counters
+ cpp-unified_agent_client-proto
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(library-cpp-unified_agent_client PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend_creator.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/client_impl.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/counters.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/helpers.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_status_code.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/clock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/duration_counter.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/logger.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/throttling.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto_weighing.cpp
+)
+generate_enum_serilization(library-cpp-unified_agent_client
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.h
+ INCLUDE_HEADERS
+ library/cpp/unified_agent_client/grpc_io.h
+)
+
+add_global_library_for(library-cpp-unified_agent_client.global library-cpp-unified_agent_client)
+target_include_directories(library-cpp-unified_agent_client.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include
+)
+target_link_libraries(library-cpp-unified_agent_client.global PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ cpp-logger-global
+ cpp-threading-future
+ cpp-monlib-dynamic_counters
+ cpp-unified_agent_client-proto
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(library-cpp-unified_agent_client.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/registrar.cpp
+)
diff --git a/library/cpp/unified_agent_client/CMakeLists.txt b/library/cpp/unified_agent_client/CMakeLists.txt
new file mode 100644
index 0000000000..5bb4faffb4
--- /dev/null
+++ b/library/cpp/unified_agent_client/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/library/cpp/unified_agent_client/async_joiner.h b/library/cpp/unified_agent_client/async_joiner.h
new file mode 100644
index 0000000000..ce392ef7bc
--- /dev/null
+++ b/library/cpp/unified_agent_client/async_joiner.h
@@ -0,0 +1,42 @@
+#pragma once
+
+#include <library/cpp/threading/future/future.h>
+
+namespace NUnifiedAgent {
+ class TAsyncJoiner {
+ public:
+ inline TAsyncJoiner()
+ : Promise(NThreading::NewPromise())
+ , Refs(1)
+ {
+ }
+
+ inline i64 Ref(i64 count = 1) noexcept {
+ const auto result = Refs.fetch_add(count);
+ Y_VERIFY(result >= 1, "already joined");
+ return result;
+ }
+
+ inline i64 UnRef() noexcept {
+ const auto prev = Refs.fetch_sub(1);
+ Y_VERIFY(prev >= 1);
+ if (prev == 1) {
+ auto p = Promise;
+ p.SetValue();
+ }
+ return prev;
+ }
+
+ inline NThreading::TFuture<void> Join() noexcept {
+ auto result = Promise;
+ UnRef();
+ return result;
+ }
+
+ private:
+ NThreading::TPromise<void> Promise;
+ std::atomic<i64> Refs;
+ };
+
+ using TAsyncJoinerToken = TIntrusivePtr<TAsyncJoiner>;
+}
diff --git a/library/cpp/unified_agent_client/backend.cpp b/library/cpp/unified_agent_client/backend.cpp
new file mode 100644
index 0000000000..b3c4b4ebcf
--- /dev/null
+++ b/library/cpp/unified_agent_client/backend.cpp
@@ -0,0 +1,112 @@
+#include "backend.h"
+
+#include <library/cpp/unified_agent_client/enum.h>
+
+#include <library/cpp/logger/record.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/guid.h>
+#include <util/generic/serialized_enum.h>
+
+namespace NUnifiedAgent {
+ namespace {
+ class TDefaultRecordConverter : public IRecordConverter {
+ public:
+ TDefaultRecordConverter(bool stripTrailingNewLine)
+ : StripTrailingNewLine(stripTrailingNewLine)
+ , PriorityKey("_priority")
+ {
+ }
+
+ TClientMessage Convert(const TLogRecord& rec) const override {
+ const auto stripTrailingNewLine = StripTrailingNewLine &&
+ rec.Len > 0 && rec.Data[rec.Len - 1] == '\n';
+
+ THashMap<TString, TString> metaFlags{{PriorityKey, NameOf(rec.Priority)}};
+ metaFlags.insert(rec.MetaFlags.begin(), rec.MetaFlags.end());
+
+ return {
+ TString(rec.Data, stripTrailingNewLine ? rec.Len - 1 : rec.Len),
+ std::move(metaFlags)
+ };
+ }
+
+ private:
+ const bool StripTrailingNewLine;
+ const TString PriorityKey;
+ };
+
+ class TClientSessionAdapter: public TLogBackend {
+ public:
+ explicit TClientSessionAdapter(const TClientSessionPtr& session, THolder<IRecordConverter> recordConverter)
+ : Session(session)
+ , RecordConverter(std::move(recordConverter))
+ {
+ }
+
+ void WriteData(const TLogRecord& rec) override {
+ Session->Send(RecordConverter->Convert(rec));
+ }
+
+ void ReopenLog() override {
+ }
+
+ private:
+ TClientSessionPtr Session;
+ THolder<IRecordConverter> RecordConverter;
+ };
+
+ class TSessionHolder {
+ protected:
+ TSessionHolder(const TClientParameters& parameters, const TSessionParameters& sessionParameters)
+ : Client(MakeClient(parameters))
+ , Session(Client->CreateSession(sessionParameters))
+ {
+ }
+
+ protected:
+ TClientPtr Client;
+ TClientSessionPtr Session;
+ };
+
+ class TAgentLogBackend: private TSessionHolder, public TClientSessionAdapter {
+ public:
+ TAgentLogBackend(const TClientParameters& parameters,
+ const TSessionParameters& sessionParameters,
+ THolder<IRecordConverter> recordConverter)
+ : TSessionHolder(parameters, sessionParameters)
+ , TClientSessionAdapter(TSessionHolder::Session, std::move(recordConverter))
+ {
+ }
+
+ ~TAgentLogBackend() override {
+ TSessionHolder::Session->Close();
+ }
+ };
+ }
+
+ THolder<IRecordConverter> MakeDefaultRecordConverter(bool stripTrailingNewLine) {
+ return MakeHolder<TDefaultRecordConverter>(stripTrailingNewLine);
+ }
+
+ THolder<TLogBackend> AsLogBackend(const TClientSessionPtr& session, bool stripTrailingNewLine) {
+ return MakeHolder<TClientSessionAdapter>(session, MakeDefaultRecordConverter(stripTrailingNewLine));
+ }
+
+ THolder<TLogBackend> MakeLogBackend(const TClientParameters& parameters,
+ const TSessionParameters& sessionParameters,
+ THolder<IRecordConverter> recordConverter)
+ {
+ if (!recordConverter) {
+ recordConverter = MakeDefaultRecordConverter();
+ }
+ return MakeHolder<TAgentLogBackend>(parameters, sessionParameters, std::move(recordConverter));
+ }
+
+ THolder<::TLog> MakeLog(const TClientParameters& parameters,
+ const TSessionParameters& sessionParameters,
+ THolder<IRecordConverter> recordConverter)
+ {
+ return MakeHolder<::TLog>(MakeLogBackend(parameters, sessionParameters, std::move(recordConverter)));
+ }
+}
diff --git a/library/cpp/unified_agent_client/backend.h b/library/cpp/unified_agent_client/backend.h
new file mode 100644
index 0000000000..41e8d146b3
--- /dev/null
+++ b/library/cpp/unified_agent_client/backend.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/client.h>
+
+#include <library/cpp/logger/backend.h>
+#include <library/cpp/logger/log.h>
+
+namespace NUnifiedAgent {
+ class IRecordConverter {
+ public:
+ virtual ~IRecordConverter() = default;
+
+ virtual TClientMessage Convert(const TLogRecord&) const = 0;
+ };
+
+ THolder<IRecordConverter> MakeDefaultRecordConverter(bool stripTrailingNewLine = true);
+
+ THolder<TLogBackend> AsLogBackend(const TClientSessionPtr& session, bool stripTrailingNewLine = true);
+
+ THolder<TLogBackend> MakeLogBackend(const TClientParameters& parameters,
+ const TSessionParameters& sessionParameters = {},
+ THolder<IRecordConverter> recordConverter = {});
+
+ THolder<::TLog> MakeLog(const TClientParameters& parameters,
+ const TSessionParameters& sessionParameters = {},
+ THolder<IRecordConverter> recordConverter = {});
+}
diff --git a/library/cpp/unified_agent_client/backend_creator.cpp b/library/cpp/unified_agent_client/backend_creator.cpp
new file mode 100644
index 0000000000..825e3ebd2b
--- /dev/null
+++ b/library/cpp/unified_agent_client/backend_creator.cpp
@@ -0,0 +1,63 @@
+#include "backend_creator.h"
+#include <library/cpp/logger/global/global.h>
+
+namespace NUnifiedAgent {
+
+
+ TLogBackendCreator::TLogBackendCreator()
+ : TLogBackendCreatorBase("unified_agent")
+ {}
+
+ bool TLogBackendCreator::Init(const IInitContext& ctx) {
+ if(TString socket = ctx.GetOrElse("Uri", TString())) {
+ ClientParams = MakeHolder<TClientParameters>(socket);
+ } else {
+ Cdbg << "Uri not set for unified_agent log backend" << Endl;
+ return false;
+ }
+ TString secretKey;
+ ctx.GetValue("SharedSecretKey", secretKey);
+ if (secretKey) {
+ ClientParams->SharedSecretKey = secretKey;
+ }
+ ctx.GetValue("MaxInflightBytes", ClientParams->MaxInflightBytes);
+ ctx.GetValue("GrpcSendDelay", ClientParams->GrpcSendDelay);
+ size_t rateLimit;
+ if (ctx.GetValue("LogRateLimit", rateLimit)) {
+ ClientParams->LogRateLimitBytes = rateLimit;
+ }
+ ctx.GetValue("GrpcReconnectDelay", ClientParams->GrpcReconnectDelay);
+ ctx.GetValue("GrpcMaxMessageSize", ClientParams->GrpcMaxMessageSize);
+ const auto ownLogger = ctx.GetChildren("OwnLogger");
+ if (!ownLogger.empty() && ownLogger.front()->GetOrElse("LoggerType", TString()) != "global") {
+ OwnLogger = ILogBackendCreator::Create(*ownLogger.front());
+ TLog log;
+ log.ResetBackend(OwnLogger->CreateLogBackend());
+ ClientParams->SetLog(log);
+ }
+ return true;
+ }
+
+
+ void TLogBackendCreator::DoToJson(NJson::TJsonValue& value) const {
+ value["Uri"] = ClientParams->Uri;
+ if (ClientParams->SharedSecretKey) {
+ value["SharedSecretKey"] = *ClientParams->SharedSecretKey;
+ }
+ value["MaxInflightBytes"] = ClientParams->MaxInflightBytes;
+ value["GrpcSendDelay"] = ClientParams->GrpcSendDelay.ToString();
+ if (ClientParams->LogRateLimitBytes) {
+ value["LogRateLimit"] = *ClientParams->LogRateLimitBytes;
+ }
+ value["GrpcReconnectDelay"] = ClientParams->GrpcReconnectDelay.ToString();
+ value["GrpcMaxMessageSize"] = ClientParams->GrpcMaxMessageSize;
+ if (OwnLogger) {
+ OwnLogger->ToJson(value["OwnLogger"].AppendValue(NJson::JSON_MAP));
+ }
+ }
+
+ THolder<TLogBackend> TLogBackendCreator::DoCreateLogBackend() const {
+ return MakeLogBackend(*ClientParams);
+ }
+
+}
diff --git a/library/cpp/unified_agent_client/backend_creator.h b/library/cpp/unified_agent_client/backend_creator.h
new file mode 100644
index 0000000000..04928f616c
--- /dev/null
+++ b/library/cpp/unified_agent_client/backend_creator.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include "backend.h"
+#include <library/cpp/logger/backend_creator.h>
+
+namespace NUnifiedAgent {
+
+ class TLogBackendCreator: public TLogBackendCreatorBase {
+ public:
+ TLogBackendCreator();
+ bool Init(const IInitContext& ctx) override;
+ static TFactory::TRegistrator<TLogBackendCreator> Registrar;
+
+ protected:
+ void DoToJson(NJson::TJsonValue& value) const override;
+
+ private:
+ THolder<TLogBackend> DoCreateLogBackend() const override;
+
+ private:
+ THolder<TClientParameters> ClientParams;
+ THolder<ILogBackendCreator> OwnLogger;
+ };
+
+}
diff --git a/library/cpp/unified_agent_client/client.h b/library/cpp/unified_agent_client/client.h
new file mode 100644
index 0000000000..62e1210803
--- /dev/null
+++ b/library/cpp/unified_agent_client/client.h
@@ -0,0 +1,256 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/counters.h>
+
+#include <library/cpp/logger/log.h>
+#include <library/cpp/threading/future/future.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/hash.h>
+#include <util/generic/maybe.h>
+#include <util/generic/string.h>
+
+namespace NUnifiedAgent {
+ struct TClientParameters {
+ // uri format https://github.com/grpc/grpc/blob/master/doc/naming.md
+ // for example: unix:///unified_agent for unix domain sockets or localhost:12345 for tcp
+ explicit TClientParameters(const TString& uri);
+
+ // Simple way to protect against writing to unintended/invalid Unified Agent endpoint.
+ // Must correspond to 'shared_secret_key' grpc input parameter
+ // (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6333542#L219),
+ // session would end with error otherwise.
+ //
+ // Default: not set
+ TClientParameters& SetSharedSecretKey(const TString& sharedSecretKey) {
+ SharedSecretKey = sharedSecretKey;
+ return *this;
+ }
+
+ // Max bytes count that have been received by client session but not acknowledged yet.
+ // When exceeded, new messages will be discarded, an error message
+ // will be written to the TLog instance and drop counter will be incremented.
+ //
+ // Default: 10 mb
+ TClientParameters& SetMaxInflightBytes(size_t maxInflightBytes) {
+ MaxInflightBytes = maxInflightBytes;
+ return *this;
+ }
+
+ // TLog instance for client library's own logs.
+ //
+ // Default: TLoggerOperator<TGlobalLog>::Log()
+ TClientParameters& SetLog(TLog& log) {
+ Log = log;
+ return *this;
+ }
+
+ // Throttle client library log by rate limit in bytes, excess will be discarded.
+ //
+ // Default: not set
+ TClientParameters& SetLogRateLimit(size_t bytesPerSec) {
+ LogRateLimitBytes = bytesPerSec;
+ return *this;
+ }
+
+ // Try to establish new grpc session if the current one become broken.
+ // Session may break either due to agent unavailability, or the agent itself may
+ // reject new session creation if it does not satisfy certain
+ // conditions - shared_secret_key does not match, the session creation rate has been
+ // exceeded, invalid session metadata has been used and so on.
+ // Attempts to establish a grpc session will continue indefinitely.
+ //
+ // Default: 50 millis
+ TClientParameters& SetGrpcReconnectDelay(TDuration delay) {
+ GrpcReconnectDelay = delay;
+ return *this;
+ }
+
+ // Grpc usually writes data to the socket faster than it comes from the client.
+ // This means that it's possible that each TClientMessage would be sent in it's own grpc message.
+ // This is expensive in terms of cpu, since grpc makes at least one syscall
+ // for each message on the sender and receiver sides.
+ // To avoid a large number of syscalls, the client holds incoming messages
+ // in internal buffer in hope of being able to assemble bigger grpc batch.
+ // This parameter sets the timeout for this delay - from IClientSession::Send
+ // call to the actual sending of the corresponding grpc message.
+ //
+ // Default: 10 millis.
+ TClientParameters& SetGrpcSendDelay(TDuration delay) {
+ GrpcSendDelay = delay;
+ return *this;
+ }
+
+ // Client library sends messages to grpc in batches, this parameter
+ // establishes upper limit on the size of single batch in bytes.
+ // If you increase this value, don't forget to adjust max_receive_message_size (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6661788#L185)
+ // in grpc input config, it must be grater than GrpcMaxMessageSize.
+ //
+ // Default: 1 mb
+ TClientParameters& SetGrpcMaxMessageSize(size_t size) {
+ GrpcMaxMessageSize = size;
+ return *this;
+ }
+
+ // Enable forks handling in client library.
+ // Multiple threads and concurrent forks are all supported is this regime.
+ //
+ // Default: false
+ TClientParameters& SetEnableForkSupport(bool value) {
+ EnableForkSupport = value;
+ return *this;
+ }
+
+ // Client library counters.
+ // App can set this to some leaf of it's TDynamicCounters tree.
+ // Actual provided counters are listed in TClientCounters.
+ //
+ // Default: not set
+ TClientParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) {
+ return SetCounters(MakeIntrusive<TClientCounters>(counters));
+ }
+
+ TClientParameters& SetCounters(const TIntrusivePtr<TClientCounters>& counters) {
+ Counters = counters;
+ return *this;
+ }
+
+ public:
+ static const size_t DefaultMaxInflightBytes;
+ static const size_t DefaultGrpcMaxMessageSize;
+ static const TDuration DefaultGrpcSendDelay;
+
+ public:
+ TString Uri;
+ TMaybe<TString> SharedSecretKey;
+ size_t MaxInflightBytes;
+ TLog Log;
+ TMaybe<size_t> LogRateLimitBytes;
+ TDuration GrpcReconnectDelay;
+ TDuration GrpcSendDelay;
+ bool EnableForkSupport;
+ size_t GrpcMaxMessageSize;
+ TIntrusivePtr<TClientCounters> Counters;
+ };
+
+ struct TSessionParameters {
+ TSessionParameters();
+
+ // Session unique identifier.
+ // It's guaranteed that for messages with the same sessionId relative
+ // ordering of the messages will be preserved at all processing stages
+ // in library, in Unified Agent and in other systems that respect ordering (e.g., Logbroker)
+ //
+ // Default: generated automatically by Unified Agent.
+ TSessionParameters& SetSessionId(const TString& sessionId) {
+ SessionId = sessionId;
+ return *this;
+ }
+
+ // Session metadata as key-value set.
+ // Can be used by agent filters and outputs for validation/routing/enrichment/etc.
+ //
+ // Default: not set
+ TSessionParameters& SetMeta(const THashMap<TString, TString>& meta) {
+ Meta = meta;
+ return *this;
+ }
+
+ // Session counters.
+ // Actual provided counters are listed in TClientSessionCounters.
+ //
+ // Default: A single common for all sessions subgroup of client TDynamicCounters instance
+ // with label ('session': 'default').
+ TSessionParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) {
+ return SetCounters(MakeIntrusive<TClientSessionCounters>(counters));
+ }
+
+ TSessionParameters& SetCounters(const TIntrusivePtr<TClientSessionCounters>& counters) {
+ Counters = counters;
+ return *this;
+ }
+
+ // Max bytes count that have been received by client session but not acknowledged yet.
+ // When exceeded, new messages will be discarded, an error message
+ // will be written to the TLog instance and drop counter will be incremented.
+ //
+ // Default: value from client settings
+ TSessionParameters& SetMaxInflightBytes(size_t maxInflightBytes) {
+ MaxInflightBytes = maxInflightBytes;
+ return *this;
+ }
+
+ public:
+ TMaybe<TString> SessionId;
+ TMaybe<THashMap<TString, TString>> Meta;
+ TIntrusivePtr<TClientSessionCounters> Counters;
+ TMaybe<size_t> MaxInflightBytes;
+ };
+
+ // Message data to be sent to unified agent.
+ struct TClientMessage {
+ // Opaque message payload.
+ TString Payload;
+
+ // Message metadata as key-value set.
+ // Can be used by agent filters and outputs for validation/routing/enrichment/etc.
+ //
+ // Default: not set
+ TMaybe<THashMap<TString, TString>> Meta{};
+
+ // Message timestamp.
+ //
+ // Default: time the client library has received this instance of TClientMessage.
+ TMaybe<TInstant> Timestamp{};
+ };
+
+ // Message size as it is accounted in byte-related metrics (ReceivedBytes, InflightBytes, etc).
+ size_t SizeOf(const TClientMessage& message);
+
+ class IClientSession: public TAtomicRefCount<IClientSession> {
+ public:
+ virtual ~IClientSession() = default;
+
+ // Places the message into send queue. Actual grpc call may occur later asynchronously,
+ // based on settings GrpcSendDelay and GrpcMaxMessageSize.
+ // A message can be discarded if the limits defined by the GrpcMaxMessageSize and MaxInflightBytes
+ // settings are exceeded, or if the Close method has already been called.
+ // In this case an error message will be written to the TLog instance
+ // and drop counter will be incremented.
+ virtual void Send(TClientMessage&& message) = 0;
+
+ void Send(const TClientMessage& message) {
+ Send(TClientMessage(message));
+ }
+
+ // Waits until either all current inflight messages are
+ // acknowledged or the specified deadline is reached.
+ // Upon the deadline grpc connection would be forcefully dropped (via grpc::ClientContext::TryCancel).
+ virtual NThreading::TFuture<void> CloseAsync(TInstant deadline) = 0;
+
+ void Close(TInstant deadline) {
+ CloseAsync(deadline).Wait();
+ }
+
+ void Close(TDuration timeout = TDuration::Seconds(3)) {
+ Close(Now() + timeout);
+ }
+ };
+ using TClientSessionPtr = TIntrusivePtr<IClientSession>;
+
+ class IClient: public TAtomicRefCount<IClient> {
+ public:
+ virtual ~IClient() = default;
+
+ virtual TClientSessionPtr CreateSession(const TSessionParameters& parameters = {}) = 0;
+
+ virtual void StartTracing(ELogPriority) {
+ }
+
+ virtual void FinishTracing() {
+ }
+ };
+ using TClientPtr = TIntrusivePtr<IClient>;
+
+ TClientPtr MakeClient(const TClientParameters& parameters);
+}
diff --git a/library/cpp/unified_agent_client/client_impl.cpp b/library/cpp/unified_agent_client/client_impl.cpp
new file mode 100644
index 0000000000..4db98120fd
--- /dev/null
+++ b/library/cpp/unified_agent_client/client_impl.cpp
@@ -0,0 +1,1274 @@
+#include "client_impl.h"
+#include "helpers.h"
+
+#include <contrib/libs/grpc/include/grpc/grpc.h>
+#include <contrib/libs/grpc/src/core/lib/gpr/string.h>
+#include <contrib/libs/grpc/src/core/lib/gprpp/fork.h>
+#include <contrib/libs/grpc/src/core/lib/iomgr/executor.h>
+
+#include <util/charset/utf8.h>
+#include <util/generic/size_literals.h>
+#include <util/system/env.h>
+
+using namespace NThreading;
+using namespace NMonitoring;
+
+namespace NUnifiedAgent::NPrivate {
+ std::shared_ptr<grpc::Channel> CreateChannel(const grpc::string& target) {
+ grpc::ChannelArguments args;
+ args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE);
+ args.SetMaxReceiveMessageSize(Max<int>());
+ args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);
+ args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000);
+ args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100);
+ args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 200);
+ args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
+ args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
+ args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
+ args.SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
+ args.SetInt(GRPC_ARG_TCP_READ_CHUNK_SIZE, 1024*1024);
+ return grpc::CreateCustomChannel(target, grpc::InsecureChannelCredentials(), args);
+ }
+
+ void AddMeta(NUnifiedAgentProto::Request_Initialize& init, const TString& name, const TString& value) {
+ auto* metaItem = init.MutableMeta()->Add();
+ metaItem->SetName(name);
+ metaItem->SetValue(value);
+ }
+
+ std::atomic<ui64> TClient::Id{0};
+
+ TClient::TClient(const TClientParameters& parameters, std::shared_ptr<TForkProtector> forkProtector)
+ : Parameters(parameters)
+ , ForkProtector(forkProtector)
+ , Counters(parameters.Counters ? parameters.Counters : MakeIntrusive<TClientCounters>())
+ , Log(parameters.Log)
+ , MainLogger(Log, MakeFMaybe(Parameters.LogRateLimitBytes))
+ , Logger(MainLogger.Child(Sprintf("ua_%lu", Id.fetch_add(1))))
+ , Channel(nullptr)
+ , Stub(nullptr)
+ , ActiveCompletionQueue(nullptr)
+ , SessionLogLabel(0)
+ , ActiveSessions()
+ , Started(false)
+ , Destroyed(false)
+ , Lock()
+ {
+ MainLogger.SetDroppedBytesCounter(&Counters->ClientLogDroppedBytes);
+
+ if (ForkProtector != nullptr) {
+ ForkProtector->Register(*this);
+ }
+
+ EnsureStarted();
+
+ YLOG_INFO(Sprintf("created, uri [%s]", Parameters.Uri.c_str()));
+ }
+
+ TClient::~TClient() {
+ with_lock(Lock) {
+ Y_VERIFY(ActiveSessions.empty(), "active sessions found");
+
+ EnsureStoppedNoLock();
+
+ Destroyed = true;
+ }
+
+ if (ForkProtector != nullptr) {
+ ForkProtector->Unregister(*this);
+ }
+
+ YLOG_INFO(Sprintf("destroyed, uri [%s]", Parameters.Uri.c_str()));
+ }
+
+ TClientSessionPtr TClient::CreateSession(const TSessionParameters& parameters) {
+ return MakeIntrusive<TClientSession>(this, parameters);
+ }
+
+ void TClient::StartTracing(ELogPriority logPriority) {
+ MainLogger.StartTracing(logPriority);
+ StartGrpcTracing();
+ YLOG_INFO("tracing started");
+ }
+
+ void TClient::FinishTracing() {
+ FinishGrpcTracing();
+ MainLogger.FinishTracing();
+ YLOG_INFO("tracing finished");
+ }
+
+ void TClient::RegisterSession(TClientSession* session) {
+ with_lock(Lock) {
+ ActiveSessions.push_back(session);
+ }
+ }
+
+ void TClient::UnregisterSession(TClientSession* session) {
+ with_lock(Lock) {
+ const auto it = Find(ActiveSessions, session);
+ Y_VERIFY(it != ActiveSessions.end());
+ ActiveSessions.erase(it);
+ }
+ }
+
+ void TClient::PreFork() {
+ YLOG_INFO("pre fork started");
+
+ Lock.Acquire();
+
+ auto futures = TVector<TFuture<void>>(Reserve(ActiveSessions.size()));
+ for (auto* s: ActiveSessions) {
+ futures.push_back(s->PreFork());
+ }
+ YLOG_INFO("waiting for sessions");
+ WaitAll(futures).Wait();
+
+ EnsureStoppedNoLock();
+
+ YLOG_INFO("shutdown grpc executor");
+ grpc_core::Executor::SetThreadingAll(false);
+
+ YLOG_INFO("pre fork finished");
+ }
+
+ void TClient::PostForkParent() {
+ YLOG_INFO("post fork parent started");
+
+ if (!Destroyed) {
+ EnsureStartedNoLock();
+ }
+ Lock.Release();
+
+ for (auto* s: ActiveSessions) {
+ s->PostForkParent();
+ }
+
+ YLOG_INFO("post fork parent finished");
+ }
+
+ void TClient::PostForkChild() {
+ YLOG_INFO("post fork child started");
+
+ Lock.Release();
+
+ for (auto* s: ActiveSessions) {
+ s->PostForkChild();
+ }
+
+ YLOG_INFO("post fork child finished");
+ }
+
+ void TClient::EnsureStarted() {
+ with_lock(Lock) {
+ EnsureStartedNoLock();
+ }
+ }
+
+ void TClient::EnsureStartedNoLock() {
+ // Lock must be held
+
+ if (Started) {
+ return;
+ }
+
+ Channel = CreateChannel(Parameters.Uri);
+ Stub = NUnifiedAgentProto::UnifiedAgentService::NewStub(Channel);
+ ActiveCompletionQueue = MakeHolder<TGrpcCompletionQueueHost>();
+ ActiveCompletionQueue->Start();
+
+ Started = true;
+ }
+
+ void TClient::EnsureStoppedNoLock() {
+ // Lock must be held
+
+ if (!Started) {
+ return;
+ }
+
+ YLOG_INFO("stopping");
+ ActiveCompletionQueue->Stop();
+ ActiveCompletionQueue = nullptr;
+ Stub = nullptr;
+ Channel = nullptr;
+ YLOG_INFO("stopped");
+
+ Started = false;
+ }
+
+ TScopeLogger TClient::CreateSessionLogger() {
+ return Logger.Child(ToString(SessionLogLabel.fetch_add(1)));
+ }
+
+ TForkProtector::TForkProtector()
+ : Clients()
+ , GrpcInitializer()
+ , Enabled(grpc_core::Fork::Enabled())
+ , Lock()
+ {
+ }
+
+ void TForkProtector::Register(TClient& client) {
+ if (!Enabled) {
+ return;
+ }
+
+ Y_VERIFY(grpc_is_initialized());
+ Y_VERIFY(grpc_core::Fork::Enabled());
+
+ with_lock(Lock) {
+ Clients.push_back(&client);
+ }
+ }
+
+ void TForkProtector::Unregister(TClient& client) {
+ if (!Enabled) {
+ return;
+ }
+
+ with_lock(Lock) {
+ const auto it = Find(Clients, &client);
+ Y_VERIFY(it != Clients.end());
+ Clients.erase(it);
+ }
+ }
+
+ std::shared_ptr<TForkProtector> TForkProtector::Get(bool createIfNotExists) {
+ with_lock(InstanceLock) {
+ auto result = Instance.lock();
+ if (!result && createIfNotExists) {
+ SetEnv("GRPC_ENABLE_FORK_SUPPORT", "true");
+ result = std::make_shared<TForkProtector>();
+ if (!result->Enabled) {
+ TLog log("cerr");
+ TLogger logger(log, Nothing());
+ auto scopeLogger = logger.Child("ua client");
+ YLOG(TLOG_WARNING,
+ "Grpc is already initialized, can't enable fork support. "
+ "If forks are possible, please set environment variable GRPC_ENABLE_FORK_SUPPORT to 'true'. "
+ "If not, you can suppress this warning by setting EnableForkSupport "
+ "to false when creating the ua client.",
+ scopeLogger);
+ } else if (!SubscribedToForks) {
+ SubscribedToForks = true;
+ #ifdef _unix_
+ pthread_atfork(
+ &TForkProtector::PreFork,
+ &TForkProtector::PostForkParent,
+ &TForkProtector::PostForkChild);
+ #endif
+ }
+
+ Instance = result;
+ }
+ return result;
+ }
+ }
+
+ void TForkProtector::PreFork() {
+ auto self = Get(false);
+ if (!self) {
+ return;
+ }
+ self->Lock.Acquire();
+ for (auto* c : self->Clients) {
+ c->PreFork();
+ }
+ }
+
+ void TForkProtector::PostForkParent() {
+ auto self = Get(false);
+ if (!self) {
+ return;
+ }
+ for (auto* c : self->Clients) {
+ c->PostForkParent();
+ }
+ self->Lock.Release();
+ }
+
+ void TForkProtector::PostForkChild() {
+ auto self = Get(false);
+ if (!self) {
+ return;
+ }
+ for (auto* c : self->Clients) {
+ c->PostForkChild();
+ }
+ self->Lock.Release();
+ }
+
+ std::weak_ptr<TForkProtector> TForkProtector::Instance{};
+ TMutex TForkProtector::InstanceLock{};
+ bool TForkProtector::SubscribedToForks{false};
+
+ TClientSession::TClientSession(const TIntrusivePtr<TClient>& client, const TSessionParameters& parameters)
+ : AsyncJoiner()
+ , Client(client)
+ , OriginalSessionId(MakeFMaybe(parameters.SessionId))
+ , SessionId(OriginalSessionId)
+ , Meta(MakeFMaybe(parameters.Meta))
+ , Logger(Client->CreateSessionLogger())
+ , CloseStarted(false)
+ , ForcedCloseStarted(false)
+ , Closed(false)
+ , ForkInProgressLocal(false)
+ , Started(false)
+ , ClosePromise()
+ , ActiveGrpcCall(nullptr)
+ , WriteQueue()
+ , TrimmedCount(0)
+ , NextIndex(0)
+ , AckSeqNo(Nothing())
+ , PollerLastEventTimestamp()
+ , Counters(parameters.Counters ? parameters.Counters : Client->GetCounters()->GetDefaultSessionCounters())
+ , MakeGrpcCallTimer(nullptr)
+ , ForceCloseTimer(nullptr)
+ , PollTimer(nullptr)
+ , GrpcInflightMessages(0)
+ , GrpcInflightBytes(0)
+ , InflightBytes(0)
+ , CloseRequested(false)
+ , EventsBatchSize(0)
+ , PollingStatus(EPollingStatus::Inactive)
+ , EventNotification(nullptr)
+ , EventNotificationTriggered(false)
+ , EventsBatch()
+ , SecondaryEventsBatch()
+ , ForkInProgress(false)
+ , Lock()
+ , MaxInflightBytes(
+ parameters.MaxInflightBytes.GetOrElse(Client->GetParameters().MaxInflightBytes))
+ , AgentMaxReceiveMessage(Nothing()) {
+ if (Meta.Defined() && !IsUtf8(*Meta)) {
+ throw std::runtime_error("session meta contains non UTF-8 characters");
+ }
+ Y_ENSURE(!(Client->GetParameters().EnableForkSupport && SessionId.Defined()),
+ "explicit session id is not supported with forks");
+ Client->RegisterSession(this);
+
+ with_lock(Lock) {
+ DoStart();
+ }
+ }
+
+ TFuture<void> TClientSession::PreFork() {
+ YLOG_INFO("pre fork started");
+
+ Lock.Acquire();
+
+ YLOG_INFO("triggering event notification");
+ if (!EventNotificationTriggered) {
+ EventNotificationTriggered = true;
+ EventNotification->Trigger();
+ }
+
+ YLOG_INFO("setting 'fork in progress' flag");
+ ForkInProgress.store(true);
+
+ if (!Started) {
+ ClosePromise.TrySetValue();
+ }
+ YLOG_INFO("pre fork finished");
+ return ClosePromise.GetFuture();
+ }
+
+ void TClientSession::PostForkParent() {
+ YLOG_INFO("post fork parent started");
+ ForkInProgress.store(false);
+ ForkInProgressLocal = false;
+ Started = false;
+
+ if (!CloseRequested) {
+ DoStart();
+
+ YLOG_INFO("triggering event notification");
+ EventNotificationTriggered = true;
+ EventNotification->Trigger();
+ }
+
+ Lock.Release();
+
+ YLOG_INFO("post fork parent finished");
+ }
+
+ void TClientSession::PostForkChild() {
+ YLOG_INFO("post fork child started");
+ ForkInProgress.store(false);
+ ForkInProgressLocal = false;
+ Started = false;
+
+ SessionId.Clear();
+ TrimmedCount = 0;
+ NextIndex = 0;
+ AckSeqNo.Clear();
+ PurgeWriteQueue();
+ EventsBatch.clear();
+ SecondaryEventsBatch.clear();
+ EventsBatchSize = 0;
+
+ Lock.Release();
+
+ YLOG_INFO("post fork child finished");
+ }
+
+ void TClientSession::SetAgentMaxReceiveMessage(size_t newValue) {
+ AgentMaxReceiveMessage = newValue;
+ }
+
+ void TClientSession::DoStart() {
+ // Lock must be held
+
+ Y_VERIFY(!Started);
+ YLOG_INFO("starting");
+
+ Client->EnsureStarted();
+
+ MakeGrpcCallTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(),
+ MakeIOCallback([this](EIOStatus status) {
+ if (status == EIOStatus::Error) {
+ return;
+ }
+ MakeGrpcCall();
+ }, &AsyncJoiner));
+ ForceCloseTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(),
+ MakeIOCallback([this](EIOStatus status) {
+ if (status == EIOStatus::Error) {
+ return;
+ }
+ YLOG_INFO("ForceCloseTimer");
+ BeginClose(TInstant::Zero());
+ }, &AsyncJoiner));
+ PollTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(),
+ MakeIOCallback([this](EIOStatus status) {
+ if (status == EIOStatus::Error) {
+ return;
+ }
+ Poll();
+ }, &AsyncJoiner));
+ EventNotification = MakeHolder<TGrpcNotification>(Client->GetCompletionQueue(),
+ MakeIOCallback([this](EIOStatus status) {
+ Y_VERIFY(status == EIOStatus::Ok);
+ Poll();
+ }, &AsyncJoiner));
+
+ CloseStarted = false;
+ ForcedCloseStarted = false;
+ Closed = false;
+ ClosePromise = NewPromise();
+ EventNotificationTriggered = false;
+ PollerLastEventTimestamp = Now();
+ PollingStatus = EPollingStatus::Inactive;
+
+ ++Client->GetCounters()->ActiveSessionsCount;
+ MakeGrpcCallTimer->Set(Now());
+ YLOG_INFO(Sprintf("started, sessionId [%s]", OriginalSessionId.GetOrElse("").c_str()));
+
+ Started = true;
+ }
+
+ void TClientSession::MakeGrpcCall() {
+ if (Closed) {
+ YLOG_INFO("MakeGrpcCall, session already closed");
+ return;
+ }
+ Y_VERIFY(!ForcedCloseStarted);
+ Y_VERIFY(!ActiveGrpcCall);
+ ActiveGrpcCall = MakeIntrusive<TGrpcCall>(*this);
+ ActiveGrpcCall->Start();
+ ++Counters->GrpcCalls;
+ if (CloseStarted) {
+ ActiveGrpcCall->BeginClose(false);
+ }
+ }
+
+ TClientSession::~TClientSession() {
+ Close(TInstant::Zero());
+ AsyncJoiner.Join().Wait();
+ Client->UnregisterSession(this);
+ YLOG_INFO("destroyed");
+ }
+
+ void TClientSession::Send(TClientMessage&& message) {
+ const auto messageSize = SizeOf(message);
+ ++Counters->ReceivedMessages;
+ Counters->ReceivedBytes += messageSize;
+ if (messageSize > Client->GetParameters().GrpcMaxMessageSize) {
+ YLOG_ERR(Sprintf("message size [%lu] is greater than max grpc message size [%lu], message dropped",
+ messageSize, Client->GetParameters().GrpcMaxMessageSize));
+ ++Counters->DroppedMessages;
+ Counters->DroppedBytes += messageSize;
+ ++Counters->ErrorsCount;
+ return;
+ }
+ if (message.Meta.Defined() && !IsUtf8(*message.Meta)) {
+ YLOG_ERR("message meta contains non UTF-8 characters, message dropped");
+ ++Counters->DroppedMessages;
+ Counters->DroppedBytes += messageSize;
+ ++Counters->ErrorsCount;
+ return;
+ }
+ if (!message.Timestamp.Defined()) {
+ message.Timestamp = TInstant::Now();
+ }
+ ++Counters->InflightMessages;
+ Counters->InflightBytes += messageSize;
+ {
+ auto g = Guard(Lock);
+
+ if (!Started) {
+ DoStart();
+ }
+
+ if (CloseRequested) {
+ g.Release();
+ YLOG_ERR(Sprintf("session is closing, message dropped, [%lu] bytes", messageSize));
+ --Counters->InflightMessages;
+ Counters->InflightBytes -= messageSize;
+ ++Counters->DroppedMessages;
+ Counters->DroppedBytes += messageSize;
+ ++Counters->ErrorsCount;
+ return;
+ }
+ if (InflightBytes.load() + messageSize > MaxInflightBytes) {
+ g.Release();
+ YLOG_ERR(Sprintf("max inflight of [%lu] bytes reached, [%lu] bytes dropped",
+ MaxInflightBytes, messageSize));
+ --Counters->InflightMessages;
+ Counters->InflightBytes -= messageSize;
+ ++Counters->DroppedMessages;
+ Counters->DroppedBytes += messageSize;
+ ++Counters->ErrorsCount;
+ return;
+ }
+ InflightBytes.fetch_add(messageSize);
+ EventsBatch.push_back(TMessageReceivedEvent{std::move(message), messageSize});
+ EventsBatchSize += messageSize;
+ if ((PollingStatus == EPollingStatus::Inactive ||
+ EventsBatchSize >= Client->GetParameters().GrpcMaxMessageSize) &&
+ !EventNotificationTriggered)
+ {
+ EventNotificationTriggered = true;
+ EventNotification->Trigger();
+ }
+ }
+ }
+
+ TFuture<void> TClientSession::CloseAsync(TInstant deadline) {
+ YLOG_INFO(Sprintf("close, deadline [%s]", ToString(deadline).c_str()));
+ if (!ClosePromise.GetFuture().HasValue()) {
+ with_lock(Lock) {
+ if (!Started) {
+ return MakeFuture();
+ }
+
+ CloseRequested = true;
+
+ EventsBatch.push_back(TCloseRequestedEvent{deadline});
+ if (!EventNotificationTriggered) {
+ EventNotificationTriggered = true;
+ EventNotification->Trigger();
+ }
+ }
+ }
+ return ClosePromise.GetFuture();
+ }
+
+ void TClientSession::BeginClose(TInstant deadline) {
+ if (Closed) {
+ return;
+ }
+ if (!CloseStarted) {
+ CloseStarted = true;
+ YLOG_INFO("close started");
+ }
+ const auto force = deadline == TInstant::Zero();
+ if (force && !ForcedCloseStarted) {
+ ForcedCloseStarted = true;
+ YLOG_INFO("forced close started");
+ }
+ if (!ActiveGrpcCall && (ForcedCloseStarted || WriteQueue.empty())) {
+ DoClose();
+ } else {
+ if (!force) {
+ ForceCloseTimer->Set(deadline);
+ }
+ if (ActiveGrpcCall) {
+ ActiveGrpcCall->BeginClose(ForcedCloseStarted);
+ }
+ }
+ }
+
+ void TClientSession::Poll() {
+ if (ForkInProgressLocal) {
+ return;
+ }
+
+ const auto now = Now();
+ const auto sendDelay = Client->GetParameters().GrpcSendDelay;
+ const auto oldPollingStatus = PollingStatus;
+
+ {
+ if (!Lock.TryAcquire()) {
+ TSpinWait sw;
+
+ while (Lock.IsLocked() || !Lock.TryAcquire()) {
+ if (ForkInProgress.load()) {
+ YLOG_INFO("poller 'fork in progress' signal received, stopping session");
+ ForkInProgressLocal = true;
+ if (!ActiveGrpcCall || !ActiveGrpcCall->Initialized()) {
+ BeginClose(TInstant::Max());
+ } else if (ActiveGrpcCall->ReuseSessions()) {
+ ActiveGrpcCall->Poison();
+ BeginClose(TInstant::Max());
+ } else {
+ BeginClose(TInstant::Zero());
+ }
+ return;
+ }
+ sw.Sleep();
+ }
+ }
+
+ if (!EventsBatch.empty()) {
+ DoSwap(EventsBatch, SecondaryEventsBatch);
+ EventsBatchSize = 0;
+ PollerLastEventTimestamp = now;
+ }
+ const auto needNextPollStep = sendDelay != TDuration::Zero() &&
+ !CloseRequested &&
+ (now - PollerLastEventTimestamp) < 10 * sendDelay;
+ PollingStatus = needNextPollStep ? EPollingStatus::Active : EPollingStatus::Inactive;
+ EventNotificationTriggered = false;
+
+ Lock.Release();
+ }
+
+ if (PollingStatus == EPollingStatus::Active) {
+ PollTimer->Set(now + sendDelay);
+ }
+ if (PollingStatus != oldPollingStatus) {
+ YLOG_DEBUG(Sprintf("poller %s", PollingStatus == EPollingStatus::Active ? "started" : "stopped"));
+ }
+ if (auto& batch = SecondaryEventsBatch; !batch.empty()) {
+ auto closeIt = FindIf(batch, [](const auto& e) {
+ return std::holds_alternative<TCloseRequestedEvent>(e);
+ });
+
+ if (auto it = begin(batch); it != closeIt) {
+ Y_VERIFY(!CloseStarted);
+ do {
+ auto& e = std::get<TMessageReceivedEvent>(*it++);
+ WriteQueue.push_back({std::move(e.Message), e.Size, false});
+ } while (it != closeIt);
+ if (ActiveGrpcCall) {
+ ActiveGrpcCall->NotifyMessageAdded();
+ }
+ }
+
+ for (auto endIt = end(batch); closeIt != endIt; ++closeIt) {
+ const auto& e = std::get<TCloseRequestedEvent>(*closeIt);
+ BeginClose(e.Deadline);
+ }
+
+ batch.clear();
+ }
+ };
+
+ void TClientSession::PrepareInitializeRequest(NUnifiedAgentProto::Request& target) {
+ auto& initializeMessage = *target.MutableInitialize();
+ if (SessionId.Defined()) {
+ initializeMessage.SetSessionId(*SessionId);
+ }
+ if (Client->GetParameters().SharedSecretKey.Defined()) {
+ initializeMessage.SetSharedSecretKey(*Client->GetParameters().SharedSecretKey);
+ }
+ if (Meta.Defined()) {
+ for (const auto& p: *Meta) {
+ AddMeta(initializeMessage, p.first, p.second);
+ }
+ }
+ if (!Meta.Defined() || Meta->find("_reusable") == Meta->end()) {
+ AddMeta(initializeMessage, "_reusable", "true");
+ }
+ }
+
+ TClientSession::TRequestBuilder::TRequestBuilder(NUnifiedAgentProto::Request& target, size_t RequestPayloadLimitBytes,
+ TFMaybe<size_t> serializedRequestLimitBytes)
+ : Target(target)
+ , PwTarget(MakeFMaybe<NPW::TRequest>())
+ , MetaItems()
+ , RequestPayloadSize(0)
+ , RequestPayloadLimitBytes(RequestPayloadLimitBytes)
+ , SerializedRequestSize(0)
+ , SerializedRequestLimitBytes(serializedRequestLimitBytes)
+ , CountersInvalid(false)
+ {
+ }
+
+ void TClientSession::TRequestBuilder::ResetCounters() {
+ RequestPayloadSize = 0;
+ SerializedRequestSize = 0;
+ PwTarget.Clear();
+ PwTarget.ConstructInPlace();
+ CountersInvalid = false;
+ }
+
+ TClientSession::TRequestBuilder::TAddResult TClientSession::TRequestBuilder::TryAddMessage(
+ const TPendingMessage& message, size_t seqNo) {
+ Y_VERIFY(!CountersInvalid);
+ {
+ // add item to pwRequest to increase calculated size
+ PwTarget->DataBatch.SeqNo.Add(seqNo);
+ PwTarget->DataBatch.Timestamp.Add(message.Message.Timestamp->MicroSeconds());
+ PwTarget->DataBatch.Payload.Add().SetValue(message.Message.Payload);
+ if (message.Message.Meta.Defined()) {
+ for (const auto &m: *message.Message.Meta) {
+ TMetaItemBuilder *metaItemBuilder = nullptr;
+ {
+ auto it = MetaItems.find(m.first);
+ if (it == MetaItems.end()) {
+ PwTarget->DataBatch.Meta.Add().Key.SetValue(m.first);
+ } else {
+ metaItemBuilder = &it->second;
+ }
+ }
+ size_t metaItemIdx = (metaItemBuilder != nullptr) ? metaItemBuilder->ItemIndex :
+ PwTarget->DataBatch.Meta.GetSize() - 1;
+ auto &pwMetaItem = PwTarget->DataBatch.Meta.Get(metaItemIdx);
+ pwMetaItem.Value.Add().SetValue(m.second);
+ const auto index = Target.GetDataBatch().SeqNoSize();
+ if ((metaItemBuilder != nullptr && metaItemBuilder->ValueIndex != index) ||
+ (metaItemBuilder == nullptr && index != 0)) {
+ const auto valueIdx = (metaItemBuilder) ? metaItemBuilder->ValueIndex : 0;
+ pwMetaItem.SkipStart.Add(valueIdx);
+ pwMetaItem.SkipLength.Add(index - valueIdx);
+ }
+ }
+ }
+ }
+ const auto newSerializedRequestSize = PwTarget->ByteSizeLong();
+ const auto newPayloadSize = RequestPayloadSize + message.Size;
+ if ((SerializedRequestLimitBytes.Defined() && newSerializedRequestSize > *SerializedRequestLimitBytes) ||
+ newPayloadSize > RequestPayloadLimitBytes) {
+ CountersInvalid = true;
+ return {true, newPayloadSize, newSerializedRequestSize};
+ }
+
+ {
+ // add item to the real request
+ auto& batch = *Target.MutableDataBatch();
+ batch.AddSeqNo(seqNo);
+ batch.AddTimestamp(message.Message.Timestamp->MicroSeconds());
+ batch.AddPayload(message.Message.Payload);
+ if (message.Message.Meta.Defined()) {
+ for (const auto &m: *message.Message.Meta) {
+ TMetaItemBuilder *metaItemBuilder;
+ {
+ auto it = MetaItems.find(m.first);
+ if (it == MetaItems.end()) {
+ batch.AddMeta()->SetKey(m.first);
+ auto insertResult = MetaItems.insert({m.first, {batch.MetaSize() - 1}});
+ Y_VERIFY(insertResult.second);
+ metaItemBuilder = &insertResult.first->second;
+ } else {
+ metaItemBuilder = &it->second;
+ }
+ }
+ auto *metaItem = batch.MutableMeta(metaItemBuilder->ItemIndex);
+ metaItem->AddValue(m.second);
+ const auto index = batch.SeqNoSize() - 1;
+ if (metaItemBuilder->ValueIndex != index) {
+ metaItem->AddSkipStart(metaItemBuilder->ValueIndex);
+ metaItem->AddSkipLength(index - metaItemBuilder->ValueIndex);
+ }
+ metaItemBuilder->ValueIndex = index + 1;
+ }
+ }
+ SerializedRequestSize = newSerializedRequestSize;
+ RequestPayloadSize = newPayloadSize;
+ }
+
+ return {false, newPayloadSize, newSerializedRequestSize};
+ }
+
+ void TClientSession::PrepareWriteBatchRequest(NUnifiedAgentProto::Request& target) {
+ Y_VERIFY(AckSeqNo.Defined());
+ TRequestBuilder requestBuilder(target, Client->GetParameters().GrpcMaxMessageSize, AgentMaxReceiveMessage);
+ const auto startIndex = NextIndex - TrimmedCount;
+ for (size_t i = startIndex; i < WriteQueue.size(); ++i) {
+ auto& queueItem = WriteQueue[i];
+ if (queueItem.Skipped) {
+ NextIndex++;
+ continue;
+ }
+
+ const auto addResult = requestBuilder.TryAddMessage(queueItem, *AckSeqNo + i + 1);
+ const auto serializedLimitToLog = AgentMaxReceiveMessage.Defined() ? *AgentMaxReceiveMessage : 0;
+ if (addResult.LimitExceeded && target.GetDataBatch().SeqNoSize() == 0) {
+ YLOG_ERR(Sprintf("single serialized message is too large [%lu] > [%lu], dropping it",
+ addResult.NewSerializedRequestSize, serializedLimitToLog));
+ queueItem.Skipped = true;
+ ++Counters->DroppedMessages;
+ Counters->DroppedBytes += queueItem.Size;
+ ++Counters->ErrorsCount;
+ NextIndex++;
+ requestBuilder.ResetCounters();
+ continue;
+ }
+ if (addResult.LimitExceeded) {
+ YLOG_DEBUG(Sprintf(
+ "batch limit exceeded: [%lu] > [%lu] (limit for serialized batch)"
+ "OR [%lu] > [%lu] (limit for raw batch)",
+ addResult.NewSerializedRequestSize, serializedLimitToLog,
+ addResult.NewRequestPayloadSize, Client->GetParameters().GrpcMaxMessageSize));
+ break;
+ }
+
+ NextIndex++;
+ }
+ const auto messagesCount = target.GetDataBatch().SeqNoSize();
+ if (messagesCount == 0) {
+ return;
+ }
+ Y_VERIFY(requestBuilder.GetSerializedRequestSize() == target.ByteSizeLong(),
+ "failed to calculate size for message [%s]", target.ShortDebugString().c_str());
+ GrpcInflightMessages += messagesCount;
+ GrpcInflightBytes += requestBuilder.GetRequestPayloadSize();
+ YLOG_DEBUG(Sprintf("new write batch, [%lu] messages, [%lu] bytes, first seq_no [%lu], serialized size [%lu]",
+ messagesCount, requestBuilder.GetRequestPayloadSize(),
+ *target.GetDataBatch().GetSeqNo().begin(), requestBuilder.GetSerializedRequestSize()));
+ ++Counters->GrpcWriteBatchRequests;
+ Counters->GrpcInflightMessages += messagesCount;
+ Counters->GrpcInflightBytes += requestBuilder.GetRequestPayloadSize();
+ }
+
+ void TClientSession::Acknowledge(ui64 seqNo) {
+ size_t messagesCount = 0;
+ size_t bytesCount = 0;
+ size_t skippedMessagesCount = 0;
+ size_t skippedBytesCount = 0;
+
+ if (AckSeqNo.Defined()) {
+ while (!WriteQueue.empty() && ((*AckSeqNo < seqNo) || WriteQueue.front().Skipped)) {
+ if (WriteQueue.front().Skipped) {
+ skippedMessagesCount++;
+ skippedBytesCount += WriteQueue.front().Size;
+ } else {
+ ++messagesCount;
+ bytesCount += WriteQueue.front().Size;
+ }
+ ++(*AckSeqNo);
+ WriteQueue.pop_front();
+ ++TrimmedCount;
+ }
+ }
+ if (!AckSeqNo.Defined() || seqNo > *AckSeqNo) {
+ AckSeqNo = seqNo;
+ }
+
+ Counters->AcknowledgedMessages += messagesCount;
+ Counters->AcknowledgedBytes += bytesCount;
+ Counters->InflightMessages -= (messagesCount + skippedMessagesCount);
+ Counters->InflightBytes -= (bytesCount + skippedBytesCount);
+ InflightBytes.fetch_sub(bytesCount);
+ Counters->GrpcInflightMessages -= messagesCount;
+ Counters->GrpcInflightBytes -= bytesCount;
+ GrpcInflightMessages -= messagesCount;
+ GrpcInflightBytes -= bytesCount;
+
+ YLOG_DEBUG(Sprintf("ack [%lu], [%lu] messages, [%lu] bytes", seqNo, messagesCount, bytesCount));
+ }
+
+ void TClientSession::OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo) {
+ SessionId = sessionId;
+ Acknowledge(lastSeqNo);
+ NextIndex = TrimmedCount;
+ ++Counters->GrpcCallsInitialized;
+ Counters->GrpcInflightMessages -= GrpcInflightMessages;
+ Counters->GrpcInflightBytes -= GrpcInflightBytes;
+ GrpcInflightMessages = 0;
+ GrpcInflightBytes = 0;
+ YLOG_INFO(Sprintf("grpc call initialized, session_id [%s], last_seq_no [%lu]",
+ sessionId.c_str(), lastSeqNo));
+ }
+
+ void TClientSession::OnGrpcCallFinished() {
+ Y_VERIFY(!Closed);
+ Y_VERIFY(ActiveGrpcCall);
+ ActiveGrpcCall = nullptr;
+ if (CloseStarted && (ForcedCloseStarted || WriteQueue.empty())) {
+ DoClose();
+ } else {
+ const auto reconnectTime = TInstant::Now() + Client->GetParameters().GrpcReconnectDelay;
+ MakeGrpcCallTimer->Set(reconnectTime);
+ YLOG_INFO(Sprintf("grpc call delayed until [%s]", reconnectTime.ToString().c_str()));
+ }
+ }
+
+ auto TClientSession::PurgeWriteQueue() -> TPurgeWriteQueueStats {
+ size_t bytesCount = 0;
+ for (const auto& m: WriteQueue) {
+ bytesCount += m.Size;
+ }
+ auto result = TPurgeWriteQueueStats{WriteQueue.size(), bytesCount};
+
+ Counters->DroppedMessages += WriteQueue.size();
+ Counters->DroppedBytes += bytesCount;
+ Counters->InflightMessages -= WriteQueue.size();
+ Counters->InflightBytes -= bytesCount;
+ Counters->GrpcInflightMessages -= GrpcInflightMessages;
+ Counters->GrpcInflightBytes -= GrpcInflightBytes;
+
+ InflightBytes.fetch_sub(bytesCount);
+ GrpcInflightMessages = 0;
+ GrpcInflightBytes = 0;
+ WriteQueue.clear();
+
+ return result;
+ }
+
+ void TClientSession::DoClose() {
+ Y_VERIFY(CloseStarted);
+ Y_VERIFY(!Closed);
+ Y_VERIFY(!ClosePromise.HasValue());
+ MakeGrpcCallTimer->Cancel();
+ ForceCloseTimer->Cancel();
+ PollTimer->Cancel();
+ if (!ForkInProgressLocal && WriteQueue.size() > 0) {
+ const auto stats = PurgeWriteQueue();
+ ++Counters->ErrorsCount;
+ YLOG_ERR(Sprintf("DoClose, dropped [%lu] messages, [%lu] bytes",
+ stats.PurgedMessages, stats.PurgedBytes));
+ }
+ --Client->GetCounters()->ActiveSessionsCount;
+ Closed = true;
+ ClosePromise.SetValue();
+ YLOG_INFO("session closed");
+ }
+
+ TGrpcCall::TGrpcCall(TClientSession& session)
+ : Session(session)
+ , AsyncJoinerToken(&Session.GetAsyncJoiner())
+ , AcceptTag(MakeIOCallback(this, &TGrpcCall::EndAccept))
+ , ReadTag(MakeIOCallback(this, &TGrpcCall::EndRead))
+ , WriteTag(MakeIOCallback(this, &TGrpcCall::EndWrite))
+ , WritesDoneTag(MakeIOCallback(this, &TGrpcCall::EndWritesDone))
+ , FinishTag(MakeIOCallback(this, &TGrpcCall::EndFinish))
+ , Logger(session.GetLogger().Child("grpc"))
+ , AcceptPending(false)
+ , Initialized_(false)
+ , ReadPending(false)
+ , ReadsDone(false)
+ , WritePending(false)
+ , WritesBlocked(false)
+ , WritesDonePending(false)
+ , WritesDone(false)
+ , ErrorOccured(false)
+ , FinishRequested(false)
+ , FinishStarted(false)
+ , FinishDone(false)
+ , Cancelled(false)
+ , Poisoned(false)
+ , PoisonPillSent(false)
+ , ReuseSessions_(false)
+ , FinishStatus()
+ , ClientContext()
+ , ReaderWriter(nullptr)
+ , Request()
+ , Response()
+ {
+ }
+
+ void TGrpcCall::Start() {
+ AcceptPending = true;
+ auto& client = Session.GetClient();
+ ReaderWriter = client.GetStub().AsyncSession(&ClientContext,
+ &client.GetCompletionQueue(),
+ AcceptTag->Ref());
+ YLOG_INFO("AsyncSession started");
+ }
+
+ TGrpcCall::~TGrpcCall() {
+ YLOG_INFO("destroyed");
+ }
+
+ void TGrpcCall::EnsureFinishStarted() {
+ if (!FinishStarted) {
+ FinishStarted = true;
+ ReaderWriter->Finish(&FinishStatus, FinishTag->Ref());
+ YLOG_INFO("Finish started");
+ }
+ }
+
+ bool TGrpcCall::CheckHasError(EIOStatus status, const char* method) {
+ if (status == EIOStatus::Error) {
+ SetError(Sprintf("%s %s", method, ToString(status).c_str()));
+ return true;
+ }
+ if (ErrorOccured) {
+ ScheduleFinishOnError();
+ return true;
+ }
+ return false;
+ }
+
+ void TGrpcCall::SetError(const TString& error) {
+ if (!Cancelled) {
+ YLOG_ERR(error);
+ ++Session.GetCounters().ErrorsCount;
+ }
+ ErrorOccured = true;
+ ScheduleFinishOnError();
+ }
+
+ void TGrpcCall::ScheduleFinishOnError() {
+ if (!AcceptPending && !WritePending && !WritesDonePending) {
+ EnsureFinishStarted();
+ }
+ }
+
+ void TGrpcCall::BeginClose(bool force) {
+ if (force) {
+ if (!Cancelled) {
+ Cancelled = true;
+ ClientContext.TryCancel();
+ SetError("forced close");
+ }
+ return;
+ }
+ YLOG_INFO(Sprintf("Close Initialized [%d], AcceptPending [%d], "
+ "WritePending [%d], FinishRequested [%d], "
+ "ErrorOccured [%d]",
+ static_cast<int>(Initialized_),
+ static_cast<int>(AcceptPending),
+ static_cast<int>(WritePending),
+ static_cast<int>(FinishRequested),
+ static_cast<int>(ErrorOccured)));
+ if (ErrorOccured || FinishRequested) {
+ return;
+ }
+ FinishRequested = true;
+ if (!Initialized_ || WritePending) {
+ return;
+ }
+ WritesBlocked = true;
+ BeginWritesDone();
+ }
+
+ void TGrpcCall::Poison() {
+ Poisoned = true;
+ NotifyMessageAdded();
+ }
+
+ void TGrpcCall::NotifyMessageAdded() {
+ if (WritePending || !Initialized_ || ErrorOccured || FinishRequested) {
+ return;
+ }
+ ScheduleWrite();
+ }
+
+ void TGrpcCall::ScheduleWrite() {
+ Request.Clear();
+ if (!Poisoned) {
+ Session.PrepareWriteBatchRequest(Request);
+ } else if (!PoisonPillSent) {
+ PoisonPillSent = true;
+ auto& batch = *Request.mutable_data_batch();
+ batch.AddSeqNo(std::numeric_limits<::google::protobuf::uint64>::max());
+ batch.AddTimestamp(Now().MicroSeconds());
+ batch.AddPayload("");
+ YLOG_INFO("poison pill sent");
+ }
+ if (Request.GetDataBatch().GetSeqNo().empty()) {
+ if (FinishRequested) {
+ WritesBlocked = true;
+ BeginWritesDone();
+ }
+ return;
+ }
+
+ BeginWrite();
+ }
+
+ void TGrpcCall::EndAccept(EIOStatus status) {
+ Y_VERIFY(AcceptPending);
+ AcceptPending = false;
+ if (CheckHasError(status, "EndAccept")) {
+ return;
+ }
+ BeginRead();
+ Request.Clear();
+ Session.PrepareInitializeRequest(Request);
+ BeginWrite();
+ }
+
+ void TGrpcCall::EndRead(EIOStatus status) {
+ ReadPending = false;
+ if (FinishDone) {
+ Session.OnGrpcCallFinished();
+ return;
+ }
+ if (!ErrorOccured && status == EIOStatus::Error && WritesBlocked) {
+ Y_VERIFY(!WritePending);
+ YLOG_INFO("EndRead ReadsDone");
+ ReadsDone = true;
+ if (WritesDone) {
+ EnsureFinishStarted();
+ return;
+ }
+ return;
+ }
+ if (CheckHasError(status, "EndRead")) {
+ return;
+ }
+ if (!Initialized_) {
+ const auto metadata = ClientContext.GetServerInitialMetadata();
+ {
+ const auto it = metadata.find("ua-reuse-sessions");
+ if (it != metadata.end() && it->second == "true") {
+ ReuseSessions_ = true;
+ }
+ }
+ {
+ const auto it = metadata.find("ua-max-receive-message-size");
+ if (it != metadata.end()) {
+ Session.SetAgentMaxReceiveMessage(FromString<size_t>(TString{it->second.begin(), it->second.end()}));
+ }
+ }
+
+ if (Response.response_case() != NUnifiedAgentProto::Response::kInitialized) {
+ SetError(Sprintf("EndRead while initializing, unexpected response_case [%d]",
+ static_cast<int>(Response.response_case())));
+ return;
+ }
+ Session.OnGrpcCallInitialized(Response.GetInitialized().GetSessionId(),
+ Response.GetInitialized().GetLastSeqNo());
+ Initialized_ = true;
+ if (!WritePending) {
+ ScheduleWrite();
+ }
+ } else {
+ if (Response.response_case() != NUnifiedAgentProto::Response::kAck) {
+ SetError(Sprintf("EndRead unexpected response_case [%d]",
+ static_cast<int>(Response.response_case())));
+ return;
+ }
+ Session.Acknowledge(Response.GetAck().GetSeqNo());
+ }
+ BeginRead();
+ }
+
+ void TGrpcCall::EndWrite(EIOStatus status) {
+ WritePending = false;
+ if (CheckHasError(status, "EndWrite")) {
+ return;
+ }
+ if (!Initialized_) {
+ return;
+ }
+ ScheduleWrite();
+ }
+
+ void TGrpcCall::EndFinish(EIOStatus status) {
+ FinishDone = true;
+ const auto finishStatus = status == EIOStatus::Error
+ ? grpc::Status(grpc::UNKNOWN, "finish error")
+ : FinishStatus;
+ YLOG(finishStatus.ok() || Cancelled || Poisoned ? TLOG_INFO : TLOG_ERR,
+ Sprintf("EndFinish, code [%s], message [%s]",
+ ToString(finishStatus.error_code()).c_str(),
+ finishStatus.error_message().c_str()),
+ Logger);
+ if (!finishStatus.ok() && !Cancelled) {
+ ++Session.GetCounters().ErrorsCount;
+ }
+ if (!ReadPending) {
+ Session.OnGrpcCallFinished();
+ }
+ }
+
+ void TGrpcCall::EndWritesDone(EIOStatus status) {
+ YLOG_INFO(Sprintf("EndWritesDone [%s]", ToString(status).c_str()));
+ Y_VERIFY(!WritePending && !WritesDone && WritesDonePending);
+ WritesDonePending = false;
+ WritesDone = true;
+ if (CheckHasError(status, "EndWriteDone")) {
+ return;
+ }
+ if (ReadsDone) {
+ EnsureFinishStarted();
+ }
+ }
+
+ void TGrpcCall::BeginWritesDone() {
+ WritesDonePending = true;
+ ReaderWriter->WritesDone(WritesDoneTag->Ref());
+ YLOG_INFO("WritesDone started");
+ }
+
+ void TGrpcCall::BeginRead() {
+ ReadPending = true;
+ Response.Clear();
+ ReaderWriter->Read(&Response, ReadTag->Ref());
+ YLOG_DEBUG("Read started");
+ }
+
+ void TGrpcCall::BeginWrite() {
+ WritePending = true;
+ ReaderWriter->Write(Request, WriteTag->Ref());
+ YLOG_DEBUG("Write started");
+ }
+}
+
+namespace NUnifiedAgent {
+ size_t SizeOf(const TClientMessage& message) {
+ auto result = message.Payload.Size() + sizeof(TInstant);
+ if (message.Meta.Defined()) {
+ for (const auto& m: *message.Meta) {
+ result += m.first.Size() + m.second.Size();
+ }
+ }
+ return result;
+ }
+
+ TClientParameters::TClientParameters(const TString& uri)
+ : Uri(uri)
+ , SharedSecretKey(Nothing())
+ , MaxInflightBytes(DefaultMaxInflightBytes)
+ , Log(TLoggerOperator<TGlobalLog>::Log())
+ , LogRateLimitBytes(Nothing())
+ , GrpcReconnectDelay(TDuration::MilliSeconds(50))
+ , GrpcSendDelay(DefaultGrpcSendDelay)
+ , EnableForkSupport(false)
+ , GrpcMaxMessageSize(DefaultGrpcMaxMessageSize)
+ , Counters(nullptr)
+ {
+ }
+
+ TSessionParameters::TSessionParameters()
+ : SessionId(Nothing())
+ , Meta(Nothing())
+ , Counters(nullptr)
+ , MaxInflightBytes()
+ {
+ }
+
+ const size_t TClientParameters::DefaultMaxInflightBytes = 10_MB;
+ const size_t TClientParameters::DefaultGrpcMaxMessageSize = 1_MB;
+ const TDuration TClientParameters::DefaultGrpcSendDelay = TDuration::MilliSeconds(10);
+
+ TClientPtr MakeClient(const TClientParameters& parameters) {
+ if (!grpc_is_initialized()) {
+ EnsureGrpcConfigured();
+ }
+
+ std::shared_ptr<NPrivate::TForkProtector> forkProtector{};
+#ifdef _unix_
+ if (parameters.EnableForkSupport) {
+ forkProtector = NPrivate::TForkProtector::Get(true);
+ }
+#endif
+ return MakeIntrusive<NPrivate::TClient>(parameters, forkProtector);
+ }
+}
diff --git a/library/cpp/unified_agent_client/client_impl.h b/library/cpp/unified_agent_client/client_impl.h
new file mode 100644
index 0000000000..6adadf92e3
--- /dev/null
+++ b/library/cpp/unified_agent_client/client_impl.h
@@ -0,0 +1,364 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/client.h>
+#include <library/cpp/unified_agent_client/client_proto_weighing.h>
+#include <library/cpp/unified_agent_client/counters.h>
+#include <library/cpp/unified_agent_client/logger.h>
+#include <library/cpp/unified_agent_client/variant.h>
+#include <library/cpp/unified_agent_client/proto/unified_agent.grpc.pb.h>
+#include <library/cpp/unified_agent_client/grpc_io.h>
+
+#include <library/cpp/logger/global/global.h>
+
+#include <util/generic/deque.h>
+#include <util/system/mutex.h>
+
+namespace NUnifiedAgent::NPrivate {
+ class TClientSession;
+ class TGrpcCall;
+ class TForkProtector;
+
+ class TClient: public IClient {
+ public:
+ explicit TClient(const TClientParameters& parameters, std::shared_ptr<TForkProtector> forkProtector);
+
+ ~TClient() override;
+
+ TClientSessionPtr CreateSession(const TSessionParameters& parameters) override;
+
+ void StartTracing(ELogPriority logPriority) override;
+
+ void FinishTracing() override;
+
+ inline const TIntrusivePtr<TClientCounters>& GetCounters() const noexcept {
+ return Counters;
+ }
+
+ inline NUnifiedAgentProto::UnifiedAgentService::Stub& GetStub() noexcept {
+ return *Stub;
+ }
+
+ TScopeLogger CreateSessionLogger();
+
+ inline const TClientParameters& GetParameters() const noexcept {
+ return Parameters;
+ }
+
+ inline grpc::CompletionQueue& GetCompletionQueue() noexcept {
+ return ActiveCompletionQueue->GetCompletionQueue();
+ }
+
+ void RegisterSession(TClientSession* session);
+
+ void UnregisterSession(TClientSession* session);
+
+ void PreFork();
+
+ void PostForkParent();
+
+ void PostForkChild();
+
+ void EnsureStarted();
+
+ private:
+ void EnsureStartedNoLock();
+
+ void EnsureStoppedNoLock();
+
+ private:
+ const TClientParameters Parameters;
+ std::shared_ptr<TForkProtector> ForkProtector;
+ TIntrusivePtr<TClientCounters> Counters;
+ TLog Log;
+ TLogger MainLogger;
+ TScopeLogger Logger;
+ std::shared_ptr<grpc::Channel> Channel;
+ std::unique_ptr<NUnifiedAgentProto::UnifiedAgentService::Stub> Stub;
+ THolder<TGrpcCompletionQueueHost> ActiveCompletionQueue;
+ std::atomic<size_t> SessionLogLabel;
+ TVector<TClientSession*> ActiveSessions;
+ bool Started;
+ bool Destroyed;
+ TAdaptiveLock Lock;
+ static std::atomic<ui64> Id;
+ };
+
+ class TForkProtector {
+ public:
+ TForkProtector();
+
+ void Register(TClient& client);
+
+ void Unregister(TClient& client);
+
+ static std::shared_ptr<TForkProtector> Get(bool createIfNotExists);
+
+ private:
+ static void PreFork();
+
+ static void PostForkParent();
+
+ static void PostForkChild();
+
+ private:
+ TVector<TClient*> Clients;
+ grpc::GrpcLibraryCodegen GrpcInitializer;
+ bool Enabled;
+ TAdaptiveLock Lock;
+
+ static std::weak_ptr<TForkProtector> Instance;
+ static TMutex InstanceLock;
+ static bool SubscribedToForks;
+ };
+
+ class TClientSession: public IClientSession {
+ public:
+ TClientSession(const TIntrusivePtr<TClient>& client, const TSessionParameters& parameters);
+
+ ~TClientSession();
+
+ void Send(TClientMessage&& message) override;
+
+ NThreading::TFuture<void> CloseAsync(TInstant deadline) override;
+
+ inline TClient& GetClient() noexcept {
+ return *Client;
+ }
+
+ inline TScopeLogger& GetLogger() noexcept {
+ return Logger;
+ }
+
+ inline TClientSessionCounters& GetCounters() noexcept {
+ return *Counters;
+ }
+
+ inline TAsyncJoiner& GetAsyncJoiner() noexcept {
+ return AsyncJoiner;
+ }
+
+ void PrepareInitializeRequest(NUnifiedAgentProto::Request& target);
+
+ void PrepareWriteBatchRequest(NUnifiedAgentProto::Request& target);
+
+ void Acknowledge(ui64 seqNo);
+
+ void OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo);
+
+ void OnGrpcCallFinished();
+
+ NThreading::TFuture<void> PreFork();
+
+ void PostForkParent();
+
+ void PostForkChild();
+
+ void SetAgentMaxReceiveMessage(size_t);
+
+ private:
+ enum class EPollingStatus {
+ Active,
+ Inactive
+ };
+
+ struct TCloseRequestedEvent {
+ TInstant Deadline;
+ };
+
+ struct TMessageReceivedEvent {
+ TClientMessage Message;
+ size_t Size;
+ };
+
+ struct TPurgeWriteQueueStats {
+ size_t PurgedMessages{};
+ size_t PurgedBytes{};
+ };
+
+ using TEvent = std::variant<TCloseRequestedEvent, TMessageReceivedEvent>;
+
+ public:
+ struct TPendingMessage {
+ TClientMessage Message;
+ size_t Size;
+ bool Skipped;
+ };
+
+ class TRequestBuilder {
+ public:
+ struct TAddResult;
+
+ public:
+ TRequestBuilder(NUnifiedAgentProto::Request &target, size_t RequestPayloadLimitBytes,
+ TFMaybe<size_t> serializedRequestLimitBytes);
+
+ TAddResult TryAddMessage(const TPendingMessage& message, size_t seqNo);
+
+ void ResetCounters();
+
+ inline size_t GetSerializedRequestSize() const {
+ return SerializedRequestSize;
+ }
+
+ inline size_t GetRequestPayloadSize() const {
+ return RequestPayloadSize;
+ }
+
+ public:
+ struct TAddResult {
+ bool LimitExceeded;
+ size_t NewRequestPayloadSize; // == actual value, if !LimitExceeded
+ size_t NewSerializedRequestSize; // == actual value, if !LimitExceeded
+ };
+
+ private:
+ struct TMetaItemBuilder {
+ size_t ItemIndex;
+ size_t ValueIndex{0};
+ };
+
+ private:
+ NUnifiedAgentProto::Request& Target;
+ TFMaybe<NPW::TRequest> PwTarget;
+ THashMap<TString, TMetaItemBuilder> MetaItems;
+ size_t RequestPayloadSize;
+ size_t RequestPayloadLimitBytes;
+ size_t SerializedRequestSize;
+ TFMaybe<size_t> SerializedRequestLimitBytes;
+ bool CountersInvalid;
+ };
+
+ private:
+ void MakeGrpcCall();
+
+ void DoClose();
+
+ void BeginClose(TInstant deadline);
+
+ void Poll();
+
+ TPurgeWriteQueueStats PurgeWriteQueue();
+
+ void DoStart();
+
+ private:
+ TAsyncJoiner AsyncJoiner;
+ TIntrusivePtr<TClient> Client;
+ TFMaybe<TString> OriginalSessionId;
+ TFMaybe<TString> SessionId;
+ TFMaybe<THashMap<TString, TString>> Meta;
+ TScopeLogger Logger;
+ bool CloseStarted;
+ bool ForcedCloseStarted;
+ bool Closed;
+ bool ForkInProgressLocal;
+ bool Started;
+ NThreading::TPromise<void> ClosePromise;
+ TIntrusivePtr<TGrpcCall> ActiveGrpcCall;
+ TDeque<TPendingMessage> WriteQueue;
+ size_t TrimmedCount;
+ size_t NextIndex;
+ TFMaybe<ui64> AckSeqNo;
+ TInstant PollerLastEventTimestamp;
+ TIntrusivePtr<TClientSessionCounters> Counters;
+ THolder<TGrpcTimer> MakeGrpcCallTimer;
+ THolder<TGrpcTimer> ForceCloseTimer;
+ THolder<TGrpcTimer> PollTimer;
+ ui64 GrpcInflightMessages;
+ ui64 GrpcInflightBytes;
+
+ std::atomic<size_t> InflightBytes;
+ bool CloseRequested;
+ size_t EventsBatchSize;
+ EPollingStatus PollingStatus;
+ THolder<TGrpcNotification> EventNotification;
+ bool EventNotificationTriggered;
+ TVector<TEvent> EventsBatch;
+ TVector<TEvent> SecondaryEventsBatch;
+ std::atomic<bool> ForkInProgress;
+ TAdaptiveLock Lock;
+ size_t MaxInflightBytes;
+ TFMaybe<size_t> AgentMaxReceiveMessage;
+ };
+
+ class TGrpcCall final: public TAtomicRefCount<TGrpcCall> {
+ public:
+ explicit TGrpcCall(TClientSession& session);
+
+ void Start();
+
+ ~TGrpcCall();
+
+ void BeginClose(bool force);
+
+ void Poison();
+
+ void NotifyMessageAdded();
+
+ inline bool Initialized() const {
+ return Initialized_;
+ }
+
+ inline bool ReuseSessions() const {
+ return ReuseSessions_;
+ }
+
+ private:
+ void EndAccept(EIOStatus status);
+
+ void EndRead(EIOStatus status);
+
+ void EndWrite(EIOStatus status);
+
+ void EndFinish(EIOStatus status);
+
+ void EndWritesDone(EIOStatus);
+
+ void ScheduleWrite();
+
+ void BeginWritesDone();
+
+ bool CheckHasError(EIOStatus status, const char* method);
+
+ void SetError(const TString& error);
+
+ void EnsureFinishStarted();
+
+ void BeginRead();
+
+ void BeginWrite();
+
+ void ScheduleFinishOnError();
+
+ private:
+ TClientSession& Session;
+ TAsyncJoinerToken AsyncJoinerToken;
+ THolder<IIOCallback> AcceptTag;
+ THolder<IIOCallback> ReadTag;
+ THolder<IIOCallback> WriteTag;
+ THolder<IIOCallback> WritesDoneTag;
+ THolder<IIOCallback> FinishTag;
+ TScopeLogger Logger;
+ bool AcceptPending;
+ bool Initialized_;
+ bool ReadPending;
+ bool ReadsDone;
+ bool WritePending;
+ bool WritesBlocked;
+ bool WritesDonePending;
+ bool WritesDone;
+ bool ErrorOccured;
+ bool FinishRequested;
+ bool FinishStarted;
+ bool FinishDone;
+ bool Cancelled;
+ bool Poisoned;
+ bool PoisonPillSent;
+ bool ReuseSessions_;
+ grpc::Status FinishStatus;
+ grpc::ClientContext ClientContext;
+ std::unique_ptr<grpc::ClientAsyncReaderWriter<NUnifiedAgentProto::Request, NUnifiedAgentProto::Response>> ReaderWriter;
+ NUnifiedAgentProto::Request Request;
+ NUnifiedAgentProto::Response Response;
+ };
+}
diff --git a/library/cpp/unified_agent_client/client_proto_weighing.h b/library/cpp/unified_agent_client/client_proto_weighing.h
new file mode 100644
index 0000000000..728792e49f
--- /dev/null
+++ b/library/cpp/unified_agent_client/client_proto_weighing.h
@@ -0,0 +1,75 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/f_maybe.h>
+#include <library/cpp/unified_agent_client/proto_weighing.h>
+
+namespace NUnifiedAgent::NPW {
+ struct TMessageMetaItem: public TMessage {
+ TMessageMetaItem()
+ : TMessage()
+ , Key(this)
+ , Value(this)
+ , SkipStart(this)
+ , SkipLength(this)
+ {
+ }
+
+ explicit TMessageMetaItem(TMessage* parent)
+ : TMessage(parent)
+ , Key(this)
+ , Value(this)
+ , SkipStart(this)
+ , SkipLength(this)
+ {
+ }
+
+ explicit TMessageMetaItem(const NUnifiedAgent::TFMaybe<TFieldLink>& link)
+ : TMessage(link)
+ , Key(this)
+ , Value(this)
+ , SkipStart(this)
+ , SkipLength(this)
+ {
+ }
+
+ TStringField Key;
+ TRepeatedPtrField<TStringField> Value;
+ TRepeatedField<ui32> SkipStart;
+ TRepeatedField<ui32> SkipLength;
+ };
+
+ struct TDataBatch: public TMessage {
+ TDataBatch()
+ : TMessage()
+ , SeqNo(this)
+ , Timestamp(this)
+ , Payload(this, 2)
+ , Meta(this, 2)
+ {
+ }
+
+ TDataBatch(TMessage* parent)
+ : TMessage(parent)
+ , SeqNo(this)
+ , Timestamp(this)
+ , Payload(this, 2)
+ , Meta(this, 2)
+ {
+ }
+
+ TRepeatedField<ui64> SeqNo; // 1
+ TRepeatedField<ui64> Timestamp; // 2
+ TRepeatedPtrField<TStringField> Payload; // 100
+ TRepeatedPtrField<TMessageMetaItem> Meta; // 101
+ };
+
+ struct TRequest: public TMessage {
+ TRequest()
+ : TMessage()
+ , DataBatch(this)
+ {
+ }
+
+ TDataBatch DataBatch;
+ };
+}
diff --git a/library/cpp/unified_agent_client/clock.cpp b/library/cpp/unified_agent_client/clock.cpp
new file mode 100644
index 0000000000..192c998a02
--- /dev/null
+++ b/library/cpp/unified_agent_client/clock.cpp
@@ -0,0 +1,48 @@
+#include "clock.h"
+
+namespace NUnifiedAgent {
+ void TClock::Configure() {
+ Y_VERIFY(!Configured_);
+
+ Configured_ = true;
+ }
+
+ void TClock::SetBase(TInstant value) {
+ Y_VERIFY(Configured_);
+
+ Base_.store(value.GetValue());
+ }
+
+ void TClock::ResetBase() {
+ Base_.store(0);
+ }
+
+ void TClock::ResetBaseWithShift() {
+ Y_VERIFY(Configured_);
+
+ Shift_.store(static_cast<i64>(Base_.exchange(0)) - static_cast<i64>(::Now().GetValue()));
+ }
+
+ void TClock::SetShift(TDuration value) {
+ Y_VERIFY(Configured_);
+
+ Shift_.fetch_add(value.GetValue());
+ }
+
+ void TClock::ResetShift() {
+ Shift_.store(0);
+ }
+
+ TInstant TClock::Get() {
+ auto base = Base_.load();
+ if (base == 0) {
+ base = ::Now().GetValue();
+ }
+ base += Shift_.load();
+ return TInstant::FromValue(base);
+ }
+
+ bool TClock::Configured_{false};
+ std::atomic<ui64> TClock::Base_{0};
+ std::atomic<i64> TClock::Shift_{0};
+}
diff --git a/library/cpp/unified_agent_client/clock.h b/library/cpp/unified_agent_client/clock.h
new file mode 100644
index 0000000000..77ff44583e
--- /dev/null
+++ b/library/cpp/unified_agent_client/clock.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include <util/datetime/base.h>
+
+#include <atomic>
+
+namespace NUnifiedAgent {
+ class TClock {
+ public:
+ static void Configure();
+
+ static inline bool Configured() {
+ return Configured_;
+ }
+
+ static inline TInstant Now() {
+ return Configured_ ? Get() : TInstant::Now();
+ }
+
+ static void SetBase(TInstant value);
+
+ static void ResetBase();
+
+ static void ResetBaseWithShift();
+
+ static void SetShift(TDuration value);
+
+ static void ResetShift();
+
+ static TInstant Get();
+
+ private:
+ static bool Configured_;
+ static std::atomic<ui64> Base_;
+ static std::atomic<i64> Shift_;
+ };
+}
diff --git a/library/cpp/unified_agent_client/counters.cpp b/library/cpp/unified_agent_client/counters.cpp
new file mode 100644
index 0000000000..776a86ec4e
--- /dev/null
+++ b/library/cpp/unified_agent_client/counters.cpp
@@ -0,0 +1,36 @@
+#include "counters.h"
+
+using namespace NMonitoring;
+
+namespace NUnifiedAgent {
+ TClientCounters::TClientCounters(const NMonitoring::TDynamicCounterPtr& counters)
+ : TDynamicCountersWrapper(counters)
+ , ActiveSessionsCount(GetCounter("ActiveSessionsCount", false))
+ , ClientLogDroppedBytes(GetCounter("ClientLogDroppedBytes", true))
+ {
+ }
+
+ TIntrusivePtr<TClientSessionCounters> TClientCounters::GetDefaultSessionCounters() {
+ auto group = Unwrap()->GetSubgroup("session", "default");
+ return MakeIntrusive<TClientSessionCounters>(group);
+ }
+
+ TClientSessionCounters::TClientSessionCounters(const NMonitoring::TDynamicCounterPtr& counters)
+ : TDynamicCountersWrapper(counters)
+ , ReceivedMessages(GetCounter("ReceivedMessages", true))
+ , ReceivedBytes(GetCounter("ReceivedBytes", true))
+ , AcknowledgedMessages(GetCounter("AcknowledgedMessages", true))
+ , AcknowledgedBytes(GetCounter("AcknowledgedBytes", true))
+ , InflightMessages(GetCounter("InflightMessages", false))
+ , InflightBytes(GetCounter("InflightBytes", false))
+ , GrpcWriteBatchRequests(GetCounter("GrpcWriteBatchRequests", true))
+ , GrpcInflightMessages(GetCounter("GrpcInflightMessages", false))
+ , GrpcInflightBytes(GetCounter("GrpcInflightBytes", false))
+ , GrpcCalls(GetCounter("GrpcCalls", true))
+ , GrpcCallsInitialized(GetCounter("GrpcCallsInitialized", true))
+ , DroppedMessages(GetCounter("DroppedMessages", true))
+ , DroppedBytes(GetCounter("DroppedBytes", true))
+ , ErrorsCount(GetCounter("ErrorsCount", true))
+ {
+ }
+}
diff --git a/library/cpp/unified_agent_client/counters.h b/library/cpp/unified_agent_client/counters.h
new file mode 100644
index 0000000000..3c2192c3c5
--- /dev/null
+++ b/library/cpp/unified_agent_client/counters.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/dynamic_counters_wrapper.h>
+
+namespace NUnifiedAgent {
+ struct TClientSessionCounters;
+
+ struct TClientCounters: public TDynamicCountersWrapper {
+ explicit TClientCounters(const NMonitoring::TDynamicCounterPtr& counters =
+ MakeIntrusive<NMonitoring::TDynamicCounters>());
+
+ NMonitoring::TDeprecatedCounter& ActiveSessionsCount;
+ NMonitoring::TDeprecatedCounter& ClientLogDroppedBytes;
+
+ public:
+ TIntrusivePtr<TClientSessionCounters> GetDefaultSessionCounters();
+ };
+
+ struct TClientSessionCounters: public TDynamicCountersWrapper {
+ explicit TClientSessionCounters(const NMonitoring::TDynamicCounterPtr& counters =
+ MakeIntrusive<NMonitoring::TDynamicCounters>());
+
+ NMonitoring::TDeprecatedCounter& ReceivedMessages;
+ NMonitoring::TDeprecatedCounter& ReceivedBytes;
+ NMonitoring::TDeprecatedCounter& AcknowledgedMessages;
+ NMonitoring::TDeprecatedCounter& AcknowledgedBytes;
+ NMonitoring::TDeprecatedCounter& InflightMessages;
+ NMonitoring::TDeprecatedCounter& InflightBytes;
+ NMonitoring::TDeprecatedCounter& GrpcWriteBatchRequests;
+ NMonitoring::TDeprecatedCounter& GrpcInflightMessages;
+ NMonitoring::TDeprecatedCounter& GrpcInflightBytes;
+ NMonitoring::TDeprecatedCounter& GrpcCalls;
+ NMonitoring::TDeprecatedCounter& GrpcCallsInitialized;
+ NMonitoring::TDeprecatedCounter& DroppedMessages;
+ NMonitoring::TDeprecatedCounter& DroppedBytes;
+ NMonitoring::TDeprecatedCounter& ErrorsCount;
+ };
+}
diff --git a/library/cpp/unified_agent_client/duration_counter.cpp b/library/cpp/unified_agent_client/duration_counter.cpp
new file mode 100644
index 0000000000..118778a226
--- /dev/null
+++ b/library/cpp/unified_agent_client/duration_counter.cpp
@@ -0,0 +1,41 @@
+#include "duration_counter.h"
+
+namespace NUnifiedAgent {
+ using namespace NMonitoring;
+
+ TDurationUsCounter::TDurationUsCounter(const TString& name, TDynamicCounters& owner)
+ : Counter(*owner.GetCounter(name, true))
+ , ActiveTimers()
+ , Lock()
+ {
+ }
+
+ NHPTimer::STime* TDurationUsCounter::Begin() {
+ with_lock (Lock) {
+ ActiveTimers.push_back(0);
+ auto& result = ActiveTimers.back();
+ NHPTimer::GetTime(&result);
+ return &result;
+ }
+ }
+
+ void TDurationUsCounter::End(NHPTimer::STime* startTime) {
+ with_lock (Lock) {
+ Counter += static_cast<ui64>(NHPTimer::GetTimePassed(startTime) * 1000000);
+ *startTime = 0;
+ while (!ActiveTimers.empty() && ActiveTimers.front() == 0) {
+ ActiveTimers.pop_front();
+ }
+ }
+ }
+
+ void TDurationUsCounter::Update() {
+ with_lock (Lock) {
+ for (auto& startTime : ActiveTimers) {
+ if (startTime != 0) {
+ Counter += static_cast<ui64>(NHPTimer::GetTimePassed(&startTime) * 1000000);
+ }
+ }
+ }
+ }
+}
diff --git a/library/cpp/unified_agent_client/duration_counter.h b/library/cpp/unified_agent_client/duration_counter.h
new file mode 100644
index 0000000000..dbdfc22ed4
--- /dev/null
+++ b/library/cpp/unified_agent_client/duration_counter.h
@@ -0,0 +1,43 @@
+#pragma once
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+#include <util/generic/deque.h>
+#include <util/system/hp_timer.h>
+#include <util/system/mutex.h>
+
+namespace NUnifiedAgent {
+ class TDurationUsCounter {
+ public:
+ class TScope {
+ public:
+ TScope(TDurationUsCounter& counter)
+ : Counter(counter)
+ , StartTime(Counter.Begin())
+ {
+ }
+
+ ~TScope() {
+ Counter.End(StartTime);
+ }
+
+ private:
+ TDurationUsCounter& Counter;
+ NHPTimer::STime* StartTime;
+ };
+
+ public:
+ TDurationUsCounter(const TString& name, NMonitoring::TDynamicCounters& owner);
+
+ NHPTimer::STime* Begin();
+
+ void End(NHPTimer::STime* startTime);
+
+ void Update();
+
+ private:
+ NMonitoring::TDeprecatedCounter& Counter;
+ TDeque<NHPTimer::STime> ActiveTimers;
+ TAdaptiveLock Lock;
+ };
+}
diff --git a/library/cpp/unified_agent_client/dynamic_counters_wrapper.h b/library/cpp/unified_agent_client/dynamic_counters_wrapper.h
new file mode 100644
index 0000000000..cac4c6813d
--- /dev/null
+++ b/library/cpp/unified_agent_client/dynamic_counters_wrapper.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+namespace NUnifiedAgent {
+ class TDynamicCountersWrapper: public TAtomicRefCount<TDynamicCountersWrapper> {
+ public:
+ explicit TDynamicCountersWrapper(const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters)
+ : Counters(counters)
+ {
+ }
+
+ virtual ~TDynamicCountersWrapper() = default;
+
+ const TIntrusivePtr<NMonitoring::TDynamicCounters>& Unwrap() const {
+ return Counters;
+ }
+
+ protected:
+ NMonitoring::TDeprecatedCounter& GetCounter(const TString& value, bool derivative) {
+ return *Counters->GetCounter(value, derivative);
+ }
+
+ private:
+ TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
+ };
+
+ class TUpdatableCounters: public TDynamicCountersWrapper {
+ public:
+ using TDynamicCountersWrapper::TDynamicCountersWrapper;
+
+ virtual void Update() = 0;
+ };
+}
diff --git a/library/cpp/unified_agent_client/enum.h b/library/cpp/unified_agent_client/enum.h
new file mode 100644
index 0000000000..b21e21acb0
--- /dev/null
+++ b/library/cpp/unified_agent_client/enum.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include <util/generic/serialized_enum.h>
+#include <util/generic/vector.h>
+
+namespace NUnifiedAgent {
+ namespace NPrivate {
+ using TEnumNames = TVector<const TString*>;
+
+ template <typename TEnum>
+ TEnumNames BuildEnumNames() {
+ const auto names = GetEnumNames<TEnum>();
+ auto result = TEnumNames(names.size());
+ size_t index = 0;
+ for (const auto& p: names) {
+ Y_VERIFY(static_cast<size_t>(p.first) == index);
+ result[index++] = &p.second;
+ }
+ return result;
+ }
+
+ template <typename TEnum>
+ inline const auto EnumNames = BuildEnumNames<TEnum>();
+ }
+
+ template <typename TEnum, typename = std::enable_if_t<std::is_enum_v<TEnum>>>
+ inline const TString& NameOf(TEnum val) noexcept {
+ return *NPrivate::EnumNames<TEnum>[static_cast<size_t>(val)];
+ }
+}
diff --git a/library/cpp/unified_agent_client/examples/ua_grpc_client/main.cpp b/library/cpp/unified_agent_client/examples/ua_grpc_client/main.cpp
new file mode 100644
index 0000000000..a9eb423d13
--- /dev/null
+++ b/library/cpp/unified_agent_client/examples/ua_grpc_client/main.cpp
@@ -0,0 +1,122 @@
+#include <library/cpp/unified_agent_client/client.h>
+
+#include <library/cpp/getopt/opt.h>
+
+#include <util/string/split.h>
+
+using namespace NUnifiedAgent;
+
+class TOptions {
+public:
+ TString Uri;
+ TString SharedSecretKey;
+ TString SessionId;
+ TString SessionMeta;
+
+ TOptions(int argc, const char* argv[]) {
+ NLastGetopt::TOpts opts;
+ TString logPriorityStr;
+
+ opts
+ .AddLongOption("uri")
+ .RequiredArgument()
+ .Required()
+ .StoreResult(&Uri);
+ opts
+ .AddLongOption("shared-secret-key")
+ .RequiredArgument()
+ .Optional()
+ .StoreResult(&SharedSecretKey);
+ opts
+ .AddLongOption("session-id")
+ .RequiredArgument()
+ .Optional()
+ .StoreResult(&SessionId);
+ opts
+ .AddLongOption("session-meta", "key-value pairs separated by comma, e.g. 'k1=v1,k2=v2'")
+ .RequiredArgument()
+ .Optional()
+ .StoreResult(&SessionMeta);
+
+ opts.AddHelpOption();
+ opts.AddVersionOption();
+ NLastGetopt::TOptsParseResult res(&opts, argc, argv);
+ }
+};
+
+bool TryParseMeta(const TString& s, THashMap<TString, TString>& meta) {
+ for (auto& t: StringSplitter(s).Split(',')) {
+ TString key;
+ TString value;
+ if (!StringSplitter(t.Token()).Split('=').TryCollectInto(&key, &value)) {
+ Cout << "invalid meta, can't extract key-value pair from [" << t.Token() << "]" << Endl;
+ return false;
+ }
+ meta[key] = value;
+ }
+ return true;
+}
+
+bool TryParseLine(const TString& line, TVector<TString>& lineItems) {
+ lineItems = StringSplitter(line).Split('|').ToList<TString>();
+ Y_VERIFY(lineItems.size() >= 1);
+ if (lineItems.size() > 2) {
+ Cout << "invalid line format, expected 'k1=v1,k2=v2|payload' or just 'payload'" << Endl;
+ return false;
+ }
+ return true;
+}
+
+int main(int argc, const char* argv[]) {
+ TOptions options(argc, argv);
+
+ TClientSessionPtr sessionPtr;
+ {
+ TLog emptyLog;
+ auto clientParameters = TClientParameters(options.Uri).SetLog(emptyLog);
+ if (!options.SharedSecretKey.Empty()) {
+ clientParameters.SetSharedSecretKey(options.SharedSecretKey);
+ }
+ auto clientPtr = MakeClient(clientParameters);
+ auto sessionParameters = TSessionParameters();
+ if (!options.SessionId.Empty()) {
+ sessionParameters.SetSessionId(options.SessionId);
+ }
+ if (!options.SessionMeta.empty()) {
+ THashMap<TString, TString> sessionMeta;
+ if (!TryParseMeta(options.SessionMeta, sessionMeta)) {
+ return -1;
+ }
+ sessionParameters.SetMeta(sessionMeta);
+ }
+ sessionPtr = clientPtr->CreateSession(sessionParameters);
+ }
+
+ TString line;
+ while (true) {
+ Cin.ReadLine(line);
+ if (line.Empty()) {
+ break;
+ }
+
+ TVector<TString> lineItems;
+ if (!TryParseLine(line, lineItems)) {
+ continue;
+ }
+
+ TClientMessage clientMessage;
+ clientMessage.Payload = lineItems.back();
+ if (lineItems.size() == 2) {
+ THashMap<TString, TString> messageMeta;
+ if (!TryParseMeta(lineItems[0], messageMeta)) {
+ continue;
+ }
+ clientMessage.Meta = std::move(messageMeta);
+ }
+ sessionPtr->Send(std::move(clientMessage));
+ }
+
+ sessionPtr->Close();
+
+ return 0;
+}
diff --git a/library/cpp/unified_agent_client/f_maybe.h b/library/cpp/unified_agent_client/f_maybe.h
new file mode 100644
index 0000000000..7abd4c0aac
--- /dev/null
+++ b/library/cpp/unified_agent_client/f_maybe.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include <util/generic/maybe.h>
+
+namespace NUnifiedAgent {
+ template <typename T>
+ using TFMaybe = TMaybe<T, ::NMaybe::TPolicyUndefinedFail>;
+
+ template <class T>
+ inline constexpr TFMaybe<std::decay_t<T>> MakeFMaybe(T&& value) {
+ return TMaybe<std::decay_t<T>, ::NMaybe::TPolicyUndefinedFail>(std::forward<T>(value));
+ }
+
+ template <class T, class... TArgs>
+ inline constexpr TFMaybe<T> MakeFMaybe(TArgs&&... args) {
+ return TFMaybe<T>(typename TFMaybe<T>::TInPlace{}, std::forward<TArgs>(args)...);
+ }
+
+ template <class T>
+ inline constexpr TFMaybe<std::decay_t<T>> MakeFMaybe(const TMaybe<T>& source) {
+ return source.Defined() ? MakeFMaybe(*source) : Nothing();
+ }
+}
diff --git a/library/cpp/unified_agent_client/grpc_io.cpp b/library/cpp/unified_agent_client/grpc_io.cpp
new file mode 100644
index 0000000000..6d237d75ec
--- /dev/null
+++ b/library/cpp/unified_agent_client/grpc_io.cpp
@@ -0,0 +1,161 @@
+#include "grpc_io.h"
+
+#include <contrib/libs/grpc/src/core/lib/iomgr/executor.h>
+#include <contrib/libs/grpc/src/core/lib/surface/completion_queue.h>
+#include <contrib/libs/grpc/include/grpc/impl/codegen/log.h>
+
+#include <util/generic/yexception.h>
+#include <util/string/cast.h>
+#include <util/system/env.h>
+#include <util/system/mutex.h>
+#include <util/system/thread.h>
+
+namespace NUnifiedAgent {
+ namespace {
+ std::once_flag GrpcConfigured{};
+ }
+
+ TGrpcNotification::TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback)
+ : CompletionQueue(completionQueue)
+ , IOCallback(std::move(ioCallback))
+ , Completion(MakeHolder<grpc_cq_completion>())
+ , InQueue(false)
+ {
+ }
+
+ TGrpcNotification::~TGrpcNotification() = default;
+
+ void TGrpcNotification::Trigger() {
+ {
+ bool inQueue = false;
+ if (!InQueue.compare_exchange_strong(inQueue, true)) {
+ return;
+ }
+ }
+ grpc_core::ApplicationCallbackExecCtx callbackExecCtx;
+ grpc_core::ExecCtx execCtx;
+ IOCallback->Ref();
+ Y_VERIFY(grpc_cq_begin_op(CompletionQueue.cq(), this));
+ grpc_cq_end_op(CompletionQueue.cq(), this, nullptr,
+ [](void* self, grpc_cq_completion*) {
+ Y_VERIFY(static_cast<TGrpcNotification*>(self)->InQueue.exchange(false));
+ },
+ this, Completion.Get());
+ }
+
+ bool TGrpcNotification::FinalizeResult(void** tag, bool*) {
+ *tag = IOCallback.Get();
+ return true;
+ }
+
+ TGrpcTimer::TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback)
+ : CompletionQueue(completionQueue)
+ , IOCallback(std::move(ioCallback))
+ , Alarm()
+ , AlarmIsSet(false)
+ , NextTriggerTime(Nothing())
+ {
+ }
+
+ void TGrpcTimer::Set(TInstant triggerTime) {
+ if (AlarmIsSet) {
+ NextTriggerTime = triggerTime;
+ Alarm.Cancel();
+ } else {
+ AlarmIsSet = true;
+ Alarm.Set(&CompletionQueue, InstantToTimespec(triggerTime), Ref());
+ }
+ }
+
+ void TGrpcTimer::Cancel() {
+ NextTriggerTime.Clear();
+ if (AlarmIsSet) {
+ Alarm.Cancel();
+ }
+ }
+
+ IIOCallback* TGrpcTimer::Ref() {
+ IOCallback->Ref();
+ return this;
+ }
+
+ void TGrpcTimer::OnIOCompleted(EIOStatus status) {
+ Y_VERIFY(AlarmIsSet);
+ if (NextTriggerTime) {
+ Alarm.Set(&CompletionQueue, InstantToTimespec(*NextTriggerTime), this);
+ NextTriggerTime.Clear();
+ } else {
+ AlarmIsSet = false;
+ IOCallback->OnIOCompleted(status);
+ }
+ }
+
+ TGrpcCompletionQueuePoller::TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue)
+ : Queue(queue)
+ , Thread()
+ {
+ }
+
+ void TGrpcCompletionQueuePoller::Start() {
+ Thread = std::thread([this]() {
+ TThread::SetCurrentThreadName("ua_grpc_cq");
+ void* tag;
+ bool ok;
+ while (Queue.Next(&tag, &ok)) {
+ try {
+ static_cast<IIOCallback*>(tag)->OnIOCompleted(ok ? EIOStatus::Ok : EIOStatus::Error);
+ } catch (...) {
+ Y_FAIL("unexpected exception [%s]", CurrentExceptionMessage().c_str());
+ }
+ }
+ });
+ }
+
+ void TGrpcCompletionQueuePoller::Join() {
+ Thread.join();
+ }
+
+ TGrpcCompletionQueueHost::TGrpcCompletionQueueHost()
+ : CompletionQueue()
+ , Poller(CompletionQueue)
+ {
+ }
+
+ void TGrpcCompletionQueueHost::Start() {
+ Poller.Start();
+ }
+
+ void TGrpcCompletionQueueHost::Stop() {
+ CompletionQueue.Shutdown();
+ Poller.Join();
+ }
+
+ gpr_timespec InstantToTimespec(TInstant instant) {
+ gpr_timespec result;
+ result.clock_type = GPR_CLOCK_REALTIME;
+ result.tv_sec = static_cast<int64_t>(instant.Seconds());
+ result.tv_nsec = instant.NanoSecondsOfSecond();
+ return result;
+ }
+
+ void EnsureGrpcConfigured() {
+ std::call_once(GrpcConfigured, []() {
+ const auto limitStr = GetEnv("UA_GRPC_EXECUTOR_THREADS_LIMIT");
+ ui64 limit;
+ if (limitStr.Empty() || !TryFromString(limitStr, limit)) {
+ limit = 2;
+ }
+ grpc_core::Executor::SetThreadsLimit(limit);
+ });
+ }
+
+ void StartGrpcTracing() {
+ grpc_tracer_set_enabled("all", true);
+ gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
+ }
+
+ void FinishGrpcTracing() {
+ grpc_tracer_set_enabled("all", false);
+ gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR);
+ }
+}
diff --git a/library/cpp/unified_agent_client/grpc_io.h b/library/cpp/unified_agent_client/grpc_io.h
new file mode 100644
index 0000000000..5f368a5943
--- /dev/null
+++ b/library/cpp/unified_agent_client/grpc_io.h
@@ -0,0 +1,141 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/async_joiner.h>
+#include <library/cpp/unified_agent_client/f_maybe.h>
+
+#include <contrib/libs/grpc/include/grpcpp/alarm.h>
+#include <contrib/libs/grpc/include/grpc++/grpc++.h>
+
+#include <thread>
+
+struct grpc_cq_completion;
+
+namespace NUnifiedAgent {
+ enum class EIOStatus {
+ Ok,
+ Error
+ };
+
+ class IIOCallback {
+ public:
+ virtual ~IIOCallback() = default;
+
+ virtual IIOCallback* Ref() = 0;
+
+ virtual void OnIOCompleted(EIOStatus status) = 0;
+ };
+
+ template<typename TCallback, typename TCounter>
+ class TIOCallback: public IIOCallback {
+ public:
+ explicit TIOCallback(TCallback&& callback, TCounter* counter)
+ : Callback(std::move(callback))
+ , Counter(counter)
+ {
+ }
+
+ IIOCallback* Ref() override {
+ Counter->Ref();
+ return this;
+ }
+
+ void OnIOCompleted(EIOStatus status) override {
+ Callback(status);
+ Counter->UnRef();
+ }
+
+ private:
+ TCallback Callback;
+ TCounter* Counter;
+ };
+
+ template<typename TCallback, typename TCounter>
+ THolder<IIOCallback> MakeIOCallback(TCallback&& callback, TCounter* counter) {
+ return MakeHolder<TIOCallback<TCallback, TCounter>>(std::move(callback), counter);
+ }
+
+ template<typename TTarget, typename TCounter = TTarget>
+ THolder<IIOCallback> MakeIOCallback(TTarget* target, void (TTarget::*method)(EIOStatus),
+ TCounter* counter = nullptr)
+ {
+ return MakeIOCallback([target, method](EIOStatus status) { ((*target).*method)(status); },
+ counter ? counter : target);
+ }
+
+ class TGrpcNotification: private ::grpc::internal::CompletionQueueTag {
+ public:
+ TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback);
+
+ ~TGrpcNotification();
+
+ void Trigger();
+
+ private:
+ bool FinalizeResult(void** tag, bool* status) override;
+
+ private:
+ grpc::CompletionQueue& CompletionQueue;
+ THolder<IIOCallback> IOCallback;
+ THolder<grpc_cq_completion> Completion;
+ std::atomic<bool> InQueue;
+ };
+
+ class TGrpcTimer: private IIOCallback {
+ public:
+ TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback);
+
+ void Set(TInstant triggerTime);
+
+ void Cancel();
+
+ private:
+ IIOCallback* Ref() override;
+
+ void OnIOCompleted(EIOStatus status) override;
+
+ private:
+ grpc::CompletionQueue& CompletionQueue;
+ THolder<IIOCallback> IOCallback;
+ grpc::Alarm Alarm;
+ bool AlarmIsSet;
+ TFMaybe<TInstant> NextTriggerTime;
+ };
+
+ class TGrpcCompletionQueuePoller {
+ public:
+ explicit TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue);
+
+ void Start();
+
+ void Join();
+
+ private:
+ grpc::CompletionQueue& Queue;
+ std::thread Thread;
+ };
+
+ class TGrpcCompletionQueueHost {
+ public:
+ TGrpcCompletionQueueHost();
+
+ void Start();
+
+ void Stop();
+
+ inline grpc::CompletionQueue& GetCompletionQueue() noexcept {
+ return CompletionQueue;
+ }
+
+ private:
+ grpc::CompletionQueue CompletionQueue;
+ TGrpcCompletionQueuePoller Poller;
+ };
+
+ gpr_timespec InstantToTimespec(TInstant instant);
+
+ void EnsureGrpcConfigured();
+
+ void StartGrpcTracing();
+
+ void FinishGrpcTracing();
+}
diff --git a/library/cpp/unified_agent_client/grpc_status_code.cpp b/library/cpp/unified_agent_client/grpc_status_code.cpp
new file mode 100644
index 0000000000..662bbbe7a5
--- /dev/null
+++ b/library/cpp/unified_agent_client/grpc_status_code.cpp
@@ -0,0 +1,56 @@
+#include <contrib/libs/grpc/include/grpcpp/impl/codegen/status_code_enum.h>
+
+#include <util/stream/output.h>
+
+namespace {
+ const char* GrpcStatusCodeToString(grpc::StatusCode statusCode) {
+ switch (statusCode) {
+ case grpc::OK:
+ return "OK";
+ case grpc::CANCELLED:
+ return "CANCELLED";
+ case grpc::UNKNOWN:
+ return "UNKNOWN";
+ case grpc::INVALID_ARGUMENT:
+ return "INVALID_ARGUMENT";
+ case grpc::DEADLINE_EXCEEDED:
+ return "DEADLINE_EXCEEDED";
+ case grpc::NOT_FOUND:
+ return "NOT_FOUND";
+ case grpc::ALREADY_EXISTS:
+ return "ALREADY_EXISTS";
+ case grpc::PERMISSION_DENIED:
+ return "PERMISSION_DENIED";
+ case grpc::UNAUTHENTICATED:
+ return "UNAUTHENTICATED";
+ case grpc::RESOURCE_EXHAUSTED:
+ return "RESOURCE_EXHAUSTED";
+ case grpc::FAILED_PRECONDITION:
+ return "FAILED_PRECONDITION";
+ case grpc::ABORTED:
+ return "ABORTED";
+ case grpc::OUT_OF_RANGE:
+ return "OUT_OF_RANGE";
+ case grpc::UNIMPLEMENTED:
+ return "UNIMPLEMENTED";
+ case grpc::INTERNAL:
+ return "INTERNAL";
+ case grpc::UNAVAILABLE:
+ return "UNAVAILABLE";
+ case grpc::DATA_LOSS:
+ return "DATA_LOSS";
+ default:
+ return nullptr;
+ }
+ }
+}
+
+template <>
+void Out<grpc::StatusCode>(IOutputStream& o, grpc::StatusCode statusCode) {
+ const auto* s = GrpcStatusCodeToString(statusCode);
+ if (s == nullptr) {
+ o << "grpc::StatusCode [" << static_cast<int>(statusCode) << "]";
+ } else {
+ o << s;
+ }
+}
diff --git a/library/cpp/unified_agent_client/helpers.cpp b/library/cpp/unified_agent_client/helpers.cpp
new file mode 100644
index 0000000000..01908660f3
--- /dev/null
+++ b/library/cpp/unified_agent_client/helpers.cpp
@@ -0,0 +1,65 @@
+#include "helpers.h"
+#include <util/string/ascii.h>
+
+namespace NUnifiedAgent::NPrivate {
+ bool IsUtf8(const THashMap<TString, TString>& meta) {
+ for (const auto& p : meta) {
+ if (!IsUtf(p.first) || !IsUtf(p.second)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ ResultReplacingNonUTF ReplaceNonUTF(TStringBuf message, char signBrokenSymbol, size_t maxSize) {
+ ResultReplacingNonUTF result;
+ if (maxSize == 0) {
+ result.IsTruncated = !message.empty();
+ return result;
+ }
+ if (message.empty()) {
+ return result;
+ }
+
+ auto currentPoint = reinterpret_cast<const unsigned char*>(&message[0]);
+ auto endPoint = currentPoint + message.size();
+
+ auto pushSignBroken = [&result, signBrokenSymbol]() {
+ if (result.Data.empty() || result.Data.back() != signBrokenSymbol) {
+ result.Data.push_back(signBrokenSymbol);
+ }
+ ++result.BrokenCount;
+ };
+
+ while (currentPoint < endPoint) {
+ wchar32 rune = 0;
+ size_t rune_len = 0;
+ auto statusRead = SafeReadUTF8Char(rune, rune_len, currentPoint, endPoint);
+
+ if (statusRead == RECODE_OK) {
+ if (rune_len == 1 && !IsAsciiAlnum(*currentPoint) && !IsAsciiPunct(*currentPoint) && !IsAsciiSpace(*currentPoint)) {
+ ++currentPoint;
+ pushSignBroken();
+ } else {
+ while (rune_len != 0) {
+ result.Data.push_back(*currentPoint);
+ ++currentPoint;
+ --rune_len;
+ }
+ }
+ } else if (statusRead == RECODE_BROKENSYMBOL) {
+ ++currentPoint;
+ pushSignBroken();
+ } else {
+ pushSignBroken();
+ break;
+ }
+
+ if (result.Data.size() >= maxSize && currentPoint < endPoint) {
+ result.IsTruncated = true;
+ break;
+ }
+ }
+ return result;
+ }
+}
diff --git a/library/cpp/unified_agent_client/helpers.h b/library/cpp/unified_agent_client/helpers.h
new file mode 100644
index 0000000000..9c8f727315
--- /dev/null
+++ b/library/cpp/unified_agent_client/helpers.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include "client.h"
+
+#include <util/charset/utf8.h>
+
+namespace NUnifiedAgent::NPrivate {
+ bool IsUtf8(const THashMap<TString, TString>& meta);
+
+ struct ResultReplacingNonUTF {
+ bool IsTruncated{false};
+ size_t BrokenCount{0};
+ TString Data;
+ };
+
+ ResultReplacingNonUTF ReplaceNonUTF(TStringBuf message, char signBrokenSymbol = '?', size_t maxSize = std::numeric_limits<size_t>::max());
+}
diff --git a/library/cpp/unified_agent_client/logger.cpp b/library/cpp/unified_agent_client/logger.cpp
new file mode 100644
index 0000000000..abe049d7eb
--- /dev/null
+++ b/library/cpp/unified_agent_client/logger.cpp
@@ -0,0 +1,131 @@
+#include "logger.h"
+
+#include <library/cpp/unified_agent_client/clock.h>
+#include <library/cpp/unified_agent_client/helpers.h>
+
+#include <library/cpp/logger/log.h>
+
+#include <util/datetime/base.h>
+#include <util/stream/str.h>
+#include <util/system/getpid.h>
+#include <util/system/thread.h>
+
+namespace NUnifiedAgent {
+ namespace {
+ TString FormatLogLine(ELogPriority logLevel, const TStringBuf message, const TString& scope) {
+ TString result;
+ {
+ TStringOutput output(result);
+ output << FormatIsoLocal(TClock::Now())
+ << " " << GetPID()
+ << " " << TThread::CurrentThreadId()
+ << " " << logLevel;
+ if (!scope.Empty()) {
+ output << " " << scope;
+ }
+ output << " " << message << "\n";
+ }
+ return result;
+ }
+ }
+
+ TLogger::TThrottlerWithLock::TThrottlerWithLock(size_t rateLimitBytes)
+ : Throttler(rateLimitBytes, rateLimitBytes / 2)
+ , Lock()
+ {
+ }
+
+ bool TLogger::TThrottlerWithLock::TryConsume(double tokens) {
+ with_lock(Lock) {
+ return Throttler.TryConsume(tokens);
+ }
+ }
+
+ TLogger::TLogger(TLog& log, TFMaybe<size_t> rateLimitBytes)
+ : DefaultLogContext{log, log.IsNullLog() ? ELogPriority::TLOG_EMERG : log.FiltrationLevel()}
+ , TracingLogContexts()
+ , CurrentLogContext_()
+ , Errors(nullptr)
+ , DroppedBytes(nullptr)
+ , Throttler(rateLimitBytes.Defined() ? MakeHolder<TThrottlerWithLock>(*rateLimitBytes) : nullptr)
+ , Lock()
+ {
+ SetCurrentLogContext(DefaultLogContext);
+ }
+
+ void TLogger::SetCurrentLogContext(TLogContext& logContext) {
+ CurrentLogContext_.store(logContext.Log.IsNullLog() ? nullptr : &logContext, std::memory_order_release);
+ }
+
+ void TLogger::Log(TLog& log, ELogPriority logPriority, const TStringBuf message, const TString& scope) const {
+ try {
+ const auto logLine = FormatLogLine(logPriority, NUnifiedAgent::NPrivate::ReplaceNonUTF(message).Data, scope);
+ if (Throttler && &log == &DefaultLogContext.Log && !Throttler->TryConsume(logLine.size())) {
+ if (DroppedBytes) {
+ DroppedBytes->Add(logLine.size());
+ }
+ return;
+ }
+ log.Write(logPriority, logLine);
+ } catch (...) {
+ }
+ }
+
+ void TLogger::StartTracing(ELogPriority logPriority) noexcept {
+ with_lock(Lock) {
+ auto& logContext = GetOrCreateTracingLogContext(logPriority);
+ SetTracing(logContext, "started");
+ }
+ }
+
+ void TLogger::FinishTracing() noexcept {
+ with_lock(Lock) {
+ SetTracing(DefaultLogContext, "finished");
+ }
+ }
+
+ void TLogger::SetTracing(TLogContext& logContext, const char* action) {
+ // Lock must be held
+
+ SetCurrentLogContext(logContext);
+
+ Log(logContext.Log,
+ TLOG_INFO,
+ Sprintf("tracing %s, log priority is set to [%s]",
+ action, ToString(logContext.Priority).c_str()),
+ "");
+ }
+
+ auto TLogger::GetOrCreateTracingLogContext(ELogPriority logPriority) -> TLogContext& {
+ // Lock must be held
+
+ for (const auto& c: TracingLogContexts) {
+ if (c->Priority == logPriority) {
+ return *c;
+ }
+ }
+
+ auto newLogContext = MakeHolder<TLogContext>();
+ newLogContext->Log = TLog("cerr", logPriority);
+ newLogContext->Priority = logPriority;
+ auto* result = newLogContext.Get();
+ TracingLogContexts.push_back(std::move(newLogContext));
+ return *result;
+ }
+
+ TScopeLogger::TScopeLogger()
+ : Logger(nullptr)
+ , Scope()
+ , Errors(nullptr)
+ {
+ }
+
+ TScopeLogger::TScopeLogger(TLogger* logger,
+ const TString& scope,
+ NMonitoring::TDeprecatedCounter* errors)
+ : Logger(logger)
+ , Scope(scope)
+ , Errors(errors)
+ {
+ }
+}
diff --git a/library/cpp/unified_agent_client/logger.h b/library/cpp/unified_agent_client/logger.h
new file mode 100644
index 0000000000..d83cba92de
--- /dev/null
+++ b/library/cpp/unified_agent_client/logger.h
@@ -0,0 +1,157 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/f_maybe.h>
+#include <library/cpp/unified_agent_client/throttling.h>
+
+#include <library/cpp/logger/log.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+#include <util/generic/string.h>
+#include <util/string/join.h>
+#include <util/string/printf.h>
+#include <util/system/file.h>
+
+#include <atomic>
+
+#define YLOG(logPriority, message, logger) \
+ do { \
+ const auto __logPriority = logPriority; \
+ if (auto* log = logger.Accept(__logPriority, false); log != nullptr) { \
+ logger.Log(*log, __logPriority, message); \
+ } \
+ } while (false)
+
+#define YLOG_EMERG(msg) YLOG(TLOG_EMERG, msg, Logger)
+#define YLOG_ALERT(msg) YLOG(TLOG_ALERT, msg, Logger)
+#define YLOG_CRIT(msg) YLOG(TLOG_CRIT, msg, Logger)
+#define YLOG_ERR(msg) YLOG(TLOG_ERR, msg, Logger)
+#define YLOG_WARNING(msg) YLOG(TLOG_WARNING, msg, Logger)
+#define YLOG_NOTICE(msg) YLOG(TLOG_NOTICE, msg, Logger)
+#define YLOG_INFO(msg) YLOG(TLOG_INFO, msg, Logger)
+#define YLOG_DEBUG(msg) YLOG(TLOG_DEBUG, msg, Logger)
+#define YLOG_RESOURCES(msg) YLOG(TLOG_RESOURCES , msg, Logger)
+
+#define YLOG_FATAL(msg) \
+ YLOG(TLOG_CRIT, msg, Logger); \
+ _Exit(1);
+
+namespace NUnifiedAgent {
+ class TScopeLogger;
+
+ class TLogger {
+ public:
+ TLogger(TLog& log, TFMaybe<size_t> rateLimitBytes);
+
+ void StartTracing(ELogPriority logPriority) noexcept;
+
+ void FinishTracing() noexcept;
+
+ inline TScopeLogger Child(const TString& v, NMonitoring::TDeprecatedCounter* errors = nullptr);
+
+ inline void SetErrorsCounter(NMonitoring::TDeprecatedCounter* counter) noexcept {
+ Errors = counter;
+ }
+
+ inline void SetDroppedBytesCounter(NMonitoring::TDeprecatedCounter* counter) noexcept {
+ DroppedBytes = counter;
+ }
+
+ inline bool HasRateLimit() const noexcept {
+ return Throttler != nullptr;
+ }
+
+ friend class TScopeLogger;
+
+ private:
+ void Log(TLog& log, ELogPriority logPriority, const TStringBuf message, const TString& scope) const;
+
+ inline TLog* Accept(ELogPriority logPriority, NMonitoring::TDeprecatedCounter* errors) const noexcept {
+ if ((logPriority <= TLOG_ERR) && (errors != nullptr)) {
+ ++(*errors);
+ }
+ auto* result = CurrentLogContext_.load(std::memory_order_acquire);
+ return result != nullptr && static_cast<int>(logPriority) <= static_cast<int>(result->Priority)
+ ? &result->Log
+ : nullptr;
+ }
+
+ private:
+ struct TLogContext {
+ TLog Log;
+ ELogPriority Priority;
+ };
+
+ class TThrottlerWithLock {
+ public:
+ explicit TThrottlerWithLock(size_t rateLimitBytes);
+
+ bool TryConsume(double tokens);
+
+ private:
+ TThrottler Throttler;
+ TAdaptiveLock Lock;
+ };
+
+ private:
+ void SetCurrentLogContext(TLogContext& logContext);
+
+ TLogContext& GetOrCreateTracingLogContext(ELogPriority logPriority);
+
+ void SetTracing(TLogContext& logContext, const char* action);
+
+ private:
+ TLogContext DefaultLogContext;
+ TVector<THolder<TLogContext>> TracingLogContexts;
+ std::atomic<TLogContext*> CurrentLogContext_;
+ NMonitoring::TDeprecatedCounter* Errors;
+ NMonitoring::TDeprecatedCounter* DroppedBytes;
+ const THolder<TThrottlerWithLock> Throttler;
+ TAdaptiveLock Lock;
+ };
+
+ class TScopeLogger {
+ public:
+ TScopeLogger();
+
+ inline void Log(TLog& log, ELogPriority logPriority, const TStringBuf message) const {
+ if (Logger) {
+ Logger->Log(log, logPriority, message, Scope);
+ }
+ }
+
+ inline TLog* Accept(ELogPriority logPriority, bool silent) const noexcept {
+ return Logger ? Logger->Accept(logPriority, silent ? nullptr : Errors) : nullptr;
+ }
+
+ inline TScopeLogger Child(const TString& v, NMonitoring::TDeprecatedCounter* errors = nullptr) {
+ return Logger
+ ? Logger->Child(Join('/', Scope, v), errors == nullptr ? Errors : errors)
+ : TScopeLogger();
+ }
+
+ inline TLogger* Unwrap() noexcept {
+ return Logger;
+ }
+
+ friend class TLogger;
+
+ private:
+ TScopeLogger(TLogger* logger,
+ const TString& scope,
+ NMonitoring::TDeprecatedCounter* errors);
+
+ private:
+ TLogger* Logger;
+ TString Scope;
+ NMonitoring::TDeprecatedCounter* Errors;
+ };
+
+ inline TScopeLogger TLogger::Child(const TString& v, NMonitoring::TDeprecatedCounter* errors) {
+ return TScopeLogger(this, v, errors == nullptr ? Errors : errors);
+ }
+
+ inline ELogPriority ToLogPriority(int level) noexcept {
+ const auto result = ClampVal(level, 0, static_cast<int>(TLOG_RESOURCES));
+ return static_cast<ELogPriority>(result);
+ }
+}
diff --git a/library/cpp/unified_agent_client/proto/CMakeLists.darwin.txt b/library/cpp/unified_agent_client/proto/CMakeLists.darwin.txt
new file mode 100644
index 0000000000..9642a192c4
--- /dev/null
+++ b/library/cpp/unified_agent_client/proto/CMakeLists.darwin.txt
@@ -0,0 +1,45 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-unified_agent_client-proto)
+set_property(TARGET cpp-unified_agent_client-proto PROPERTY
+ PROTOC_EXTRA_OUTS .grpc.pb.cc .grpc.pb.h
+)
+target_link_libraries(cpp-unified_agent_client-proto PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ tools-enum_parser-enum_serialization_runtime
+ contrib-libs-protobuf
+)
+target_proto_messages(cpp-unified_agent_client-proto PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto/unified_agent.proto
+)
+generate_enum_serilization(cpp-unified_agent_client-proto
+ ${CMAKE_BINARY_DIR}/library/cpp/unified_agent_client/proto/unified_agent.pb.h
+ INCLUDE_HEADERS
+ library/cpp/unified_agent_client/proto/unified_agent.pb.h
+)
+target_proto_addincls(cpp-unified_agent_client-proto
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(cpp-unified_agent_client-proto
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
+target_proto_plugin(cpp-unified_agent_client-proto
+ grpc_cpp
+ grpc_cpp
+)
diff --git a/library/cpp/unified_agent_client/proto/CMakeLists.linux-aarch64.txt b/library/cpp/unified_agent_client/proto/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..da20d2e479
--- /dev/null
+++ b/library/cpp/unified_agent_client/proto/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,46 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-unified_agent_client-proto)
+set_property(TARGET cpp-unified_agent_client-proto PROPERTY
+ PROTOC_EXTRA_OUTS .grpc.pb.cc .grpc.pb.h
+)
+target_link_libraries(cpp-unified_agent_client-proto PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ tools-enum_parser-enum_serialization_runtime
+ contrib-libs-protobuf
+)
+target_proto_messages(cpp-unified_agent_client-proto PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto/unified_agent.proto
+)
+generate_enum_serilization(cpp-unified_agent_client-proto
+ ${CMAKE_BINARY_DIR}/library/cpp/unified_agent_client/proto/unified_agent.pb.h
+ INCLUDE_HEADERS
+ library/cpp/unified_agent_client/proto/unified_agent.pb.h
+)
+target_proto_addincls(cpp-unified_agent_client-proto
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(cpp-unified_agent_client-proto
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
+target_proto_plugin(cpp-unified_agent_client-proto
+ grpc_cpp
+ grpc_cpp
+)
diff --git a/library/cpp/unified_agent_client/proto/CMakeLists.linux.txt b/library/cpp/unified_agent_client/proto/CMakeLists.linux.txt
new file mode 100644
index 0000000000..da20d2e479
--- /dev/null
+++ b/library/cpp/unified_agent_client/proto/CMakeLists.linux.txt
@@ -0,0 +1,46 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-unified_agent_client-proto)
+set_property(TARGET cpp-unified_agent_client-proto PROPERTY
+ PROTOC_EXTRA_OUTS .grpc.pb.cc .grpc.pb.h
+)
+target_link_libraries(cpp-unified_agent_client-proto PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ tools-enum_parser-enum_serialization_runtime
+ contrib-libs-protobuf
+)
+target_proto_messages(cpp-unified_agent_client-proto PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto/unified_agent.proto
+)
+generate_enum_serilization(cpp-unified_agent_client-proto
+ ${CMAKE_BINARY_DIR}/library/cpp/unified_agent_client/proto/unified_agent.pb.h
+ INCLUDE_HEADERS
+ library/cpp/unified_agent_client/proto/unified_agent.pb.h
+)
+target_proto_addincls(cpp-unified_agent_client-proto
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(cpp-unified_agent_client-proto
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
+target_proto_plugin(cpp-unified_agent_client-proto
+ grpc_cpp
+ grpc_cpp
+)
diff --git a/library/cpp/unified_agent_client/proto/CMakeLists.txt b/library/cpp/unified_agent_client/proto/CMakeLists.txt
new file mode 100644
index 0000000000..5bb4faffb4
--- /dev/null
+++ b/library/cpp/unified_agent_client/proto/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/library/cpp/unified_agent_client/proto/unified_agent.proto b/library/cpp/unified_agent_client/proto/unified_agent.proto
new file mode 100644
index 0000000000..68efe35747
--- /dev/null
+++ b/library/cpp/unified_agent_client/proto/unified_agent.proto
@@ -0,0 +1,101 @@
+syntax = "proto3";
+import "google/protobuf/descriptor.proto";
+
+package NUnifiedAgentProto;
+
+option java_package = "com.yandex.unified_agent";
+option go_package = "a.yandex-team.ru/library/cpp/unified_agent_client/proto;unifiedagent";
+
+extend google.protobuf.FileOptions {
+ bool GenerateYaStyle = 66777;
+}
+
+message Request {
+ message SessionMetaItem {
+ string name = 1;
+ string value = 2;
+ }
+
+ message Initialize {
+ // Session_id provided by server, use it in case of reconnects.
+ string session_id = 1;
+
+ // Session metadata
+ repeated SessionMetaItem meta = 2;
+
+ string shared_secret_key = 3;
+ }
+
+ message MessageMetaItem {
+ // Arbitrary key-value pairs. Can be used by agent filters to modify/filter messages
+ // or to route them to target outputs.
+
+ // Meta items of all messages should be grouped by meta key, it's expected in the 'key' field.
+ // Meta values should be passed in the 'value' sequence, it corresponds to the payload
+ // sequence from DataBatch. If some messages don't have a meta with this key, the range of such messages
+ // can be passed via skip_start/skip_length sequences.
+ // For example, [{m:v1}, {}, {}, {m: v2}, {}, {m: v3}, {}, {}] can be represented as follows:
+ // key: 'm'
+ // value: ['v1', 'v2', 'v3']
+ // skip_start: [1, 4]
+ // skip_length: [2, 1]
+
+ string key = 1;
+ repeated string value = 2;
+ repeated uint32 skip_start = 3;
+ repeated uint32 skip_length = 4;
+ }
+
+ message DataBatch {
+ repeated uint64 seq_no = 1;
+ repeated uint64 timestamp = 2; //microseconds
+ repeated bytes payload = 100;
+ repeated MessageMetaItem meta = 101;
+ }
+
+ oneof request {
+ Initialize initialize = 1;
+ DataBatch data_batch = 2;
+ }
+}
+
+message Response {
+ message Initialized {
+ // Session identifier for log and deduplication purposes.
+ string session_id = 1;
+
+ // Application can skip all formed messages by seq_no upto last_seq_no - they are consumed by server.
+ uint64 last_seq_no = 2;
+ }
+
+ message Ack {
+ uint64 seq_no = 1;
+ }
+
+ oneof response {
+ Initialized initialized = 1;
+ Ack ack = 2;
+ }
+}
+
+service UnifiedAgentService {
+ rpc Session(stream Request) returns (stream Response);
+}
+
+
+// dataflow:
+// Request.initialize -> UnifiedAgent;
+// specify session_id when this is a retry. Сlient can already have sesison_id from previous init response,
+// or it can use some pregenerated sessionId for each session.
+// Response.initializeded -> client;
+// Request.entry -> UnifiedAgent;
+// ....
+// Response.ack -> client;
+// when this record is consumed by UnifiedAgent with choosen garanties UnifiedAgent will send ack to client;
+// client can forget about this log record now
+//
+// grpc finish session -> client;
+// something went wrong; client must reconnect and retry all not acknowleged records
+//
+// Exactly once retries - when reconnect, client must provide previous session_id and same seq_no`s
+// for records - only in this case UnifiedAgent can dedup.
diff --git a/library/cpp/unified_agent_client/proto_weighing.cpp b/library/cpp/unified_agent_client/proto_weighing.cpp
new file mode 100644
index 0000000000..7a532213ea
--- /dev/null
+++ b/library/cpp/unified_agent_client/proto_weighing.cpp
@@ -0,0 +1,99 @@
+#include "proto_weighing.h"
+
+#include <google/protobuf/io/coded_stream.h>
+
+namespace NUnifiedAgent::NPW {
+ template <typename T>
+ inline size_t SizeOf(T value);
+
+ using CodedOutputStream = google::protobuf::io::CodedOutputStream;
+
+ template <>
+ inline size_t SizeOf(ui64 value) {
+ return CodedOutputStream::VarintSize64(value);
+ }
+
+ template <>
+ inline size_t SizeOf(ui32 value) {
+ return CodedOutputStream::VarintSize32(value);
+ }
+
+ template <>
+ inline size_t SizeOf(i64 value) {
+ return CodedOutputStream::VarintSize64(static_cast<google::protobuf::uint64>(value));
+ }
+
+ TFieldLink::TFieldLink(TLengthDelimited* container, bool repeated, size_t keySize)
+ : Container(container)
+ , OuterSize(0)
+ , Repeated(repeated)
+ , KeySize(keySize)
+ {
+ }
+
+ void TFieldLink::SetValueSize(bool empty, size_t size) {
+ const auto newOuterSize = empty && !Repeated ? 0 : KeySize + static_cast<int>(size);
+ Container->IncSize(newOuterSize - OuterSize);
+ OuterSize = newOuterSize;
+ }
+
+ TLengthDelimited::TLengthDelimited(const TFMaybe<TFieldLink>& link)
+ : Link(link)
+ , ByteSize(0)
+ {
+ }
+
+ void TLengthDelimited::IncSize(int sizeDelta) {
+ ByteSize += sizeDelta;
+ if (Link) {
+ const auto byteSize = static_cast<ui32>(ByteSize);
+ Link->SetValueSize(false, byteSize + SizeOf(byteSize));
+ }
+ }
+
+ template <typename T>
+ void TRepeatedField<T>::Add(T value) {
+ IncSize(static_cast<int>(SizeOf(value)));
+ }
+
+ template <typename T>
+ TNumberField<T>::TNumberField(const TFieldLink& link)
+ : Link(link)
+ {
+ }
+
+ template <typename T>
+ void TNumberField<T>::SetValue(T value) {
+ Link.SetValueSize(value == 0, SizeOf(value));
+ }
+
+ template <typename T>
+ TFixedNumberField<T>::TFixedNumberField(const TFieldLink& link)
+ : Link(link)
+ {
+ }
+
+ template <typename T>
+ void TFixedNumberField<T>::SetValue() {
+ Link.SetValueSize(false, sizeof(T));
+ }
+
+ TStringField::TStringField(const TFieldLink& link)
+ : Link(link)
+ {
+ }
+
+ void TStringField::SetValue(const TString& value) {
+ Link.SetValueSize(value.Empty(), value.Size() + SizeOf(static_cast<ui32>(value.Size())));
+ }
+
+ template class TNumberField<ui64>;
+ template class TNumberField<ui32>;
+ template class TNumberField<i64>;
+ template class TFixedNumberField<ui64>;
+ template class TFixedNumberField<ui32>;
+ template class TFixedNumberField<i64>;
+ template class TRepeatedField<ui64>;
+ template class TRepeatedField<ui32>;
+ template class TRepeatedField<i64>;
+}
diff --git a/library/cpp/unified_agent_client/proto_weighing.h b/library/cpp/unified_agent_client/proto_weighing.h
new file mode 100644
index 0000000000..47cf577e14
--- /dev/null
+++ b/library/cpp/unified_agent_client/proto_weighing.h
@@ -0,0 +1,138 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/f_maybe.h>
+
+#include <util/generic/deque.h>
+#include <util/generic/string.h>
+
+namespace NUnifiedAgent::NPW {
+ class TLengthDelimited;
+
+ class TFieldLink {
+ public:
+ TFieldLink(TLengthDelimited* container, bool repeated = false, size_t keySize = 1);
+
+ void SetValueSize(bool empty, size_t size);
+
+ private:
+ TLengthDelimited* Container;
+ int OuterSize;
+ bool Repeated;
+ size_t KeySize;
+ };
+
+ class TLengthDelimited {
+ public:
+ explicit TLengthDelimited(const TFMaybe<TFieldLink>& link = Nothing());
+
+ void IncSize(int sizeDelta);
+
+ size_t ByteSizeLong() const {
+ return static_cast<size_t>(ByteSize);
+ }
+
+ private:
+ TFMaybe<TFieldLink> Link;
+ int ByteSize;
+ };
+
+ using TMessage = TLengthDelimited;
+
+ template <typename T>
+ class TRepeatedField: public TLengthDelimited {
+ public:
+ static_assert(std::is_same_v<T, ui32> ||
+ std::is_same_v<T, ui64> ||
+ std::is_same_v<T, i64>,
+ "type is not supported");
+
+ using TLengthDelimited::TLengthDelimited;
+
+ void Add(T value);
+ };
+
+ template <typename T>
+ class TRepeatedPtrField {
+ public:
+ explicit TRepeatedPtrField(TMessage* message, size_t keySize = 1)
+ : Message(message)
+ , Children()
+ , KeySize(keySize)
+ {
+ }
+
+ size_t GetSize() const {
+ return Children.size();
+ }
+
+ T& Get(size_t index) {
+ return Children[index];
+ }
+
+ T& Add() {
+ if constexpr (std::is_constructible<T, TFieldLink>::value) {
+ Children.emplace_back(TFieldLink(Message, true, KeySize));
+ } else {
+ Children.emplace_back(Message);
+ }
+ return Children.back();
+ }
+
+ private:
+ TMessage* Message;
+ TDeque<T> Children;
+ size_t KeySize;
+ };
+
+ template <typename T>
+ class TNumberField {
+ public:
+ static_assert(std::is_same_v<T, ui32> ||
+ std::is_same_v<T, ui64> ||
+ std::is_same_v<T, i64>,
+ "type is not supported");
+
+ explicit TNumberField(const TFieldLink& link);
+
+ void SetValue(T value);
+
+ private:
+ TFieldLink Link;
+ };
+
+ template <typename T>
+ class TFixedNumberField {
+ public:
+ static_assert(std::is_same_v<T, ui32> ||
+ std::is_same_v<T, ui64> ||
+ std::is_same_v<T, i64>,
+ "type is not supported");
+
+ explicit TFixedNumberField(const TFieldLink& link);
+
+ void SetValue();
+
+ private:
+ TFieldLink Link;
+ };
+
+ class TStringField {
+ public:
+ explicit TStringField(const TFieldLink& link);
+
+ void SetValue(const TString& value);
+
+ private:
+ TFieldLink Link;
+ };
+
+ extern template class TNumberField<ui64>;
+ extern template class TNumberField<ui32>;
+ extern template class TNumberField<i64>;
+ extern template class TFixedNumberField<ui64>;
+ extern template class TFixedNumberField<ui32>;
+ extern template class TFixedNumberField<i64>;
+ extern template class TRepeatedField<ui64>;
+ extern template class TRepeatedField<ui32>;
+ extern template class TRepeatedField<i64>;
+}
diff --git a/library/cpp/unified_agent_client/registrar.cpp b/library/cpp/unified_agent_client/registrar.cpp
new file mode 100644
index 0000000000..41f6eb34ca
--- /dev/null
+++ b/library/cpp/unified_agent_client/registrar.cpp
@@ -0,0 +1,8 @@
+#include "backend_creator.h"
+
+namespace NUnifiedAgent {
+
+ ILogBackendCreator::TFactory::TRegistrator<NUnifiedAgent::TLogBackendCreator> TLogBackendCreator::Registrar("unified_agent");
+
+}
+
diff --git a/library/cpp/unified_agent_client/throttling.cpp b/library/cpp/unified_agent_client/throttling.cpp
new file mode 100644
index 0000000000..271f7b0e7e
--- /dev/null
+++ b/library/cpp/unified_agent_client/throttling.cpp
@@ -0,0 +1,67 @@
+#include "throttling.h"
+
+#include <util/datetime/cputimer.h>
+
+namespace NUnifiedAgent {
+ TThrottler::TThrottler(double rate, TDuration updatePeriod)
+ : CyclesPerMillisecond(GetCyclesPerMillisecond())
+ , UpdatePeriod(updatePeriod.MilliSeconds() * CyclesPerMillisecond)
+ , PeriodTokens(updatePeriod.SecondsFloat() * rate)
+ , AvailableTokens(0)
+ , ExpirationTime(0)
+ {
+ }
+
+ TThrottler::TThrottler(double rate, double burst)
+ : TThrottler(rate, TDuration::Seconds(burst / rate))
+ {
+ }
+
+ void TThrottler::Consume(double& tokens, TFMaybe<TDuration>& nextCheckDelay) {
+ const auto updateTime = UpdateTokens();
+
+ if (tokens <= AvailableTokens) {
+ AvailableTokens -= tokens;
+ tokens = 0.0;
+ nextCheckDelay = Nothing();
+ } else {
+ tokens -= AvailableTokens;
+ AvailableTokens = 0.0;
+ nextCheckDelay = TDuration::MicroSeconds((ExpirationTime - updateTime) * 1000 / CyclesPerMillisecond + 1);
+ }
+ }
+
+ bool TThrottler::TryConsume(double tokens) {
+ UpdateTokens();
+
+ if (tokens > AvailableTokens) {
+ return false;
+ }
+ AvailableTokens -= tokens;
+ return true;
+ }
+
+ void TThrottler::ConsumeAndWait(double tokens) {
+ TFMaybe<TDuration> nextCheckDelay;
+ while (true) {
+ Consume(tokens, nextCheckDelay);
+ if (!nextCheckDelay.Defined()) {
+ return;
+ }
+ Sleep(*nextCheckDelay);
+ }
+ }
+
+ ui64 TThrottler::UpdateTokens() {
+ const auto updateTime = GetCycleCount();
+ if (updateTime >= ExpirationTime) {
+ if (ExpirationTime == 0) {
+ ExpirationTime = updateTime + UpdatePeriod;
+ } else {
+ ExpirationTime += ((updateTime - ExpirationTime) / UpdatePeriod + 1) * UpdatePeriod;
+ }
+ AvailableTokens = PeriodTokens;
+ }
+ return updateTime;
+ }
+}
diff --git a/library/cpp/unified_agent_client/throttling.h b/library/cpp/unified_agent_client/throttling.h
new file mode 100644
index 0000000000..1e5db1e8fa
--- /dev/null
+++ b/library/cpp/unified_agent_client/throttling.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/f_maybe.h>
+
+#include <util/datetime/base.h>
+
+namespace NUnifiedAgent {
+ class TThrottler {
+ public:
+ explicit TThrottler(double rate, TDuration updatePeriod = TDuration::MilliSeconds(100));
+
+ TThrottler(double rate, double burst);
+
+ void Consume(double& tokens, TFMaybe<TDuration>& nextCheckDelay);
+
+ bool TryConsume(double tokens);
+
+ void ConsumeAndWait(double tokens);
+
+ private:
+ ui64 UpdateTokens();
+
+ private:
+ ui64 CyclesPerMillisecond;
+ ui64 UpdatePeriod;
+ double PeriodTokens;
+ double AvailableTokens;
+ ui64 ExpirationTime;
+ };
+}
diff --git a/library/cpp/unified_agent_client/variant.h b/library/cpp/unified_agent_client/variant.h
new file mode 100644
index 0000000000..e261aa9af3
--- /dev/null
+++ b/library/cpp/unified_agent_client/variant.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <variant>
+
+namespace NUnifiedAgent {
+ template<class... Ts> struct TOverloaded : Ts... { using Ts::operator()...; };
+ template<class... Ts> TOverloaded(Ts...) -> TOverloaded<Ts...>;
+
+ template <class T, class... U>
+ auto Visit(T&& variant, U&&... visitorOverloads) {
+ return std::visit(TOverloaded{std::forward<U>(visitorOverloads)...}, std::forward<T>(variant));
+ }
+
+ template <typename TTarget, typename... TSourceTypes>
+ auto CastTo(std::variant<TSourceTypes...>&& variant) {
+ return Visit(variant, [](auto& p) -> TTarget { return std::move(p); });
+ }
+}
diff --git a/ydb/apps/ydbd/CMakeLists.darwin.txt b/ydb/apps/ydbd/CMakeLists.darwin.txt
index 2ded59f519..dbfba1c967 100644
--- a/ydb/apps/ydbd/CMakeLists.darwin.txt
+++ b/ydb/apps/ydbd/CMakeLists.darwin.txt
@@ -59,7 +59,6 @@ target_link_options(ydbd PRIVATE
CoreFoundation
)
target_sources(ydbd PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/log_backend.cpp
${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/export.cpp
${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/sqs.cpp
${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/main.cpp
diff --git a/ydb/apps/ydbd/CMakeLists.linux-aarch64.txt b/ydb/apps/ydbd/CMakeLists.linux-aarch64.txt
index 4363019c93..3aeaceee3a 100644
--- a/ydb/apps/ydbd/CMakeLists.linux-aarch64.txt
+++ b/ydb/apps/ydbd/CMakeLists.linux-aarch64.txt
@@ -61,7 +61,6 @@ target_link_options(ydbd PRIVATE
-ldl
)
target_sources(ydbd PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/log_backend.cpp
${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/export.cpp
${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/sqs.cpp
${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/main.cpp
diff --git a/ydb/apps/ydbd/CMakeLists.linux.txt b/ydb/apps/ydbd/CMakeLists.linux.txt
index 19aea829d2..554dfec80f 100644
--- a/ydb/apps/ydbd/CMakeLists.linux.txt
+++ b/ydb/apps/ydbd/CMakeLists.linux.txt
@@ -63,7 +63,6 @@ target_link_options(ydbd PRIVATE
-ldl
)
target_sources(ydbd PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/log_backend.cpp
${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/export.cpp
${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/sqs.cpp
${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/main.cpp
diff --git a/ydb/apps/ydbd/log_backend.cpp b/ydb/apps/ydbd/log_backend.cpp
deleted file mode 100644
index dc50353efa..0000000000
--- a/ydb/apps/ydbd/log_backend.cpp
+++ /dev/null
@@ -1,31 +0,0 @@
-#include "log_backend.h"
-#include <ydb/core/base/counters.h>
-
-namespace NKikimr {
-
-TAutoPtr<TLogBackend> TAuditLogBackendFactory::CreateLogBackend(
- const TKikimrRunConfig& runConfig,
- NMonitoring::TDynamicCounterPtr)
-{
- TAutoPtr<TLogBackend> logBackend;
- if (!runConfig.AppConfig.HasAuditConfig())
- return logBackend;
-
- const auto& auditConfig = runConfig.AppConfig.GetAuditConfig();
- if (auditConfig.HasAuditFilePath()) {
- const auto& filePath = auditConfig.GetAuditFilePath();
- try {
- logBackend = new TFileLogBackend(filePath);
- } catch (const TFileError& ex) {
- Cerr << "TAuditLogBackendFactory: failed to open file '" << filePath << "': " << ex.what() << Endl;
- exit(1);
- }
- }
-
- if (logBackend) {
- return logBackend;
- }
- return NActors::CreateStderrBackend();
-}
-
-} // NKikimr
diff --git a/ydb/apps/ydbd/log_backend.h b/ydb/apps/ydbd/log_backend.h
deleted file mode 100644
index 6daab1cc13..0000000000
--- a/ydb/apps/ydbd/log_backend.h
+++ /dev/null
@@ -1,13 +0,0 @@
-#pragma once
-#include <ydb/core/driver_lib/run/log_backend.h>
-
-namespace NKikimr {
-
-class TAuditLogBackendFactory : public TLogBackendFactory {
-public:
- virtual TAutoPtr<TLogBackend> CreateLogBackend(
- const TKikimrRunConfig& runConfig,
- NMonitoring::TDynamicCounterPtr counters) override;
-};
-
-} // NKikimr
diff --git a/ydb/apps/ydbd/main.cpp b/ydb/apps/ydbd/main.cpp
index 558efcaee1..3f24369b36 100644
--- a/ydb/apps/ydbd/main.cpp
+++ b/ydb/apps/ydbd/main.cpp
@@ -1,6 +1,5 @@
#include "sqs.h"
#include "export.h"
-#include "log_backend.h"
#include <ydb/core/driver_lib/run/main.h>
#include <ydb/core/security/ticket_parser.h>
#include <ydb/core/ymq/actor/auth_factory.h>
@@ -16,7 +15,6 @@ int main(int argc, char **argv) {
SetupTerminateHandler();
auto factories = std::make_shared<NKikimr::TModuleFactories>();
- factories->AuditLogBackendFactory = std::make_shared<NKikimr::TAuditLogBackendFactory>();
factories->DataShardExportFactory = std::make_shared<TDataShardExportFactory>();
factories->CreateTicketParser = NKikimr::CreateTicketParser;
factories->FolderServiceFactory = NKikimr::NFolderService::CreateMockFolderServiceActor;
diff --git a/ydb/core/CMakeLists.txt b/ydb/core/CMakeLists.txt
index 5064bfc71f..568ae75971 100644
--- a/ydb/core/CMakeLists.txt
+++ b/ydb/core/CMakeLists.txt
@@ -31,6 +31,7 @@ add_subdirectory(kesus)
add_subdirectory(keyvalue)
add_subdirectory(kqp)
add_subdirectory(load_test)
+add_subdirectory(log_backend)
add_subdirectory(metering)
add_subdirectory(mind)
add_subdirectory(mon)
diff --git a/ydb/core/driver_lib/run/CMakeLists.darwin.txt b/ydb/core/driver_lib/run/CMakeLists.darwin.txt
index dc4f71dca1..91bdf6787a 100644
--- a/ydb/core/driver_lib/run/CMakeLists.darwin.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.darwin.txt
@@ -65,6 +65,7 @@ target_link_libraries(run PUBLIC
ydb-core-kqp
core-kqp-rm_service
ydb-core-load_test
+ ydb-core-log_backend
ydb-core-metering
ydb-core-mind
core-mind-address_classification
@@ -141,7 +142,6 @@ target_sources(run PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/dummy.cpp
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/factories.cpp
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/log_backend.cpp
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/main.cpp
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/run.cpp
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/service_initializer.cpp
diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
index 8df330ddbd..49d1f9a476 100644
--- a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
@@ -66,6 +66,7 @@ target_link_libraries(run PUBLIC
ydb-core-kqp
core-kqp-rm_service
ydb-core-load_test
+ ydb-core-log_backend
ydb-core-metering
ydb-core-mind
core-mind-address_classification
@@ -142,7 +143,6 @@ target_sources(run PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/dummy.cpp
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/factories.cpp
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/log_backend.cpp
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/main.cpp
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/run.cpp
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/service_initializer.cpp
diff --git a/ydb/core/driver_lib/run/CMakeLists.linux.txt b/ydb/core/driver_lib/run/CMakeLists.linux.txt
index 8df330ddbd..49d1f9a476 100644
--- a/ydb/core/driver_lib/run/CMakeLists.linux.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.linux.txt
@@ -66,6 +66,7 @@ target_link_libraries(run PUBLIC
ydb-core-kqp
core-kqp-rm_service
ydb-core-load_test
+ ydb-core-log_backend
ydb-core-metering
ydb-core-mind
core-mind-address_classification
@@ -142,7 +143,6 @@ target_sources(run PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/dummy.cpp
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/factories.cpp
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/log_backend.cpp
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/main.cpp
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/run.cpp
${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/service_initializer.cpp
diff --git a/ydb/core/driver_lib/run/factories.h b/ydb/core/driver_lib/run/factories.h
index f905621ce3..22e6b5f45b 100644
--- a/ydb/core/driver_lib/run/factories.h
+++ b/ydb/core/driver_lib/run/factories.h
@@ -1,5 +1,4 @@
#pragma once
-#include "log_backend.h"
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_devicemode.h>
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/tx/datashard/export_iface.h>
@@ -31,10 +30,6 @@ namespace NKikimr {
// A way to parameterize YDB binary, we do it via a set of factories
struct TModuleFactories {
- // A way to parameterize log backend
- std::shared_ptr<ILogBackendFactory> LogBackendFactory;
- //
- std::shared_ptr<ILogBackendFactory> AuditLogBackendFactory;
// A backend factory for Query Replay
std::shared_ptr<NKqp::IQueryReplayBackendFactory> QueryReplayBackendFactory;
//
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 4281556e7b..2036c12d5b 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -49,6 +49,8 @@
#include <ydb/core/grpc_services/grpc_mon.h>
#include <ydb/core/grpc_services/grpc_request_proxy.h>
+#include <ydb/core/log_backend/log_backend.h>
+
#include <ydb/core/kesus/proxy/proxy.h>
#include <ydb/core/kesus/tablet/tablet.h>
@@ -2359,12 +2361,9 @@ TAuditWriterInitializer::TAuditWriterInitializer(const TKikimrRunConfig &runConf
void TAuditWriterInitializer::InitializeServices(TActorSystemSetup* setup, const TAppData* appData)
{
- TAutoPtr<TLogBackend> fileBackend;
- if (Factories && Factories->AuditLogBackendFactory) {
- fileBackend = Factories->AuditLogBackendFactory->CreateLogBackend(KikimrRunConfig, appData->Counters);
- }
+ auto fileBackend = CreateAuditLogBackendWithUnifiedAgent(KikimrRunConfig, appData->Counters);
if (!fileBackend)
- return;
+ return;
const auto format = Config.GetAuditConfig().GetFormat();
auto actor = NAudit::CreateAuditWriter(std::move(fileBackend), format);
diff --git a/ydb/core/driver_lib/run/log_backend.cpp b/ydb/core/driver_lib/run/log_backend.cpp
deleted file mode 100644
index 3a33ab6cac..0000000000
--- a/ydb/core/driver_lib/run/log_backend.cpp
+++ /dev/null
@@ -1,32 +0,0 @@
-#include "log_backend.h"
-
-namespace NKikimr {
-
-TAutoPtr<TLogBackend> TLogBackendFactory::CreateLogBackendFromLogConfig(const TKikimrRunConfig& runConfig) {
- TAutoPtr<TLogBackend> logBackend;
- if (runConfig.AppConfig.HasLogConfig()) {
- const auto& logConfig = runConfig.AppConfig.GetLogConfig();
- if (logConfig.HasSysLog() && logConfig.GetSysLog()) {
- const TString& service = logConfig.GetSysLogService();
- logBackend = NActors::CreateSysLogBackend(service ? service : "KIKIMR", false, true);
- } else if (logConfig.HasBackendFileName()) {
- logBackend = NActors::CreateFileBackend(logConfig.GetBackendFileName());
- }
- }
- return logBackend;
-}
-
-TAutoPtr<TLogBackend> TLogBackendFactory::CreateLogBackend(
- const TKikimrRunConfig& runConfig,
- ::NMonitoring::TDynamicCounterPtr counters) {
- Y_UNUSED(counters);
- TAutoPtr<TLogBackend> logBackend = CreateLogBackendFromLogConfig(runConfig);
- if (logBackend) {
- return logBackend;
- } else {
- return NActors::CreateStderrBackend();
- }
-}
-
-} // NKikimr
-
diff --git a/ydb/core/driver_lib/run/log_backend.h b/ydb/core/driver_lib/run/log_backend.h
deleted file mode 100644
index d641a0fd9d..0000000000
--- a/ydb/core/driver_lib/run/log_backend.h
+++ /dev/null
@@ -1,29 +0,0 @@
-#pragma once
-
-#include "config.h"
-#include <library/cpp/logger/backend.h>
-
-namespace NKikimr {
-
-// Interface for LogBackend creation
-class ILogBackendFactory {
-public:
- virtual TAutoPtr<TLogBackend> CreateLogBackend(
- const TKikimrRunConfig& runConfig,
- ::NMonitoring::TDynamicCounterPtr counters) = 0;
-
- virtual ~ILogBackendFactory() {}
-};
-
-// Default implementation if ILogBackendFactory
-class TLogBackendFactory : public ILogBackendFactory {
-protected:
- TAutoPtr<TLogBackend> CreateLogBackendFromLogConfig(const TKikimrRunConfig& runConfig);
-
-public:
- virtual TAutoPtr<TLogBackend> CreateLogBackend(
- const TKikimrRunConfig& runConfig,
- ::NMonitoring::TDynamicCounterPtr counters) override;
-};
-
-} // NKikimr
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index c25ff2807c..1f73ffea88 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -32,6 +32,7 @@
#include <ydb/core/formats/clickhouse_block.h>
#include <ydb/core/grpc_services/grpc_request_proxy.h>
#include <ydb/core/grpc_services/grpc_mon.h>
+#include <ydb/core/log_backend/log_backend.h>
#include <ydb/core/mon/sync_http_mon.h>
#include <ydb/core/mon/async_http_mon.h>
#include <ydb/core/mon/crossref.h>
@@ -1089,13 +1090,8 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig)
void TKikimrRunner::InitializeLogSettings(const TKikimrRunConfig& runConfig)
{
- if (ModuleFactories && ModuleFactories->LogBackendFactory) {
- auto logBackend = ModuleFactories->LogBackendFactory->CreateLogBackend(runConfig, Counters);
- LogBackend.reset(logBackend.Release());
- } else {
- auto logBackend = TLogBackendFactory().CreateLogBackend(runConfig, Counters);
- LogBackend.reset(logBackend.Release());
- }
+ auto logBackend = CreateLogBackendWithUnifiedAgent(runConfig, Counters);
+ LogBackend.reset(logBackend.Release());
if (!runConfig.AppConfig.HasLogConfig())
return;
diff --git a/ydb/core/log_backend/CMakeLists.darwin.txt b/ydb/core/log_backend/CMakeLists.darwin.txt
new file mode 100644
index 0000000000..94831d48f3
--- /dev/null
+++ b/ydb/core/log_backend/CMakeLists.darwin.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ydb-core-log_backend)
+target_compile_options(ydb-core-log_backend PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ydb-core-log_backend PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-unified_agent_client
+ ydb-core-base
+)
+target_sources(ydb-core-log_backend PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/log_backend/log_backend.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/log_backend/log_backend_build.cpp
+)
diff --git a/ydb/core/log_backend/CMakeLists.linux-aarch64.txt b/ydb/core/log_backend/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..57ef405c9b
--- /dev/null
+++ b/ydb/core/log_backend/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ydb-core-log_backend)
+target_compile_options(ydb-core-log_backend PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ydb-core-log_backend PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-unified_agent_client
+ ydb-core-base
+)
+target_sources(ydb-core-log_backend PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/log_backend/log_backend.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/log_backend/log_backend_build.cpp
+)
diff --git a/ydb/core/log_backend/CMakeLists.linux.txt b/ydb/core/log_backend/CMakeLists.linux.txt
new file mode 100644
index 0000000000..57ef405c9b
--- /dev/null
+++ b/ydb/core/log_backend/CMakeLists.linux.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ydb-core-log_backend)
+target_compile_options(ydb-core-log_backend PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ydb-core-log_backend PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-unified_agent_client
+ ydb-core-base
+)
+target_sources(ydb-core-log_backend PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/log_backend/log_backend.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/log_backend/log_backend_build.cpp
+)
diff --git a/ydb/core/log_backend/CMakeLists.txt b/ydb/core/log_backend/CMakeLists.txt
new file mode 100644
index 0000000000..5bb4faffb4
--- /dev/null
+++ b/ydb/core/log_backend/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/core/log_backend/log_backend.cpp b/ydb/core/log_backend/log_backend.cpp
new file mode 100644
index 0000000000..7a5b970986
--- /dev/null
+++ b/ydb/core/log_backend/log_backend.cpp
@@ -0,0 +1,66 @@
+#include "log_backend.h"
+#include "log_backend_build.h"
+#include <ydb/core/base/counters.h>
+
+namespace NKikimr {
+
+TAutoPtr<TLogBackend> CreateLogBackendWithUnifiedAgent(
+ const TKikimrRunConfig& runConfig,
+ NMonitoring::TDynamicCounterPtr counters)
+{
+ if (runConfig.AppConfig.HasLogConfig()) {
+ const auto& logConfig = runConfig.AppConfig.GetLogConfig();
+ TAutoPtr<TLogBackend> logBackend = TLogBackendBuildHelper::CreateLogBackendFromLogConfig(logConfig);
+ if (logConfig.HasUAClientConfig()) {
+ const auto& uaClientConfig = logConfig.GetUAClientConfig();
+ auto uaCounters = GetServiceCounters(counters, "utils")->GetSubgroup("subsystem", "ua_client");
+ auto logName = uaClientConfig.GetLogName();
+ TAutoPtr<TLogBackend> uaLogBackend = TLogBackendBuildHelper::CreateLogBackendFromUAClientConfig(uaClientConfig, uaCounters, logName);
+ logBackend = logBackend ? NActors::CreateCompositeLogBackend({logBackend, uaLogBackend}) : uaLogBackend;
+ }
+ if (logBackend) {
+ return logBackend;
+ }
+ }
+
+ return NActors::CreateStderrBackend();
+}
+
+TAutoPtr<TLogBackend> CreateAuditLogBackendWithUnifiedAgent(
+ const TKikimrRunConfig& runConfig,
+ NMonitoring::TDynamicCounterPtr counters)
+{
+ TAutoPtr<TLogBackend> logBackend;
+ if (!runConfig.AppConfig.HasAuditConfig())
+ return logBackend;
+
+ const auto& auditConfig = runConfig.AppConfig.GetAuditConfig();
+ if (auditConfig.HasAuditFilePath()) {
+ const auto& filePath = auditConfig.GetAuditFilePath();
+ try {
+ logBackend = new TFileLogBackend(filePath);
+ } catch (const TFileError& ex) {
+ Cerr << "TAuditLogBackendFactoryWithUnifiedAgent: failed to open file '" << filePath << "': " << ex.what() << Endl;
+ exit(1);
+ }
+ }
+
+ if (auditConfig.GetUnifiedAgentEnable() && runConfig.AppConfig.HasLogConfig() && runConfig.AppConfig.GetLogConfig().HasUAClientConfig()) {
+ const auto& logConfig = runConfig.AppConfig.GetLogConfig();
+ const auto& uaClientConfig = logConfig.GetUAClientConfig();
+ auto uaCounters = GetServiceCounters(counters, "utils")->GetSubgroup("subsystem", "ua_client");
+ auto logName = runConfig.AppConfig.GetAuditConfig().HasLogName()
+ ? runConfig.AppConfig.GetAuditConfig().GetLogName()
+ : uaClientConfig.GetLogName();
+ TAutoPtr<TLogBackend> uaLogBackend = TLogBackendBuildHelper::CreateLogBackendFromUAClientConfig(uaClientConfig, uaCounters, logName);
+ logBackend = logBackend ? NActors::CreateCompositeLogBackend({logBackend, uaLogBackend}) : uaLogBackend;
+ }
+
+ if (logBackend) {
+ return logBackend;
+ }
+ return NActors::CreateStderrBackend();
+}
+
+} // NKikimr
+
diff --git a/ydb/core/log_backend/log_backend.h b/ydb/core/log_backend/log_backend.h
new file mode 100644
index 0000000000..f9de92f39b
--- /dev/null
+++ b/ydb/core/log_backend/log_backend.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <ydb/core/driver_lib/run/config.h>
+
+#include <library/cpp/logger/backend.h>
+
+namespace NKikimr {
+
+TAutoPtr<TLogBackend> CreateLogBackendWithUnifiedAgent(
+ const TKikimrRunConfig& runConfig,
+ NMonitoring::TDynamicCounterPtr counters);
+
+TAutoPtr<TLogBackend> CreateAuditLogBackendWithUnifiedAgent(
+ const TKikimrRunConfig& runConfig,
+ NMonitoring::TDynamicCounterPtr counters);
+
+} // NKikimr
+
diff --git a/ydb/core/log_backend/log_backend_build.cpp b/ydb/core/log_backend/log_backend_build.cpp
new file mode 100644
index 0000000000..0bdd98ffeb
--- /dev/null
+++ b/ydb/core/log_backend/log_backend_build.cpp
@@ -0,0 +1,51 @@
+#include "log_backend_build.h"
+#include <util/system/getpid.h>
+#include <library/cpp/actors/core/log.h>
+
+namespace NKikimr {
+
+TAutoPtr<TLogBackend> TLogBackendBuildHelper::CreateLogBackendFromLogConfig(const NKikimrConfig::TLogConfig& logConfig, const TString& defaultIdent) {
+ TAutoPtr<TLogBackend> logBackend;
+ if (logConfig.HasSysLog() && logConfig.GetSysLog()) {
+ const TString& service = logConfig.GetSysLogService();
+ logBackend = NActors::CreateSysLogBackend(service ? service : defaultIdent, false, true);
+ } else if (logConfig.HasBackendFileName()) {
+ logBackend = NActors::CreateFileBackend(logConfig.GetBackendFileName());
+ }
+ return logBackend;
+}
+
+TAutoPtr<TLogBackend> TLogBackendBuildHelper::CreateLogBackendFromUAClientConfig(const NKikimrConfig::TUAClientConfig& uaClientConfig, NMonitoring::TDynamicCounterPtr uaCounters, const TString& logName) {
+ auto parameters = NUnifiedAgent::TClientParameters(uaClientConfig.GetUri())
+ .SetCounters(uaCounters)
+ .SetMaxInflightBytes(uaClientConfig.GetMaxInflightBytes());
+ if (uaClientConfig.HasSharedSecretKey()) {
+ parameters.SetSharedSecretKey(uaClientConfig.GetSharedSecretKey());
+ }
+ if (uaClientConfig.HasGrpcReconnectDelayMs()) {
+ parameters.SetGrpcReconnectDelay(TDuration::MilliSeconds(uaClientConfig.GetGrpcReconnectDelayMs()));
+ }
+ if (uaClientConfig.HasGrpcSendDelayMs()) {
+ parameters.SetGrpcSendDelay(TDuration::MilliSeconds(uaClientConfig.GetGrpcSendDelayMs()));
+ }
+ if (uaClientConfig.HasGrpcMaxMessageSize()) {
+ parameters.SetGrpcMaxMessageSize(uaClientConfig.GetGrpcMaxMessageSize());
+ }
+ if (uaClientConfig.HasClientLogFile()) {
+ TLog log(uaClientConfig.GetClientLogFile(),
+ static_cast<ELogPriority>(uaClientConfig.GetClientLogPriority()));
+ parameters.SetLog(log);
+ }
+
+ auto sessionParameters = NUnifiedAgent::TSessionParameters();
+ sessionParameters.Meta.ConstructInPlace();
+ (*sessionParameters.Meta)["_pid"] = ToString(GetPID());
+ if (logName) {
+ (*sessionParameters.Meta)["_log_name"] = logName;
+ }
+
+ TAutoPtr<TLogBackend> uaLogBackend = MakeLogBackend(parameters, sessionParameters).Release();
+ return uaLogBackend;
+}
+
+} // NKikimr
diff --git a/ydb/core/log_backend/log_backend_build.h b/ydb/core/log_backend/log_backend_build.h
new file mode 100644
index 0000000000..ab672bb819
--- /dev/null
+++ b/ydb/core/log_backend/log_backend_build.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <ydb/core/base/counters.h>
+#include <ydb/core/protos/config.pb.h>
+#include <library/cpp/logger/backend.h>
+#include <library/cpp/unified_agent_client/backend.h>
+#include <util/generic/ptr.h>
+
+namespace NKikimr {
+
+class TLogBackendBuildHelper {
+public:
+ static TAutoPtr<TLogBackend> CreateLogBackendFromLogConfig(
+ const NKikimrConfig::TLogConfig& logConfig,
+ const TString& defaultIdent = "KIKIMR");
+ static TAutoPtr<TLogBackend> CreateLogBackendFromUAClientConfig(
+ const NKikimrConfig::TUAClientConfig& uaClientConfig,
+ NMonitoring::TDynamicCounterPtr uaCounters,
+ const TString& logName);
+};
+}