diff options
author | hor911 <hor911@ydb.tech> | 2023-02-09 12:40:11 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-02-09 12:40:11 +0300 |
commit | 24689527cd888aa8a640ecb5077e656b3520d373 (patch) | |
tree | a613ff4cd9567b7113e8376a17f8b85897a42790 | |
parent | 8642d3642932f03663ba7d2d9670707c192207fd (diff) | |
download | ydb-24689527cd888aa8a640ecb5077e656b3520d373.tar.gz |
Log backend move
67 files changed, 4495 insertions, 130 deletions
diff --git a/library/cpp/CMakeLists.darwin.txt b/library/cpp/CMakeLists.darwin.txt index c583b41c41..2e5648a15c 100644 --- a/library/cpp/CMakeLists.darwin.txt +++ b/library/cpp/CMakeLists.darwin.txt @@ -82,6 +82,7 @@ add_subdirectory(time_provider) add_subdirectory(timezone_conversion) add_subdirectory(tld) add_subdirectory(unicode) +add_subdirectory(unified_agent_client) add_subdirectory(uri) add_subdirectory(xml) add_subdirectory(yaml) diff --git a/library/cpp/CMakeLists.linux-aarch64.txt b/library/cpp/CMakeLists.linux-aarch64.txt index 45ca41d349..09cf4909bb 100644 --- a/library/cpp/CMakeLists.linux-aarch64.txt +++ b/library/cpp/CMakeLists.linux-aarch64.txt @@ -81,6 +81,7 @@ add_subdirectory(time_provider) add_subdirectory(timezone_conversion) add_subdirectory(tld) add_subdirectory(unicode) +add_subdirectory(unified_agent_client) add_subdirectory(uri) add_subdirectory(xml) add_subdirectory(yaml) diff --git a/library/cpp/CMakeLists.linux.txt b/library/cpp/CMakeLists.linux.txt index c583b41c41..2e5648a15c 100644 --- a/library/cpp/CMakeLists.linux.txt +++ b/library/cpp/CMakeLists.linux.txt @@ -82,6 +82,7 @@ add_subdirectory(time_provider) add_subdirectory(timezone_conversion) add_subdirectory(tld) add_subdirectory(unicode) +add_subdirectory(unified_agent_client) add_subdirectory(uri) add_subdirectory(xml) add_subdirectory(yaml) diff --git a/library/cpp/unified_agent_client/CMakeLists.darwin.txt b/library/cpp/unified_agent_client/CMakeLists.darwin.txt new file mode 100644 index 0000000000..ac4481155f --- /dev/null +++ b/library/cpp/unified_agent_client/CMakeLists.darwin.txt @@ -0,0 +1,63 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(proto) + +add_library(library-cpp-unified_agent_client) +target_include_directories(library-cpp-unified_agent_client PRIVATE + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include +) +target_link_libraries(library-cpp-unified_agent_client PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + cpp-logger-global + cpp-threading-future + cpp-monlib-dynamic_counters + cpp-unified_agent_client-proto + tools-enum_parser-enum_serialization_runtime +) +target_sources(library-cpp-unified_agent_client PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend_creator.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/client_impl.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/counters.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/helpers.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_status_code.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/clock.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/duration_counter.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/logger.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/throttling.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto_weighing.cpp +) +generate_enum_serilization(library-cpp-unified_agent_client + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.h + INCLUDE_HEADERS + library/cpp/unified_agent_client/grpc_io.h +) + +add_global_library_for(library-cpp-unified_agent_client.global library-cpp-unified_agent_client) +target_include_directories(library-cpp-unified_agent_client.global PRIVATE + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include +) +target_link_libraries(library-cpp-unified_agent_client.global PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + cpp-logger-global + cpp-threading-future + cpp-monlib-dynamic_counters + cpp-unified_agent_client-proto + tools-enum_parser-enum_serialization_runtime +) +target_sources(library-cpp-unified_agent_client.global PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/registrar.cpp +) diff --git a/library/cpp/unified_agent_client/CMakeLists.linux-aarch64.txt b/library/cpp/unified_agent_client/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..01a8bd743d --- /dev/null +++ b/library/cpp/unified_agent_client/CMakeLists.linux-aarch64.txt @@ -0,0 +1,65 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(proto) + +add_library(library-cpp-unified_agent_client) +target_include_directories(library-cpp-unified_agent_client PRIVATE + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include +) +target_link_libraries(library-cpp-unified_agent_client PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + cpp-logger-global + cpp-threading-future + cpp-monlib-dynamic_counters + cpp-unified_agent_client-proto + tools-enum_parser-enum_serialization_runtime +) +target_sources(library-cpp-unified_agent_client PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend_creator.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/client_impl.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/counters.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/helpers.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_status_code.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/clock.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/duration_counter.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/logger.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/throttling.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto_weighing.cpp +) +generate_enum_serilization(library-cpp-unified_agent_client + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.h + INCLUDE_HEADERS + library/cpp/unified_agent_client/grpc_io.h +) + +add_global_library_for(library-cpp-unified_agent_client.global library-cpp-unified_agent_client) +target_include_directories(library-cpp-unified_agent_client.global PRIVATE + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include +) +target_link_libraries(library-cpp-unified_agent_client.global PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + cpp-logger-global + cpp-threading-future + cpp-monlib-dynamic_counters + cpp-unified_agent_client-proto + tools-enum_parser-enum_serialization_runtime +) +target_sources(library-cpp-unified_agent_client.global PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/registrar.cpp +) diff --git a/library/cpp/unified_agent_client/CMakeLists.linux.txt b/library/cpp/unified_agent_client/CMakeLists.linux.txt new file mode 100644 index 0000000000..01a8bd743d --- /dev/null +++ b/library/cpp/unified_agent_client/CMakeLists.linux.txt @@ -0,0 +1,65 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(proto) + +add_library(library-cpp-unified_agent_client) +target_include_directories(library-cpp-unified_agent_client PRIVATE + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include +) +target_link_libraries(library-cpp-unified_agent_client PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + cpp-logger-global + cpp-threading-future + cpp-monlib-dynamic_counters + cpp-unified_agent_client-proto + tools-enum_parser-enum_serialization_runtime +) +target_sources(library-cpp-unified_agent_client PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend_creator.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/client_impl.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/counters.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/helpers.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_status_code.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/clock.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/duration_counter.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/logger.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/throttling.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto_weighing.cpp +) +generate_enum_serilization(library-cpp-unified_agent_client + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.h + INCLUDE_HEADERS + library/cpp/unified_agent_client/grpc_io.h +) + +add_global_library_for(library-cpp-unified_agent_client.global library-cpp-unified_agent_client) +target_include_directories(library-cpp-unified_agent_client.global PRIVATE + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include +) +target_link_libraries(library-cpp-unified_agent_client.global PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + cpp-logger-global + cpp-threading-future + cpp-monlib-dynamic_counters + cpp-unified_agent_client-proto + tools-enum_parser-enum_serialization_runtime +) +target_sources(library-cpp-unified_agent_client.global PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/registrar.cpp +) diff --git a/library/cpp/unified_agent_client/CMakeLists.txt b/library/cpp/unified_agent_client/CMakeLists.txt new file mode 100644 index 0000000000..5bb4faffb4 --- /dev/null +++ b/library/cpp/unified_agent_client/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/library/cpp/unified_agent_client/async_joiner.h b/library/cpp/unified_agent_client/async_joiner.h new file mode 100644 index 0000000000..ce392ef7bc --- /dev/null +++ b/library/cpp/unified_agent_client/async_joiner.h @@ -0,0 +1,42 @@ +#pragma once + +#include <library/cpp/threading/future/future.h> + +namespace NUnifiedAgent { + class TAsyncJoiner { + public: + inline TAsyncJoiner() + : Promise(NThreading::NewPromise()) + , Refs(1) + { + } + + inline i64 Ref(i64 count = 1) noexcept { + const auto result = Refs.fetch_add(count); + Y_VERIFY(result >= 1, "already joined"); + return result; + } + + inline i64 UnRef() noexcept { + const auto prev = Refs.fetch_sub(1); + Y_VERIFY(prev >= 1); + if (prev == 1) { + auto p = Promise; + p.SetValue(); + } + return prev; + } + + inline NThreading::TFuture<void> Join() noexcept { + auto result = Promise; + UnRef(); + return result; + } + + private: + NThreading::TPromise<void> Promise; + std::atomic<i64> Refs; + }; + + using TAsyncJoinerToken = TIntrusivePtr<TAsyncJoiner>; +} diff --git a/library/cpp/unified_agent_client/backend.cpp b/library/cpp/unified_agent_client/backend.cpp new file mode 100644 index 0000000000..b3c4b4ebcf --- /dev/null +++ b/library/cpp/unified_agent_client/backend.cpp @@ -0,0 +1,112 @@ +#include "backend.h" + +#include <library/cpp/unified_agent_client/enum.h> + +#include <library/cpp/logger/record.h> + +#include <util/datetime/base.h> +#include <util/generic/guid.h> +#include <util/generic/serialized_enum.h> + +namespace NUnifiedAgent { + namespace { + class TDefaultRecordConverter : public IRecordConverter { + public: + TDefaultRecordConverter(bool stripTrailingNewLine) + : StripTrailingNewLine(stripTrailingNewLine) + , PriorityKey("_priority") + { + } + + TClientMessage Convert(const TLogRecord& rec) const override { + const auto stripTrailingNewLine = StripTrailingNewLine && + rec.Len > 0 && rec.Data[rec.Len - 1] == '\n'; + + THashMap<TString, TString> metaFlags{{PriorityKey, NameOf(rec.Priority)}}; + metaFlags.insert(rec.MetaFlags.begin(), rec.MetaFlags.end()); + + return { + TString(rec.Data, stripTrailingNewLine ? rec.Len - 1 : rec.Len), + std::move(metaFlags) + }; + } + + private: + const bool StripTrailingNewLine; + const TString PriorityKey; + }; + + class TClientSessionAdapter: public TLogBackend { + public: + explicit TClientSessionAdapter(const TClientSessionPtr& session, THolder<IRecordConverter> recordConverter) + : Session(session) + , RecordConverter(std::move(recordConverter)) + { + } + + void WriteData(const TLogRecord& rec) override { + Session->Send(RecordConverter->Convert(rec)); + } + + void ReopenLog() override { + } + + private: + TClientSessionPtr Session; + THolder<IRecordConverter> RecordConverter; + }; + + class TSessionHolder { + protected: + TSessionHolder(const TClientParameters& parameters, const TSessionParameters& sessionParameters) + : Client(MakeClient(parameters)) + , Session(Client->CreateSession(sessionParameters)) + { + } + + protected: + TClientPtr Client; + TClientSessionPtr Session; + }; + + class TAgentLogBackend: private TSessionHolder, public TClientSessionAdapter { + public: + TAgentLogBackend(const TClientParameters& parameters, + const TSessionParameters& sessionParameters, + THolder<IRecordConverter> recordConverter) + : TSessionHolder(parameters, sessionParameters) + , TClientSessionAdapter(TSessionHolder::Session, std::move(recordConverter)) + { + } + + ~TAgentLogBackend() override { + TSessionHolder::Session->Close(); + } + }; + } + + THolder<IRecordConverter> MakeDefaultRecordConverter(bool stripTrailingNewLine) { + return MakeHolder<TDefaultRecordConverter>(stripTrailingNewLine); + } + + THolder<TLogBackend> AsLogBackend(const TClientSessionPtr& session, bool stripTrailingNewLine) { + return MakeHolder<TClientSessionAdapter>(session, MakeDefaultRecordConverter(stripTrailingNewLine)); + } + + THolder<TLogBackend> MakeLogBackend(const TClientParameters& parameters, + const TSessionParameters& sessionParameters, + THolder<IRecordConverter> recordConverter) + { + if (!recordConverter) { + recordConverter = MakeDefaultRecordConverter(); + } + return MakeHolder<TAgentLogBackend>(parameters, sessionParameters, std::move(recordConverter)); + } + + THolder<::TLog> MakeLog(const TClientParameters& parameters, + const TSessionParameters& sessionParameters, + THolder<IRecordConverter> recordConverter) + { + return MakeHolder<::TLog>(MakeLogBackend(parameters, sessionParameters, std::move(recordConverter))); + } +} diff --git a/library/cpp/unified_agent_client/backend.h b/library/cpp/unified_agent_client/backend.h new file mode 100644 index 0000000000..41e8d146b3 --- /dev/null +++ b/library/cpp/unified_agent_client/backend.h @@ -0,0 +1,27 @@ +#pragma once + +#include <library/cpp/unified_agent_client/client.h> + +#include <library/cpp/logger/backend.h> +#include <library/cpp/logger/log.h> + +namespace NUnifiedAgent { + class IRecordConverter { + public: + virtual ~IRecordConverter() = default; + + virtual TClientMessage Convert(const TLogRecord&) const = 0; + }; + + THolder<IRecordConverter> MakeDefaultRecordConverter(bool stripTrailingNewLine = true); + + THolder<TLogBackend> AsLogBackend(const TClientSessionPtr& session, bool stripTrailingNewLine = true); + + THolder<TLogBackend> MakeLogBackend(const TClientParameters& parameters, + const TSessionParameters& sessionParameters = {}, + THolder<IRecordConverter> recordConverter = {}); + + THolder<::TLog> MakeLog(const TClientParameters& parameters, + const TSessionParameters& sessionParameters = {}, + THolder<IRecordConverter> recordConverter = {}); +} diff --git a/library/cpp/unified_agent_client/backend_creator.cpp b/library/cpp/unified_agent_client/backend_creator.cpp new file mode 100644 index 0000000000..825e3ebd2b --- /dev/null +++ b/library/cpp/unified_agent_client/backend_creator.cpp @@ -0,0 +1,63 @@ +#include "backend_creator.h" +#include <library/cpp/logger/global/global.h> + +namespace NUnifiedAgent { + + + TLogBackendCreator::TLogBackendCreator() + : TLogBackendCreatorBase("unified_agent") + {} + + bool TLogBackendCreator::Init(const IInitContext& ctx) { + if(TString socket = ctx.GetOrElse("Uri", TString())) { + ClientParams = MakeHolder<TClientParameters>(socket); + } else { + Cdbg << "Uri not set for unified_agent log backend" << Endl; + return false; + } + TString secretKey; + ctx.GetValue("SharedSecretKey", secretKey); + if (secretKey) { + ClientParams->SharedSecretKey = secretKey; + } + ctx.GetValue("MaxInflightBytes", ClientParams->MaxInflightBytes); + ctx.GetValue("GrpcSendDelay", ClientParams->GrpcSendDelay); + size_t rateLimit; + if (ctx.GetValue("LogRateLimit", rateLimit)) { + ClientParams->LogRateLimitBytes = rateLimit; + } + ctx.GetValue("GrpcReconnectDelay", ClientParams->GrpcReconnectDelay); + ctx.GetValue("GrpcMaxMessageSize", ClientParams->GrpcMaxMessageSize); + const auto ownLogger = ctx.GetChildren("OwnLogger"); + if (!ownLogger.empty() && ownLogger.front()->GetOrElse("LoggerType", TString()) != "global") { + OwnLogger = ILogBackendCreator::Create(*ownLogger.front()); + TLog log; + log.ResetBackend(OwnLogger->CreateLogBackend()); + ClientParams->SetLog(log); + } + return true; + } + + + void TLogBackendCreator::DoToJson(NJson::TJsonValue& value) const { + value["Uri"] = ClientParams->Uri; + if (ClientParams->SharedSecretKey) { + value["SharedSecretKey"] = *ClientParams->SharedSecretKey; + } + value["MaxInflightBytes"] = ClientParams->MaxInflightBytes; + value["GrpcSendDelay"] = ClientParams->GrpcSendDelay.ToString(); + if (ClientParams->LogRateLimitBytes) { + value["LogRateLimit"] = *ClientParams->LogRateLimitBytes; + } + value["GrpcReconnectDelay"] = ClientParams->GrpcReconnectDelay.ToString(); + value["GrpcMaxMessageSize"] = ClientParams->GrpcMaxMessageSize; + if (OwnLogger) { + OwnLogger->ToJson(value["OwnLogger"].AppendValue(NJson::JSON_MAP)); + } + } + + THolder<TLogBackend> TLogBackendCreator::DoCreateLogBackend() const { + return MakeLogBackend(*ClientParams); + } + +} diff --git a/library/cpp/unified_agent_client/backend_creator.h b/library/cpp/unified_agent_client/backend_creator.h new file mode 100644 index 0000000000..04928f616c --- /dev/null +++ b/library/cpp/unified_agent_client/backend_creator.h @@ -0,0 +1,25 @@ +#pragma once + +#include "backend.h" +#include <library/cpp/logger/backend_creator.h> + +namespace NUnifiedAgent { + + class TLogBackendCreator: public TLogBackendCreatorBase { + public: + TLogBackendCreator(); + bool Init(const IInitContext& ctx) override; + static TFactory::TRegistrator<TLogBackendCreator> Registrar; + + protected: + void DoToJson(NJson::TJsonValue& value) const override; + + private: + THolder<TLogBackend> DoCreateLogBackend() const override; + + private: + THolder<TClientParameters> ClientParams; + THolder<ILogBackendCreator> OwnLogger; + }; + +} diff --git a/library/cpp/unified_agent_client/client.h b/library/cpp/unified_agent_client/client.h new file mode 100644 index 0000000000..62e1210803 --- /dev/null +++ b/library/cpp/unified_agent_client/client.h @@ -0,0 +1,256 @@ +#pragma once + +#include <library/cpp/unified_agent_client/counters.h> + +#include <library/cpp/logger/log.h> +#include <library/cpp/threading/future/future.h> + +#include <util/datetime/base.h> +#include <util/generic/hash.h> +#include <util/generic/maybe.h> +#include <util/generic/string.h> + +namespace NUnifiedAgent { + struct TClientParameters { + // uri format https://github.com/grpc/grpc/blob/master/doc/naming.md + // for example: unix:///unified_agent for unix domain sockets or localhost:12345 for tcp + explicit TClientParameters(const TString& uri); + + // Simple way to protect against writing to unintended/invalid Unified Agent endpoint. + // Must correspond to 'shared_secret_key' grpc input parameter + // (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6333542#L219), + // session would end with error otherwise. + // + // Default: not set + TClientParameters& SetSharedSecretKey(const TString& sharedSecretKey) { + SharedSecretKey = sharedSecretKey; + return *this; + } + + // Max bytes count that have been received by client session but not acknowledged yet. + // When exceeded, new messages will be discarded, an error message + // will be written to the TLog instance and drop counter will be incremented. + // + // Default: 10 mb + TClientParameters& SetMaxInflightBytes(size_t maxInflightBytes) { + MaxInflightBytes = maxInflightBytes; + return *this; + } + + // TLog instance for client library's own logs. + // + // Default: TLoggerOperator<TGlobalLog>::Log() + TClientParameters& SetLog(TLog& log) { + Log = log; + return *this; + } + + // Throttle client library log by rate limit in bytes, excess will be discarded. + // + // Default: not set + TClientParameters& SetLogRateLimit(size_t bytesPerSec) { + LogRateLimitBytes = bytesPerSec; + return *this; + } + + // Try to establish new grpc session if the current one become broken. + // Session may break either due to agent unavailability, or the agent itself may + // reject new session creation if it does not satisfy certain + // conditions - shared_secret_key does not match, the session creation rate has been + // exceeded, invalid session metadata has been used and so on. + // Attempts to establish a grpc session will continue indefinitely. + // + // Default: 50 millis + TClientParameters& SetGrpcReconnectDelay(TDuration delay) { + GrpcReconnectDelay = delay; + return *this; + } + + // Grpc usually writes data to the socket faster than it comes from the client. + // This means that it's possible that each TClientMessage would be sent in it's own grpc message. + // This is expensive in terms of cpu, since grpc makes at least one syscall + // for each message on the sender and receiver sides. + // To avoid a large number of syscalls, the client holds incoming messages + // in internal buffer in hope of being able to assemble bigger grpc batch. + // This parameter sets the timeout for this delay - from IClientSession::Send + // call to the actual sending of the corresponding grpc message. + // + // Default: 10 millis. + TClientParameters& SetGrpcSendDelay(TDuration delay) { + GrpcSendDelay = delay; + return *this; + } + + // Client library sends messages to grpc in batches, this parameter + // establishes upper limit on the size of single batch in bytes. + // If you increase this value, don't forget to adjust max_receive_message_size (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6661788#L185) + // in grpc input config, it must be grater than GrpcMaxMessageSize. + // + // Default: 1 mb + TClientParameters& SetGrpcMaxMessageSize(size_t size) { + GrpcMaxMessageSize = size; + return *this; + } + + // Enable forks handling in client library. + // Multiple threads and concurrent forks are all supported is this regime. + // + // Default: false + TClientParameters& SetEnableForkSupport(bool value) { + EnableForkSupport = value; + return *this; + } + + // Client library counters. + // App can set this to some leaf of it's TDynamicCounters tree. + // Actual provided counters are listed in TClientCounters. + // + // Default: not set + TClientParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) { + return SetCounters(MakeIntrusive<TClientCounters>(counters)); + } + + TClientParameters& SetCounters(const TIntrusivePtr<TClientCounters>& counters) { + Counters = counters; + return *this; + } + + public: + static const size_t DefaultMaxInflightBytes; + static const size_t DefaultGrpcMaxMessageSize; + static const TDuration DefaultGrpcSendDelay; + + public: + TString Uri; + TMaybe<TString> SharedSecretKey; + size_t MaxInflightBytes; + TLog Log; + TMaybe<size_t> LogRateLimitBytes; + TDuration GrpcReconnectDelay; + TDuration GrpcSendDelay; + bool EnableForkSupport; + size_t GrpcMaxMessageSize; + TIntrusivePtr<TClientCounters> Counters; + }; + + struct TSessionParameters { + TSessionParameters(); + + // Session unique identifier. + // It's guaranteed that for messages with the same sessionId relative + // ordering of the messages will be preserved at all processing stages + // in library, in Unified Agent and in other systems that respect ordering (e.g., Logbroker) + // + // Default: generated automatically by Unified Agent. + TSessionParameters& SetSessionId(const TString& sessionId) { + SessionId = sessionId; + return *this; + } + + // Session metadata as key-value set. + // Can be used by agent filters and outputs for validation/routing/enrichment/etc. + // + // Default: not set + TSessionParameters& SetMeta(const THashMap<TString, TString>& meta) { + Meta = meta; + return *this; + } + + // Session counters. + // Actual provided counters are listed in TClientSessionCounters. + // + // Default: A single common for all sessions subgroup of client TDynamicCounters instance + // with label ('session': 'default'). + TSessionParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) { + return SetCounters(MakeIntrusive<TClientSessionCounters>(counters)); + } + + TSessionParameters& SetCounters(const TIntrusivePtr<TClientSessionCounters>& counters) { + Counters = counters; + return *this; + } + + // Max bytes count that have been received by client session but not acknowledged yet. + // When exceeded, new messages will be discarded, an error message + // will be written to the TLog instance and drop counter will be incremented. + // + // Default: value from client settings + TSessionParameters& SetMaxInflightBytes(size_t maxInflightBytes) { + MaxInflightBytes = maxInflightBytes; + return *this; + } + + public: + TMaybe<TString> SessionId; + TMaybe<THashMap<TString, TString>> Meta; + TIntrusivePtr<TClientSessionCounters> Counters; + TMaybe<size_t> MaxInflightBytes; + }; + + // Message data to be sent to unified agent. + struct TClientMessage { + // Opaque message payload. + TString Payload; + + // Message metadata as key-value set. + // Can be used by agent filters and outputs for validation/routing/enrichment/etc. + // + // Default: not set + TMaybe<THashMap<TString, TString>> Meta{}; + + // Message timestamp. + // + // Default: time the client library has received this instance of TClientMessage. + TMaybe<TInstant> Timestamp{}; + }; + + // Message size as it is accounted in byte-related metrics (ReceivedBytes, InflightBytes, etc). + size_t SizeOf(const TClientMessage& message); + + class IClientSession: public TAtomicRefCount<IClientSession> { + public: + virtual ~IClientSession() = default; + + // Places the message into send queue. Actual grpc call may occur later asynchronously, + // based on settings GrpcSendDelay and GrpcMaxMessageSize. + // A message can be discarded if the limits defined by the GrpcMaxMessageSize and MaxInflightBytes + // settings are exceeded, or if the Close method has already been called. + // In this case an error message will be written to the TLog instance + // and drop counter will be incremented. + virtual void Send(TClientMessage&& message) = 0; + + void Send(const TClientMessage& message) { + Send(TClientMessage(message)); + } + + // Waits until either all current inflight messages are + // acknowledged or the specified deadline is reached. + // Upon the deadline grpc connection would be forcefully dropped (via grpc::ClientContext::TryCancel). + virtual NThreading::TFuture<void> CloseAsync(TInstant deadline) = 0; + + void Close(TInstant deadline) { + CloseAsync(deadline).Wait(); + } + + void Close(TDuration timeout = TDuration::Seconds(3)) { + Close(Now() + timeout); + } + }; + using TClientSessionPtr = TIntrusivePtr<IClientSession>; + + class IClient: public TAtomicRefCount<IClient> { + public: + virtual ~IClient() = default; + + virtual TClientSessionPtr CreateSession(const TSessionParameters& parameters = {}) = 0; + + virtual void StartTracing(ELogPriority) { + } + + virtual void FinishTracing() { + } + }; + using TClientPtr = TIntrusivePtr<IClient>; + + TClientPtr MakeClient(const TClientParameters& parameters); +} diff --git a/library/cpp/unified_agent_client/client_impl.cpp b/library/cpp/unified_agent_client/client_impl.cpp new file mode 100644 index 0000000000..4db98120fd --- /dev/null +++ b/library/cpp/unified_agent_client/client_impl.cpp @@ -0,0 +1,1274 @@ +#include "client_impl.h" +#include "helpers.h" + +#include <contrib/libs/grpc/include/grpc/grpc.h> +#include <contrib/libs/grpc/src/core/lib/gpr/string.h> +#include <contrib/libs/grpc/src/core/lib/gprpp/fork.h> +#include <contrib/libs/grpc/src/core/lib/iomgr/executor.h> + +#include <util/charset/utf8.h> +#include <util/generic/size_literals.h> +#include <util/system/env.h> + +using namespace NThreading; +using namespace NMonitoring; + +namespace NUnifiedAgent::NPrivate { + std::shared_ptr<grpc::Channel> CreateChannel(const grpc::string& target) { + grpc::ChannelArguments args; + args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE); + args.SetMaxReceiveMessageSize(Max<int>()); + args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 60000); + args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000); + args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100); + args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 200); + args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0); + args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); + args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 5000); + args.SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000); + args.SetInt(GRPC_ARG_TCP_READ_CHUNK_SIZE, 1024*1024); + return grpc::CreateCustomChannel(target, grpc::InsecureChannelCredentials(), args); + } + + void AddMeta(NUnifiedAgentProto::Request_Initialize& init, const TString& name, const TString& value) { + auto* metaItem = init.MutableMeta()->Add(); + metaItem->SetName(name); + metaItem->SetValue(value); + } + + std::atomic<ui64> TClient::Id{0}; + + TClient::TClient(const TClientParameters& parameters, std::shared_ptr<TForkProtector> forkProtector) + : Parameters(parameters) + , ForkProtector(forkProtector) + , Counters(parameters.Counters ? parameters.Counters : MakeIntrusive<TClientCounters>()) + , Log(parameters.Log) + , MainLogger(Log, MakeFMaybe(Parameters.LogRateLimitBytes)) + , Logger(MainLogger.Child(Sprintf("ua_%lu", Id.fetch_add(1)))) + , Channel(nullptr) + , Stub(nullptr) + , ActiveCompletionQueue(nullptr) + , SessionLogLabel(0) + , ActiveSessions() + , Started(false) + , Destroyed(false) + , Lock() + { + MainLogger.SetDroppedBytesCounter(&Counters->ClientLogDroppedBytes); + + if (ForkProtector != nullptr) { + ForkProtector->Register(*this); + } + + EnsureStarted(); + + YLOG_INFO(Sprintf("created, uri [%s]", Parameters.Uri.c_str())); + } + + TClient::~TClient() { + with_lock(Lock) { + Y_VERIFY(ActiveSessions.empty(), "active sessions found"); + + EnsureStoppedNoLock(); + + Destroyed = true; + } + + if (ForkProtector != nullptr) { + ForkProtector->Unregister(*this); + } + + YLOG_INFO(Sprintf("destroyed, uri [%s]", Parameters.Uri.c_str())); + } + + TClientSessionPtr TClient::CreateSession(const TSessionParameters& parameters) { + return MakeIntrusive<TClientSession>(this, parameters); + } + + void TClient::StartTracing(ELogPriority logPriority) { + MainLogger.StartTracing(logPriority); + StartGrpcTracing(); + YLOG_INFO("tracing started"); + } + + void TClient::FinishTracing() { + FinishGrpcTracing(); + MainLogger.FinishTracing(); + YLOG_INFO("tracing finished"); + } + + void TClient::RegisterSession(TClientSession* session) { + with_lock(Lock) { + ActiveSessions.push_back(session); + } + } + + void TClient::UnregisterSession(TClientSession* session) { + with_lock(Lock) { + const auto it = Find(ActiveSessions, session); + Y_VERIFY(it != ActiveSessions.end()); + ActiveSessions.erase(it); + } + } + + void TClient::PreFork() { + YLOG_INFO("pre fork started"); + + Lock.Acquire(); + + auto futures = TVector<TFuture<void>>(Reserve(ActiveSessions.size())); + for (auto* s: ActiveSessions) { + futures.push_back(s->PreFork()); + } + YLOG_INFO("waiting for sessions"); + WaitAll(futures).Wait(); + + EnsureStoppedNoLock(); + + YLOG_INFO("shutdown grpc executor"); + grpc_core::Executor::SetThreadingAll(false); + + YLOG_INFO("pre fork finished"); + } + + void TClient::PostForkParent() { + YLOG_INFO("post fork parent started"); + + if (!Destroyed) { + EnsureStartedNoLock(); + } + Lock.Release(); + + for (auto* s: ActiveSessions) { + s->PostForkParent(); + } + + YLOG_INFO("post fork parent finished"); + } + + void TClient::PostForkChild() { + YLOG_INFO("post fork child started"); + + Lock.Release(); + + for (auto* s: ActiveSessions) { + s->PostForkChild(); + } + + YLOG_INFO("post fork child finished"); + } + + void TClient::EnsureStarted() { + with_lock(Lock) { + EnsureStartedNoLock(); + } + } + + void TClient::EnsureStartedNoLock() { + // Lock must be held + + if (Started) { + return; + } + + Channel = CreateChannel(Parameters.Uri); + Stub = NUnifiedAgentProto::UnifiedAgentService::NewStub(Channel); + ActiveCompletionQueue = MakeHolder<TGrpcCompletionQueueHost>(); + ActiveCompletionQueue->Start(); + + Started = true; + } + + void TClient::EnsureStoppedNoLock() { + // Lock must be held + + if (!Started) { + return; + } + + YLOG_INFO("stopping"); + ActiveCompletionQueue->Stop(); + ActiveCompletionQueue = nullptr; + Stub = nullptr; + Channel = nullptr; + YLOG_INFO("stopped"); + + Started = false; + } + + TScopeLogger TClient::CreateSessionLogger() { + return Logger.Child(ToString(SessionLogLabel.fetch_add(1))); + } + + TForkProtector::TForkProtector() + : Clients() + , GrpcInitializer() + , Enabled(grpc_core::Fork::Enabled()) + , Lock() + { + } + + void TForkProtector::Register(TClient& client) { + if (!Enabled) { + return; + } + + Y_VERIFY(grpc_is_initialized()); + Y_VERIFY(grpc_core::Fork::Enabled()); + + with_lock(Lock) { + Clients.push_back(&client); + } + } + + void TForkProtector::Unregister(TClient& client) { + if (!Enabled) { + return; + } + + with_lock(Lock) { + const auto it = Find(Clients, &client); + Y_VERIFY(it != Clients.end()); + Clients.erase(it); + } + } + + std::shared_ptr<TForkProtector> TForkProtector::Get(bool createIfNotExists) { + with_lock(InstanceLock) { + auto result = Instance.lock(); + if (!result && createIfNotExists) { + SetEnv("GRPC_ENABLE_FORK_SUPPORT", "true"); + result = std::make_shared<TForkProtector>(); + if (!result->Enabled) { + TLog log("cerr"); + TLogger logger(log, Nothing()); + auto scopeLogger = logger.Child("ua client"); + YLOG(TLOG_WARNING, + "Grpc is already initialized, can't enable fork support. " + "If forks are possible, please set environment variable GRPC_ENABLE_FORK_SUPPORT to 'true'. " + "If not, you can suppress this warning by setting EnableForkSupport " + "to false when creating the ua client.", + scopeLogger); + } else if (!SubscribedToForks) { + SubscribedToForks = true; + #ifdef _unix_ + pthread_atfork( + &TForkProtector::PreFork, + &TForkProtector::PostForkParent, + &TForkProtector::PostForkChild); + #endif + } + + Instance = result; + } + return result; + } + } + + void TForkProtector::PreFork() { + auto self = Get(false); + if (!self) { + return; + } + self->Lock.Acquire(); + for (auto* c : self->Clients) { + c->PreFork(); + } + } + + void TForkProtector::PostForkParent() { + auto self = Get(false); + if (!self) { + return; + } + for (auto* c : self->Clients) { + c->PostForkParent(); + } + self->Lock.Release(); + } + + void TForkProtector::PostForkChild() { + auto self = Get(false); + if (!self) { + return; + } + for (auto* c : self->Clients) { + c->PostForkChild(); + } + self->Lock.Release(); + } + + std::weak_ptr<TForkProtector> TForkProtector::Instance{}; + TMutex TForkProtector::InstanceLock{}; + bool TForkProtector::SubscribedToForks{false}; + + TClientSession::TClientSession(const TIntrusivePtr<TClient>& client, const TSessionParameters& parameters) + : AsyncJoiner() + , Client(client) + , OriginalSessionId(MakeFMaybe(parameters.SessionId)) + , SessionId(OriginalSessionId) + , Meta(MakeFMaybe(parameters.Meta)) + , Logger(Client->CreateSessionLogger()) + , CloseStarted(false) + , ForcedCloseStarted(false) + , Closed(false) + , ForkInProgressLocal(false) + , Started(false) + , ClosePromise() + , ActiveGrpcCall(nullptr) + , WriteQueue() + , TrimmedCount(0) + , NextIndex(0) + , AckSeqNo(Nothing()) + , PollerLastEventTimestamp() + , Counters(parameters.Counters ? parameters.Counters : Client->GetCounters()->GetDefaultSessionCounters()) + , MakeGrpcCallTimer(nullptr) + , ForceCloseTimer(nullptr) + , PollTimer(nullptr) + , GrpcInflightMessages(0) + , GrpcInflightBytes(0) + , InflightBytes(0) + , CloseRequested(false) + , EventsBatchSize(0) + , PollingStatus(EPollingStatus::Inactive) + , EventNotification(nullptr) + , EventNotificationTriggered(false) + , EventsBatch() + , SecondaryEventsBatch() + , ForkInProgress(false) + , Lock() + , MaxInflightBytes( + parameters.MaxInflightBytes.GetOrElse(Client->GetParameters().MaxInflightBytes)) + , AgentMaxReceiveMessage(Nothing()) { + if (Meta.Defined() && !IsUtf8(*Meta)) { + throw std::runtime_error("session meta contains non UTF-8 characters"); + } + Y_ENSURE(!(Client->GetParameters().EnableForkSupport && SessionId.Defined()), + "explicit session id is not supported with forks"); + Client->RegisterSession(this); + + with_lock(Lock) { + DoStart(); + } + } + + TFuture<void> TClientSession::PreFork() { + YLOG_INFO("pre fork started"); + + Lock.Acquire(); + + YLOG_INFO("triggering event notification"); + if (!EventNotificationTriggered) { + EventNotificationTriggered = true; + EventNotification->Trigger(); + } + + YLOG_INFO("setting 'fork in progress' flag"); + ForkInProgress.store(true); + + if (!Started) { + ClosePromise.TrySetValue(); + } + YLOG_INFO("pre fork finished"); + return ClosePromise.GetFuture(); + } + + void TClientSession::PostForkParent() { + YLOG_INFO("post fork parent started"); + ForkInProgress.store(false); + ForkInProgressLocal = false; + Started = false; + + if (!CloseRequested) { + DoStart(); + + YLOG_INFO("triggering event notification"); + EventNotificationTriggered = true; + EventNotification->Trigger(); + } + + Lock.Release(); + + YLOG_INFO("post fork parent finished"); + } + + void TClientSession::PostForkChild() { + YLOG_INFO("post fork child started"); + ForkInProgress.store(false); + ForkInProgressLocal = false; + Started = false; + + SessionId.Clear(); + TrimmedCount = 0; + NextIndex = 0; + AckSeqNo.Clear(); + PurgeWriteQueue(); + EventsBatch.clear(); + SecondaryEventsBatch.clear(); + EventsBatchSize = 0; + + Lock.Release(); + + YLOG_INFO("post fork child finished"); + } + + void TClientSession::SetAgentMaxReceiveMessage(size_t newValue) { + AgentMaxReceiveMessage = newValue; + } + + void TClientSession::DoStart() { + // Lock must be held + + Y_VERIFY(!Started); + YLOG_INFO("starting"); + + Client->EnsureStarted(); + + MakeGrpcCallTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(), + MakeIOCallback([this](EIOStatus status) { + if (status == EIOStatus::Error) { + return; + } + MakeGrpcCall(); + }, &AsyncJoiner)); + ForceCloseTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(), + MakeIOCallback([this](EIOStatus status) { + if (status == EIOStatus::Error) { + return; + } + YLOG_INFO("ForceCloseTimer"); + BeginClose(TInstant::Zero()); + }, &AsyncJoiner)); + PollTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(), + MakeIOCallback([this](EIOStatus status) { + if (status == EIOStatus::Error) { + return; + } + Poll(); + }, &AsyncJoiner)); + EventNotification = MakeHolder<TGrpcNotification>(Client->GetCompletionQueue(), + MakeIOCallback([this](EIOStatus status) { + Y_VERIFY(status == EIOStatus::Ok); + Poll(); + }, &AsyncJoiner)); + + CloseStarted = false; + ForcedCloseStarted = false; + Closed = false; + ClosePromise = NewPromise(); + EventNotificationTriggered = false; + PollerLastEventTimestamp = Now(); + PollingStatus = EPollingStatus::Inactive; + + ++Client->GetCounters()->ActiveSessionsCount; + MakeGrpcCallTimer->Set(Now()); + YLOG_INFO(Sprintf("started, sessionId [%s]", OriginalSessionId.GetOrElse("").c_str())); + + Started = true; + } + + void TClientSession::MakeGrpcCall() { + if (Closed) { + YLOG_INFO("MakeGrpcCall, session already closed"); + return; + } + Y_VERIFY(!ForcedCloseStarted); + Y_VERIFY(!ActiveGrpcCall); + ActiveGrpcCall = MakeIntrusive<TGrpcCall>(*this); + ActiveGrpcCall->Start(); + ++Counters->GrpcCalls; + if (CloseStarted) { + ActiveGrpcCall->BeginClose(false); + } + } + + TClientSession::~TClientSession() { + Close(TInstant::Zero()); + AsyncJoiner.Join().Wait(); + Client->UnregisterSession(this); + YLOG_INFO("destroyed"); + } + + void TClientSession::Send(TClientMessage&& message) { + const auto messageSize = SizeOf(message); + ++Counters->ReceivedMessages; + Counters->ReceivedBytes += messageSize; + if (messageSize > Client->GetParameters().GrpcMaxMessageSize) { + YLOG_ERR(Sprintf("message size [%lu] is greater than max grpc message size [%lu], message dropped", + messageSize, Client->GetParameters().GrpcMaxMessageSize)); + ++Counters->DroppedMessages; + Counters->DroppedBytes += messageSize; + ++Counters->ErrorsCount; + return; + } + if (message.Meta.Defined() && !IsUtf8(*message.Meta)) { + YLOG_ERR("message meta contains non UTF-8 characters, message dropped"); + ++Counters->DroppedMessages; + Counters->DroppedBytes += messageSize; + ++Counters->ErrorsCount; + return; + } + if (!message.Timestamp.Defined()) { + message.Timestamp = TInstant::Now(); + } + ++Counters->InflightMessages; + Counters->InflightBytes += messageSize; + { + auto g = Guard(Lock); + + if (!Started) { + DoStart(); + } + + if (CloseRequested) { + g.Release(); + YLOG_ERR(Sprintf("session is closing, message dropped, [%lu] bytes", messageSize)); + --Counters->InflightMessages; + Counters->InflightBytes -= messageSize; + ++Counters->DroppedMessages; + Counters->DroppedBytes += messageSize; + ++Counters->ErrorsCount; + return; + } + if (InflightBytes.load() + messageSize > MaxInflightBytes) { + g.Release(); + YLOG_ERR(Sprintf("max inflight of [%lu] bytes reached, [%lu] bytes dropped", + MaxInflightBytes, messageSize)); + --Counters->InflightMessages; + Counters->InflightBytes -= messageSize; + ++Counters->DroppedMessages; + Counters->DroppedBytes += messageSize; + ++Counters->ErrorsCount; + return; + } + InflightBytes.fetch_add(messageSize); + EventsBatch.push_back(TMessageReceivedEvent{std::move(message), messageSize}); + EventsBatchSize += messageSize; + if ((PollingStatus == EPollingStatus::Inactive || + EventsBatchSize >= Client->GetParameters().GrpcMaxMessageSize) && + !EventNotificationTriggered) + { + EventNotificationTriggered = true; + EventNotification->Trigger(); + } + } + } + + TFuture<void> TClientSession::CloseAsync(TInstant deadline) { + YLOG_INFO(Sprintf("close, deadline [%s]", ToString(deadline).c_str())); + if (!ClosePromise.GetFuture().HasValue()) { + with_lock(Lock) { + if (!Started) { + return MakeFuture(); + } + + CloseRequested = true; + + EventsBatch.push_back(TCloseRequestedEvent{deadline}); + if (!EventNotificationTriggered) { + EventNotificationTriggered = true; + EventNotification->Trigger(); + } + } + } + return ClosePromise.GetFuture(); + } + + void TClientSession::BeginClose(TInstant deadline) { + if (Closed) { + return; + } + if (!CloseStarted) { + CloseStarted = true; + YLOG_INFO("close started"); + } + const auto force = deadline == TInstant::Zero(); + if (force && !ForcedCloseStarted) { + ForcedCloseStarted = true; + YLOG_INFO("forced close started"); + } + if (!ActiveGrpcCall && (ForcedCloseStarted || WriteQueue.empty())) { + DoClose(); + } else { + if (!force) { + ForceCloseTimer->Set(deadline); + } + if (ActiveGrpcCall) { + ActiveGrpcCall->BeginClose(ForcedCloseStarted); + } + } + } + + void TClientSession::Poll() { + if (ForkInProgressLocal) { + return; + } + + const auto now = Now(); + const auto sendDelay = Client->GetParameters().GrpcSendDelay; + const auto oldPollingStatus = PollingStatus; + + { + if (!Lock.TryAcquire()) { + TSpinWait sw; + + while (Lock.IsLocked() || !Lock.TryAcquire()) { + if (ForkInProgress.load()) { + YLOG_INFO("poller 'fork in progress' signal received, stopping session"); + ForkInProgressLocal = true; + if (!ActiveGrpcCall || !ActiveGrpcCall->Initialized()) { + BeginClose(TInstant::Max()); + } else if (ActiveGrpcCall->ReuseSessions()) { + ActiveGrpcCall->Poison(); + BeginClose(TInstant::Max()); + } else { + BeginClose(TInstant::Zero()); + } + return; + } + sw.Sleep(); + } + } + + if (!EventsBatch.empty()) { + DoSwap(EventsBatch, SecondaryEventsBatch); + EventsBatchSize = 0; + PollerLastEventTimestamp = now; + } + const auto needNextPollStep = sendDelay != TDuration::Zero() && + !CloseRequested && + (now - PollerLastEventTimestamp) < 10 * sendDelay; + PollingStatus = needNextPollStep ? EPollingStatus::Active : EPollingStatus::Inactive; + EventNotificationTriggered = false; + + Lock.Release(); + } + + if (PollingStatus == EPollingStatus::Active) { + PollTimer->Set(now + sendDelay); + } + if (PollingStatus != oldPollingStatus) { + YLOG_DEBUG(Sprintf("poller %s", PollingStatus == EPollingStatus::Active ? "started" : "stopped")); + } + if (auto& batch = SecondaryEventsBatch; !batch.empty()) { + auto closeIt = FindIf(batch, [](const auto& e) { + return std::holds_alternative<TCloseRequestedEvent>(e); + }); + + if (auto it = begin(batch); it != closeIt) { + Y_VERIFY(!CloseStarted); + do { + auto& e = std::get<TMessageReceivedEvent>(*it++); + WriteQueue.push_back({std::move(e.Message), e.Size, false}); + } while (it != closeIt); + if (ActiveGrpcCall) { + ActiveGrpcCall->NotifyMessageAdded(); + } + } + + for (auto endIt = end(batch); closeIt != endIt; ++closeIt) { + const auto& e = std::get<TCloseRequestedEvent>(*closeIt); + BeginClose(e.Deadline); + } + + batch.clear(); + } + }; + + void TClientSession::PrepareInitializeRequest(NUnifiedAgentProto::Request& target) { + auto& initializeMessage = *target.MutableInitialize(); + if (SessionId.Defined()) { + initializeMessage.SetSessionId(*SessionId); + } + if (Client->GetParameters().SharedSecretKey.Defined()) { + initializeMessage.SetSharedSecretKey(*Client->GetParameters().SharedSecretKey); + } + if (Meta.Defined()) { + for (const auto& p: *Meta) { + AddMeta(initializeMessage, p.first, p.second); + } + } + if (!Meta.Defined() || Meta->find("_reusable") == Meta->end()) { + AddMeta(initializeMessage, "_reusable", "true"); + } + } + + TClientSession::TRequestBuilder::TRequestBuilder(NUnifiedAgentProto::Request& target, size_t RequestPayloadLimitBytes, + TFMaybe<size_t> serializedRequestLimitBytes) + : Target(target) + , PwTarget(MakeFMaybe<NPW::TRequest>()) + , MetaItems() + , RequestPayloadSize(0) + , RequestPayloadLimitBytes(RequestPayloadLimitBytes) + , SerializedRequestSize(0) + , SerializedRequestLimitBytes(serializedRequestLimitBytes) + , CountersInvalid(false) + { + } + + void TClientSession::TRequestBuilder::ResetCounters() { + RequestPayloadSize = 0; + SerializedRequestSize = 0; + PwTarget.Clear(); + PwTarget.ConstructInPlace(); + CountersInvalid = false; + } + + TClientSession::TRequestBuilder::TAddResult TClientSession::TRequestBuilder::TryAddMessage( + const TPendingMessage& message, size_t seqNo) { + Y_VERIFY(!CountersInvalid); + { + // add item to pwRequest to increase calculated size + PwTarget->DataBatch.SeqNo.Add(seqNo); + PwTarget->DataBatch.Timestamp.Add(message.Message.Timestamp->MicroSeconds()); + PwTarget->DataBatch.Payload.Add().SetValue(message.Message.Payload); + if (message.Message.Meta.Defined()) { + for (const auto &m: *message.Message.Meta) { + TMetaItemBuilder *metaItemBuilder = nullptr; + { + auto it = MetaItems.find(m.first); + if (it == MetaItems.end()) { + PwTarget->DataBatch.Meta.Add().Key.SetValue(m.first); + } else { + metaItemBuilder = &it->second; + } + } + size_t metaItemIdx = (metaItemBuilder != nullptr) ? metaItemBuilder->ItemIndex : + PwTarget->DataBatch.Meta.GetSize() - 1; + auto &pwMetaItem = PwTarget->DataBatch.Meta.Get(metaItemIdx); + pwMetaItem.Value.Add().SetValue(m.second); + const auto index = Target.GetDataBatch().SeqNoSize(); + if ((metaItemBuilder != nullptr && metaItemBuilder->ValueIndex != index) || + (metaItemBuilder == nullptr && index != 0)) { + const auto valueIdx = (metaItemBuilder) ? metaItemBuilder->ValueIndex : 0; + pwMetaItem.SkipStart.Add(valueIdx); + pwMetaItem.SkipLength.Add(index - valueIdx); + } + } + } + } + const auto newSerializedRequestSize = PwTarget->ByteSizeLong(); + const auto newPayloadSize = RequestPayloadSize + message.Size; + if ((SerializedRequestLimitBytes.Defined() && newSerializedRequestSize > *SerializedRequestLimitBytes) || + newPayloadSize > RequestPayloadLimitBytes) { + CountersInvalid = true; + return {true, newPayloadSize, newSerializedRequestSize}; + } + + { + // add item to the real request + auto& batch = *Target.MutableDataBatch(); + batch.AddSeqNo(seqNo); + batch.AddTimestamp(message.Message.Timestamp->MicroSeconds()); + batch.AddPayload(message.Message.Payload); + if (message.Message.Meta.Defined()) { + for (const auto &m: *message.Message.Meta) { + TMetaItemBuilder *metaItemBuilder; + { + auto it = MetaItems.find(m.first); + if (it == MetaItems.end()) { + batch.AddMeta()->SetKey(m.first); + auto insertResult = MetaItems.insert({m.first, {batch.MetaSize() - 1}}); + Y_VERIFY(insertResult.second); + metaItemBuilder = &insertResult.first->second; + } else { + metaItemBuilder = &it->second; + } + } + auto *metaItem = batch.MutableMeta(metaItemBuilder->ItemIndex); + metaItem->AddValue(m.second); + const auto index = batch.SeqNoSize() - 1; + if (metaItemBuilder->ValueIndex != index) { + metaItem->AddSkipStart(metaItemBuilder->ValueIndex); + metaItem->AddSkipLength(index - metaItemBuilder->ValueIndex); + } + metaItemBuilder->ValueIndex = index + 1; + } + } + SerializedRequestSize = newSerializedRequestSize; + RequestPayloadSize = newPayloadSize; + } + + return {false, newPayloadSize, newSerializedRequestSize}; + } + + void TClientSession::PrepareWriteBatchRequest(NUnifiedAgentProto::Request& target) { + Y_VERIFY(AckSeqNo.Defined()); + TRequestBuilder requestBuilder(target, Client->GetParameters().GrpcMaxMessageSize, AgentMaxReceiveMessage); + const auto startIndex = NextIndex - TrimmedCount; + for (size_t i = startIndex; i < WriteQueue.size(); ++i) { + auto& queueItem = WriteQueue[i]; + if (queueItem.Skipped) { + NextIndex++; + continue; + } + + const auto addResult = requestBuilder.TryAddMessage(queueItem, *AckSeqNo + i + 1); + const auto serializedLimitToLog = AgentMaxReceiveMessage.Defined() ? *AgentMaxReceiveMessage : 0; + if (addResult.LimitExceeded && target.GetDataBatch().SeqNoSize() == 0) { + YLOG_ERR(Sprintf("single serialized message is too large [%lu] > [%lu], dropping it", + addResult.NewSerializedRequestSize, serializedLimitToLog)); + queueItem.Skipped = true; + ++Counters->DroppedMessages; + Counters->DroppedBytes += queueItem.Size; + ++Counters->ErrorsCount; + NextIndex++; + requestBuilder.ResetCounters(); + continue; + } + if (addResult.LimitExceeded) { + YLOG_DEBUG(Sprintf( + "batch limit exceeded: [%lu] > [%lu] (limit for serialized batch)" + "OR [%lu] > [%lu] (limit for raw batch)", + addResult.NewSerializedRequestSize, serializedLimitToLog, + addResult.NewRequestPayloadSize, Client->GetParameters().GrpcMaxMessageSize)); + break; + } + + NextIndex++; + } + const auto messagesCount = target.GetDataBatch().SeqNoSize(); + if (messagesCount == 0) { + return; + } + Y_VERIFY(requestBuilder.GetSerializedRequestSize() == target.ByteSizeLong(), + "failed to calculate size for message [%s]", target.ShortDebugString().c_str()); + GrpcInflightMessages += messagesCount; + GrpcInflightBytes += requestBuilder.GetRequestPayloadSize(); + YLOG_DEBUG(Sprintf("new write batch, [%lu] messages, [%lu] bytes, first seq_no [%lu], serialized size [%lu]", + messagesCount, requestBuilder.GetRequestPayloadSize(), + *target.GetDataBatch().GetSeqNo().begin(), requestBuilder.GetSerializedRequestSize())); + ++Counters->GrpcWriteBatchRequests; + Counters->GrpcInflightMessages += messagesCount; + Counters->GrpcInflightBytes += requestBuilder.GetRequestPayloadSize(); + } + + void TClientSession::Acknowledge(ui64 seqNo) { + size_t messagesCount = 0; + size_t bytesCount = 0; + size_t skippedMessagesCount = 0; + size_t skippedBytesCount = 0; + + if (AckSeqNo.Defined()) { + while (!WriteQueue.empty() && ((*AckSeqNo < seqNo) || WriteQueue.front().Skipped)) { + if (WriteQueue.front().Skipped) { + skippedMessagesCount++; + skippedBytesCount += WriteQueue.front().Size; + } else { + ++messagesCount; + bytesCount += WriteQueue.front().Size; + } + ++(*AckSeqNo); + WriteQueue.pop_front(); + ++TrimmedCount; + } + } + if (!AckSeqNo.Defined() || seqNo > *AckSeqNo) { + AckSeqNo = seqNo; + } + + Counters->AcknowledgedMessages += messagesCount; + Counters->AcknowledgedBytes += bytesCount; + Counters->InflightMessages -= (messagesCount + skippedMessagesCount); + Counters->InflightBytes -= (bytesCount + skippedBytesCount); + InflightBytes.fetch_sub(bytesCount); + Counters->GrpcInflightMessages -= messagesCount; + Counters->GrpcInflightBytes -= bytesCount; + GrpcInflightMessages -= messagesCount; + GrpcInflightBytes -= bytesCount; + + YLOG_DEBUG(Sprintf("ack [%lu], [%lu] messages, [%lu] bytes", seqNo, messagesCount, bytesCount)); + } + + void TClientSession::OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo) { + SessionId = sessionId; + Acknowledge(lastSeqNo); + NextIndex = TrimmedCount; + ++Counters->GrpcCallsInitialized; + Counters->GrpcInflightMessages -= GrpcInflightMessages; + Counters->GrpcInflightBytes -= GrpcInflightBytes; + GrpcInflightMessages = 0; + GrpcInflightBytes = 0; + YLOG_INFO(Sprintf("grpc call initialized, session_id [%s], last_seq_no [%lu]", + sessionId.c_str(), lastSeqNo)); + } + + void TClientSession::OnGrpcCallFinished() { + Y_VERIFY(!Closed); + Y_VERIFY(ActiveGrpcCall); + ActiveGrpcCall = nullptr; + if (CloseStarted && (ForcedCloseStarted || WriteQueue.empty())) { + DoClose(); + } else { + const auto reconnectTime = TInstant::Now() + Client->GetParameters().GrpcReconnectDelay; + MakeGrpcCallTimer->Set(reconnectTime); + YLOG_INFO(Sprintf("grpc call delayed until [%s]", reconnectTime.ToString().c_str())); + } + } + + auto TClientSession::PurgeWriteQueue() -> TPurgeWriteQueueStats { + size_t bytesCount = 0; + for (const auto& m: WriteQueue) { + bytesCount += m.Size; + } + auto result = TPurgeWriteQueueStats{WriteQueue.size(), bytesCount}; + + Counters->DroppedMessages += WriteQueue.size(); + Counters->DroppedBytes += bytesCount; + Counters->InflightMessages -= WriteQueue.size(); + Counters->InflightBytes -= bytesCount; + Counters->GrpcInflightMessages -= GrpcInflightMessages; + Counters->GrpcInflightBytes -= GrpcInflightBytes; + + InflightBytes.fetch_sub(bytesCount); + GrpcInflightMessages = 0; + GrpcInflightBytes = 0; + WriteQueue.clear(); + + return result; + } + + void TClientSession::DoClose() { + Y_VERIFY(CloseStarted); + Y_VERIFY(!Closed); + Y_VERIFY(!ClosePromise.HasValue()); + MakeGrpcCallTimer->Cancel(); + ForceCloseTimer->Cancel(); + PollTimer->Cancel(); + if (!ForkInProgressLocal && WriteQueue.size() > 0) { + const auto stats = PurgeWriteQueue(); + ++Counters->ErrorsCount; + YLOG_ERR(Sprintf("DoClose, dropped [%lu] messages, [%lu] bytes", + stats.PurgedMessages, stats.PurgedBytes)); + } + --Client->GetCounters()->ActiveSessionsCount; + Closed = true; + ClosePromise.SetValue(); + YLOG_INFO("session closed"); + } + + TGrpcCall::TGrpcCall(TClientSession& session) + : Session(session) + , AsyncJoinerToken(&Session.GetAsyncJoiner()) + , AcceptTag(MakeIOCallback(this, &TGrpcCall::EndAccept)) + , ReadTag(MakeIOCallback(this, &TGrpcCall::EndRead)) + , WriteTag(MakeIOCallback(this, &TGrpcCall::EndWrite)) + , WritesDoneTag(MakeIOCallback(this, &TGrpcCall::EndWritesDone)) + , FinishTag(MakeIOCallback(this, &TGrpcCall::EndFinish)) + , Logger(session.GetLogger().Child("grpc")) + , AcceptPending(false) + , Initialized_(false) + , ReadPending(false) + , ReadsDone(false) + , WritePending(false) + , WritesBlocked(false) + , WritesDonePending(false) + , WritesDone(false) + , ErrorOccured(false) + , FinishRequested(false) + , FinishStarted(false) + , FinishDone(false) + , Cancelled(false) + , Poisoned(false) + , PoisonPillSent(false) + , ReuseSessions_(false) + , FinishStatus() + , ClientContext() + , ReaderWriter(nullptr) + , Request() + , Response() + { + } + + void TGrpcCall::Start() { + AcceptPending = true; + auto& client = Session.GetClient(); + ReaderWriter = client.GetStub().AsyncSession(&ClientContext, + &client.GetCompletionQueue(), + AcceptTag->Ref()); + YLOG_INFO("AsyncSession started"); + } + + TGrpcCall::~TGrpcCall() { + YLOG_INFO("destroyed"); + } + + void TGrpcCall::EnsureFinishStarted() { + if (!FinishStarted) { + FinishStarted = true; + ReaderWriter->Finish(&FinishStatus, FinishTag->Ref()); + YLOG_INFO("Finish started"); + } + } + + bool TGrpcCall::CheckHasError(EIOStatus status, const char* method) { + if (status == EIOStatus::Error) { + SetError(Sprintf("%s %s", method, ToString(status).c_str())); + return true; + } + if (ErrorOccured) { + ScheduleFinishOnError(); + return true; + } + return false; + } + + void TGrpcCall::SetError(const TString& error) { + if (!Cancelled) { + YLOG_ERR(error); + ++Session.GetCounters().ErrorsCount; + } + ErrorOccured = true; + ScheduleFinishOnError(); + } + + void TGrpcCall::ScheduleFinishOnError() { + if (!AcceptPending && !WritePending && !WritesDonePending) { + EnsureFinishStarted(); + } + } + + void TGrpcCall::BeginClose(bool force) { + if (force) { + if (!Cancelled) { + Cancelled = true; + ClientContext.TryCancel(); + SetError("forced close"); + } + return; + } + YLOG_INFO(Sprintf("Close Initialized [%d], AcceptPending [%d], " + "WritePending [%d], FinishRequested [%d], " + "ErrorOccured [%d]", + static_cast<int>(Initialized_), + static_cast<int>(AcceptPending), + static_cast<int>(WritePending), + static_cast<int>(FinishRequested), + static_cast<int>(ErrorOccured))); + if (ErrorOccured || FinishRequested) { + return; + } + FinishRequested = true; + if (!Initialized_ || WritePending) { + return; + } + WritesBlocked = true; + BeginWritesDone(); + } + + void TGrpcCall::Poison() { + Poisoned = true; + NotifyMessageAdded(); + } + + void TGrpcCall::NotifyMessageAdded() { + if (WritePending || !Initialized_ || ErrorOccured || FinishRequested) { + return; + } + ScheduleWrite(); + } + + void TGrpcCall::ScheduleWrite() { + Request.Clear(); + if (!Poisoned) { + Session.PrepareWriteBatchRequest(Request); + } else if (!PoisonPillSent) { + PoisonPillSent = true; + auto& batch = *Request.mutable_data_batch(); + batch.AddSeqNo(std::numeric_limits<::google::protobuf::uint64>::max()); + batch.AddTimestamp(Now().MicroSeconds()); + batch.AddPayload(""); + YLOG_INFO("poison pill sent"); + } + if (Request.GetDataBatch().GetSeqNo().empty()) { + if (FinishRequested) { + WritesBlocked = true; + BeginWritesDone(); + } + return; + } + + BeginWrite(); + } + + void TGrpcCall::EndAccept(EIOStatus status) { + Y_VERIFY(AcceptPending); + AcceptPending = false; + if (CheckHasError(status, "EndAccept")) { + return; + } + BeginRead(); + Request.Clear(); + Session.PrepareInitializeRequest(Request); + BeginWrite(); + } + + void TGrpcCall::EndRead(EIOStatus status) { + ReadPending = false; + if (FinishDone) { + Session.OnGrpcCallFinished(); + return; + } + if (!ErrorOccured && status == EIOStatus::Error && WritesBlocked) { + Y_VERIFY(!WritePending); + YLOG_INFO("EndRead ReadsDone"); + ReadsDone = true; + if (WritesDone) { + EnsureFinishStarted(); + return; + } + return; + } + if (CheckHasError(status, "EndRead")) { + return; + } + if (!Initialized_) { + const auto metadata = ClientContext.GetServerInitialMetadata(); + { + const auto it = metadata.find("ua-reuse-sessions"); + if (it != metadata.end() && it->second == "true") { + ReuseSessions_ = true; + } + } + { + const auto it = metadata.find("ua-max-receive-message-size"); + if (it != metadata.end()) { + Session.SetAgentMaxReceiveMessage(FromString<size_t>(TString{it->second.begin(), it->second.end()})); + } + } + + if (Response.response_case() != NUnifiedAgentProto::Response::kInitialized) { + SetError(Sprintf("EndRead while initializing, unexpected response_case [%d]", + static_cast<int>(Response.response_case()))); + return; + } + Session.OnGrpcCallInitialized(Response.GetInitialized().GetSessionId(), + Response.GetInitialized().GetLastSeqNo()); + Initialized_ = true; + if (!WritePending) { + ScheduleWrite(); + } + } else { + if (Response.response_case() != NUnifiedAgentProto::Response::kAck) { + SetError(Sprintf("EndRead unexpected response_case [%d]", + static_cast<int>(Response.response_case()))); + return; + } + Session.Acknowledge(Response.GetAck().GetSeqNo()); + } + BeginRead(); + } + + void TGrpcCall::EndWrite(EIOStatus status) { + WritePending = false; + if (CheckHasError(status, "EndWrite")) { + return; + } + if (!Initialized_) { + return; + } + ScheduleWrite(); + } + + void TGrpcCall::EndFinish(EIOStatus status) { + FinishDone = true; + const auto finishStatus = status == EIOStatus::Error + ? grpc::Status(grpc::UNKNOWN, "finish error") + : FinishStatus; + YLOG(finishStatus.ok() || Cancelled || Poisoned ? TLOG_INFO : TLOG_ERR, + Sprintf("EndFinish, code [%s], message [%s]", + ToString(finishStatus.error_code()).c_str(), + finishStatus.error_message().c_str()), + Logger); + if (!finishStatus.ok() && !Cancelled) { + ++Session.GetCounters().ErrorsCount; + } + if (!ReadPending) { + Session.OnGrpcCallFinished(); + } + } + + void TGrpcCall::EndWritesDone(EIOStatus status) { + YLOG_INFO(Sprintf("EndWritesDone [%s]", ToString(status).c_str())); + Y_VERIFY(!WritePending && !WritesDone && WritesDonePending); + WritesDonePending = false; + WritesDone = true; + if (CheckHasError(status, "EndWriteDone")) { + return; + } + if (ReadsDone) { + EnsureFinishStarted(); + } + } + + void TGrpcCall::BeginWritesDone() { + WritesDonePending = true; + ReaderWriter->WritesDone(WritesDoneTag->Ref()); + YLOG_INFO("WritesDone started"); + } + + void TGrpcCall::BeginRead() { + ReadPending = true; + Response.Clear(); + ReaderWriter->Read(&Response, ReadTag->Ref()); + YLOG_DEBUG("Read started"); + } + + void TGrpcCall::BeginWrite() { + WritePending = true; + ReaderWriter->Write(Request, WriteTag->Ref()); + YLOG_DEBUG("Write started"); + } +} + +namespace NUnifiedAgent { + size_t SizeOf(const TClientMessage& message) { + auto result = message.Payload.Size() + sizeof(TInstant); + if (message.Meta.Defined()) { + for (const auto& m: *message.Meta) { + result += m.first.Size() + m.second.Size(); + } + } + return result; + } + + TClientParameters::TClientParameters(const TString& uri) + : Uri(uri) + , SharedSecretKey(Nothing()) + , MaxInflightBytes(DefaultMaxInflightBytes) + , Log(TLoggerOperator<TGlobalLog>::Log()) + , LogRateLimitBytes(Nothing()) + , GrpcReconnectDelay(TDuration::MilliSeconds(50)) + , GrpcSendDelay(DefaultGrpcSendDelay) + , EnableForkSupport(false) + , GrpcMaxMessageSize(DefaultGrpcMaxMessageSize) + , Counters(nullptr) + { + } + + TSessionParameters::TSessionParameters() + : SessionId(Nothing()) + , Meta(Nothing()) + , Counters(nullptr) + , MaxInflightBytes() + { + } + + const size_t TClientParameters::DefaultMaxInflightBytes = 10_MB; + const size_t TClientParameters::DefaultGrpcMaxMessageSize = 1_MB; + const TDuration TClientParameters::DefaultGrpcSendDelay = TDuration::MilliSeconds(10); + + TClientPtr MakeClient(const TClientParameters& parameters) { + if (!grpc_is_initialized()) { + EnsureGrpcConfigured(); + } + + std::shared_ptr<NPrivate::TForkProtector> forkProtector{}; +#ifdef _unix_ + if (parameters.EnableForkSupport) { + forkProtector = NPrivate::TForkProtector::Get(true); + } +#endif + return MakeIntrusive<NPrivate::TClient>(parameters, forkProtector); + } +} diff --git a/library/cpp/unified_agent_client/client_impl.h b/library/cpp/unified_agent_client/client_impl.h new file mode 100644 index 0000000000..6adadf92e3 --- /dev/null +++ b/library/cpp/unified_agent_client/client_impl.h @@ -0,0 +1,364 @@ +#pragma once + +#include <library/cpp/unified_agent_client/client.h> +#include <library/cpp/unified_agent_client/client_proto_weighing.h> +#include <library/cpp/unified_agent_client/counters.h> +#include <library/cpp/unified_agent_client/logger.h> +#include <library/cpp/unified_agent_client/variant.h> +#include <library/cpp/unified_agent_client/proto/unified_agent.grpc.pb.h> +#include <library/cpp/unified_agent_client/grpc_io.h> + +#include <library/cpp/logger/global/global.h> + +#include <util/generic/deque.h> +#include <util/system/mutex.h> + +namespace NUnifiedAgent::NPrivate { + class TClientSession; + class TGrpcCall; + class TForkProtector; + + class TClient: public IClient { + public: + explicit TClient(const TClientParameters& parameters, std::shared_ptr<TForkProtector> forkProtector); + + ~TClient() override; + + TClientSessionPtr CreateSession(const TSessionParameters& parameters) override; + + void StartTracing(ELogPriority logPriority) override; + + void FinishTracing() override; + + inline const TIntrusivePtr<TClientCounters>& GetCounters() const noexcept { + return Counters; + } + + inline NUnifiedAgentProto::UnifiedAgentService::Stub& GetStub() noexcept { + return *Stub; + } + + TScopeLogger CreateSessionLogger(); + + inline const TClientParameters& GetParameters() const noexcept { + return Parameters; + } + + inline grpc::CompletionQueue& GetCompletionQueue() noexcept { + return ActiveCompletionQueue->GetCompletionQueue(); + } + + void RegisterSession(TClientSession* session); + + void UnregisterSession(TClientSession* session); + + void PreFork(); + + void PostForkParent(); + + void PostForkChild(); + + void EnsureStarted(); + + private: + void EnsureStartedNoLock(); + + void EnsureStoppedNoLock(); + + private: + const TClientParameters Parameters; + std::shared_ptr<TForkProtector> ForkProtector; + TIntrusivePtr<TClientCounters> Counters; + TLog Log; + TLogger MainLogger; + TScopeLogger Logger; + std::shared_ptr<grpc::Channel> Channel; + std::unique_ptr<NUnifiedAgentProto::UnifiedAgentService::Stub> Stub; + THolder<TGrpcCompletionQueueHost> ActiveCompletionQueue; + std::atomic<size_t> SessionLogLabel; + TVector<TClientSession*> ActiveSessions; + bool Started; + bool Destroyed; + TAdaptiveLock Lock; + static std::atomic<ui64> Id; + }; + + class TForkProtector { + public: + TForkProtector(); + + void Register(TClient& client); + + void Unregister(TClient& client); + + static std::shared_ptr<TForkProtector> Get(bool createIfNotExists); + + private: + static void PreFork(); + + static void PostForkParent(); + + static void PostForkChild(); + + private: + TVector<TClient*> Clients; + grpc::GrpcLibraryCodegen GrpcInitializer; + bool Enabled; + TAdaptiveLock Lock; + + static std::weak_ptr<TForkProtector> Instance; + static TMutex InstanceLock; + static bool SubscribedToForks; + }; + + class TClientSession: public IClientSession { + public: + TClientSession(const TIntrusivePtr<TClient>& client, const TSessionParameters& parameters); + + ~TClientSession(); + + void Send(TClientMessage&& message) override; + + NThreading::TFuture<void> CloseAsync(TInstant deadline) override; + + inline TClient& GetClient() noexcept { + return *Client; + } + + inline TScopeLogger& GetLogger() noexcept { + return Logger; + } + + inline TClientSessionCounters& GetCounters() noexcept { + return *Counters; + } + + inline TAsyncJoiner& GetAsyncJoiner() noexcept { + return AsyncJoiner; + } + + void PrepareInitializeRequest(NUnifiedAgentProto::Request& target); + + void PrepareWriteBatchRequest(NUnifiedAgentProto::Request& target); + + void Acknowledge(ui64 seqNo); + + void OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo); + + void OnGrpcCallFinished(); + + NThreading::TFuture<void> PreFork(); + + void PostForkParent(); + + void PostForkChild(); + + void SetAgentMaxReceiveMessage(size_t); + + private: + enum class EPollingStatus { + Active, + Inactive + }; + + struct TCloseRequestedEvent { + TInstant Deadline; + }; + + struct TMessageReceivedEvent { + TClientMessage Message; + size_t Size; + }; + + struct TPurgeWriteQueueStats { + size_t PurgedMessages{}; + size_t PurgedBytes{}; + }; + + using TEvent = std::variant<TCloseRequestedEvent, TMessageReceivedEvent>; + + public: + struct TPendingMessage { + TClientMessage Message; + size_t Size; + bool Skipped; + }; + + class TRequestBuilder { + public: + struct TAddResult; + + public: + TRequestBuilder(NUnifiedAgentProto::Request &target, size_t RequestPayloadLimitBytes, + TFMaybe<size_t> serializedRequestLimitBytes); + + TAddResult TryAddMessage(const TPendingMessage& message, size_t seqNo); + + void ResetCounters(); + + inline size_t GetSerializedRequestSize() const { + return SerializedRequestSize; + } + + inline size_t GetRequestPayloadSize() const { + return RequestPayloadSize; + } + + public: + struct TAddResult { + bool LimitExceeded; + size_t NewRequestPayloadSize; // == actual value, if !LimitExceeded + size_t NewSerializedRequestSize; // == actual value, if !LimitExceeded + }; + + private: + struct TMetaItemBuilder { + size_t ItemIndex; + size_t ValueIndex{0}; + }; + + private: + NUnifiedAgentProto::Request& Target; + TFMaybe<NPW::TRequest> PwTarget; + THashMap<TString, TMetaItemBuilder> MetaItems; + size_t RequestPayloadSize; + size_t RequestPayloadLimitBytes; + size_t SerializedRequestSize; + TFMaybe<size_t> SerializedRequestLimitBytes; + bool CountersInvalid; + }; + + private: + void MakeGrpcCall(); + + void DoClose(); + + void BeginClose(TInstant deadline); + + void Poll(); + + TPurgeWriteQueueStats PurgeWriteQueue(); + + void DoStart(); + + private: + TAsyncJoiner AsyncJoiner; + TIntrusivePtr<TClient> Client; + TFMaybe<TString> OriginalSessionId; + TFMaybe<TString> SessionId; + TFMaybe<THashMap<TString, TString>> Meta; + TScopeLogger Logger; + bool CloseStarted; + bool ForcedCloseStarted; + bool Closed; + bool ForkInProgressLocal; + bool Started; + NThreading::TPromise<void> ClosePromise; + TIntrusivePtr<TGrpcCall> ActiveGrpcCall; + TDeque<TPendingMessage> WriteQueue; + size_t TrimmedCount; + size_t NextIndex; + TFMaybe<ui64> AckSeqNo; + TInstant PollerLastEventTimestamp; + TIntrusivePtr<TClientSessionCounters> Counters; + THolder<TGrpcTimer> MakeGrpcCallTimer; + THolder<TGrpcTimer> ForceCloseTimer; + THolder<TGrpcTimer> PollTimer; + ui64 GrpcInflightMessages; + ui64 GrpcInflightBytes; + + std::atomic<size_t> InflightBytes; + bool CloseRequested; + size_t EventsBatchSize; + EPollingStatus PollingStatus; + THolder<TGrpcNotification> EventNotification; + bool EventNotificationTriggered; + TVector<TEvent> EventsBatch; + TVector<TEvent> SecondaryEventsBatch; + std::atomic<bool> ForkInProgress; + TAdaptiveLock Lock; + size_t MaxInflightBytes; + TFMaybe<size_t> AgentMaxReceiveMessage; + }; + + class TGrpcCall final: public TAtomicRefCount<TGrpcCall> { + public: + explicit TGrpcCall(TClientSession& session); + + void Start(); + + ~TGrpcCall(); + + void BeginClose(bool force); + + void Poison(); + + void NotifyMessageAdded(); + + inline bool Initialized() const { + return Initialized_; + } + + inline bool ReuseSessions() const { + return ReuseSessions_; + } + + private: + void EndAccept(EIOStatus status); + + void EndRead(EIOStatus status); + + void EndWrite(EIOStatus status); + + void EndFinish(EIOStatus status); + + void EndWritesDone(EIOStatus); + + void ScheduleWrite(); + + void BeginWritesDone(); + + bool CheckHasError(EIOStatus status, const char* method); + + void SetError(const TString& error); + + void EnsureFinishStarted(); + + void BeginRead(); + + void BeginWrite(); + + void ScheduleFinishOnError(); + + private: + TClientSession& Session; + TAsyncJoinerToken AsyncJoinerToken; + THolder<IIOCallback> AcceptTag; + THolder<IIOCallback> ReadTag; + THolder<IIOCallback> WriteTag; + THolder<IIOCallback> WritesDoneTag; + THolder<IIOCallback> FinishTag; + TScopeLogger Logger; + bool AcceptPending; + bool Initialized_; + bool ReadPending; + bool ReadsDone; + bool WritePending; + bool WritesBlocked; + bool WritesDonePending; + bool WritesDone; + bool ErrorOccured; + bool FinishRequested; + bool FinishStarted; + bool FinishDone; + bool Cancelled; + bool Poisoned; + bool PoisonPillSent; + bool ReuseSessions_; + grpc::Status FinishStatus; + grpc::ClientContext ClientContext; + std::unique_ptr<grpc::ClientAsyncReaderWriter<NUnifiedAgentProto::Request, NUnifiedAgentProto::Response>> ReaderWriter; + NUnifiedAgentProto::Request Request; + NUnifiedAgentProto::Response Response; + }; +} diff --git a/library/cpp/unified_agent_client/client_proto_weighing.h b/library/cpp/unified_agent_client/client_proto_weighing.h new file mode 100644 index 0000000000..728792e49f --- /dev/null +++ b/library/cpp/unified_agent_client/client_proto_weighing.h @@ -0,0 +1,75 @@ +#pragma once + +#include <library/cpp/unified_agent_client/f_maybe.h> +#include <library/cpp/unified_agent_client/proto_weighing.h> + +namespace NUnifiedAgent::NPW { + struct TMessageMetaItem: public TMessage { + TMessageMetaItem() + : TMessage() + , Key(this) + , Value(this) + , SkipStart(this) + , SkipLength(this) + { + } + + explicit TMessageMetaItem(TMessage* parent) + : TMessage(parent) + , Key(this) + , Value(this) + , SkipStart(this) + , SkipLength(this) + { + } + + explicit TMessageMetaItem(const NUnifiedAgent::TFMaybe<TFieldLink>& link) + : TMessage(link) + , Key(this) + , Value(this) + , SkipStart(this) + , SkipLength(this) + { + } + + TStringField Key; + TRepeatedPtrField<TStringField> Value; + TRepeatedField<ui32> SkipStart; + TRepeatedField<ui32> SkipLength; + }; + + struct TDataBatch: public TMessage { + TDataBatch() + : TMessage() + , SeqNo(this) + , Timestamp(this) + , Payload(this, 2) + , Meta(this, 2) + { + } + + TDataBatch(TMessage* parent) + : TMessage(parent) + , SeqNo(this) + , Timestamp(this) + , Payload(this, 2) + , Meta(this, 2) + { + } + + TRepeatedField<ui64> SeqNo; // 1 + TRepeatedField<ui64> Timestamp; // 2 + TRepeatedPtrField<TStringField> Payload; // 100 + TRepeatedPtrField<TMessageMetaItem> Meta; // 101 + }; + + struct TRequest: public TMessage { + TRequest() + : TMessage() + , DataBatch(this) + { + } + + TDataBatch DataBatch; + }; +} diff --git a/library/cpp/unified_agent_client/clock.cpp b/library/cpp/unified_agent_client/clock.cpp new file mode 100644 index 0000000000..192c998a02 --- /dev/null +++ b/library/cpp/unified_agent_client/clock.cpp @@ -0,0 +1,48 @@ +#include "clock.h" + +namespace NUnifiedAgent { + void TClock::Configure() { + Y_VERIFY(!Configured_); + + Configured_ = true; + } + + void TClock::SetBase(TInstant value) { + Y_VERIFY(Configured_); + + Base_.store(value.GetValue()); + } + + void TClock::ResetBase() { + Base_.store(0); + } + + void TClock::ResetBaseWithShift() { + Y_VERIFY(Configured_); + + Shift_.store(static_cast<i64>(Base_.exchange(0)) - static_cast<i64>(::Now().GetValue())); + } + + void TClock::SetShift(TDuration value) { + Y_VERIFY(Configured_); + + Shift_.fetch_add(value.GetValue()); + } + + void TClock::ResetShift() { + Shift_.store(0); + } + + TInstant TClock::Get() { + auto base = Base_.load(); + if (base == 0) { + base = ::Now().GetValue(); + } + base += Shift_.load(); + return TInstant::FromValue(base); + } + + bool TClock::Configured_{false}; + std::atomic<ui64> TClock::Base_{0}; + std::atomic<i64> TClock::Shift_{0}; +} diff --git a/library/cpp/unified_agent_client/clock.h b/library/cpp/unified_agent_client/clock.h new file mode 100644 index 0000000000..77ff44583e --- /dev/null +++ b/library/cpp/unified_agent_client/clock.h @@ -0,0 +1,37 @@ +#pragma once + +#include <util/datetime/base.h> + +#include <atomic> + +namespace NUnifiedAgent { + class TClock { + public: + static void Configure(); + + static inline bool Configured() { + return Configured_; + } + + static inline TInstant Now() { + return Configured_ ? Get() : TInstant::Now(); + } + + static void SetBase(TInstant value); + + static void ResetBase(); + + static void ResetBaseWithShift(); + + static void SetShift(TDuration value); + + static void ResetShift(); + + static TInstant Get(); + + private: + static bool Configured_; + static std::atomic<ui64> Base_; + static std::atomic<i64> Shift_; + }; +} diff --git a/library/cpp/unified_agent_client/counters.cpp b/library/cpp/unified_agent_client/counters.cpp new file mode 100644 index 0000000000..776a86ec4e --- /dev/null +++ b/library/cpp/unified_agent_client/counters.cpp @@ -0,0 +1,36 @@ +#include "counters.h" + +using namespace NMonitoring; + +namespace NUnifiedAgent { + TClientCounters::TClientCounters(const NMonitoring::TDynamicCounterPtr& counters) + : TDynamicCountersWrapper(counters) + , ActiveSessionsCount(GetCounter("ActiveSessionsCount", false)) + , ClientLogDroppedBytes(GetCounter("ClientLogDroppedBytes", true)) + { + } + + TIntrusivePtr<TClientSessionCounters> TClientCounters::GetDefaultSessionCounters() { + auto group = Unwrap()->GetSubgroup("session", "default"); + return MakeIntrusive<TClientSessionCounters>(group); + } + + TClientSessionCounters::TClientSessionCounters(const NMonitoring::TDynamicCounterPtr& counters) + : TDynamicCountersWrapper(counters) + , ReceivedMessages(GetCounter("ReceivedMessages", true)) + , ReceivedBytes(GetCounter("ReceivedBytes", true)) + , AcknowledgedMessages(GetCounter("AcknowledgedMessages", true)) + , AcknowledgedBytes(GetCounter("AcknowledgedBytes", true)) + , InflightMessages(GetCounter("InflightMessages", false)) + , InflightBytes(GetCounter("InflightBytes", false)) + , GrpcWriteBatchRequests(GetCounter("GrpcWriteBatchRequests", true)) + , GrpcInflightMessages(GetCounter("GrpcInflightMessages", false)) + , GrpcInflightBytes(GetCounter("GrpcInflightBytes", false)) + , GrpcCalls(GetCounter("GrpcCalls", true)) + , GrpcCallsInitialized(GetCounter("GrpcCallsInitialized", true)) + , DroppedMessages(GetCounter("DroppedMessages", true)) + , DroppedBytes(GetCounter("DroppedBytes", true)) + , ErrorsCount(GetCounter("ErrorsCount", true)) + { + } +} diff --git a/library/cpp/unified_agent_client/counters.h b/library/cpp/unified_agent_client/counters.h new file mode 100644 index 0000000000..3c2192c3c5 --- /dev/null +++ b/library/cpp/unified_agent_client/counters.h @@ -0,0 +1,38 @@ +#pragma once + +#include <library/cpp/unified_agent_client/dynamic_counters_wrapper.h> + +namespace NUnifiedAgent { + struct TClientSessionCounters; + + struct TClientCounters: public TDynamicCountersWrapper { + explicit TClientCounters(const NMonitoring::TDynamicCounterPtr& counters = + MakeIntrusive<NMonitoring::TDynamicCounters>()); + + NMonitoring::TDeprecatedCounter& ActiveSessionsCount; + NMonitoring::TDeprecatedCounter& ClientLogDroppedBytes; + + public: + TIntrusivePtr<TClientSessionCounters> GetDefaultSessionCounters(); + }; + + struct TClientSessionCounters: public TDynamicCountersWrapper { + explicit TClientSessionCounters(const NMonitoring::TDynamicCounterPtr& counters = + MakeIntrusive<NMonitoring::TDynamicCounters>()); + + NMonitoring::TDeprecatedCounter& ReceivedMessages; + NMonitoring::TDeprecatedCounter& ReceivedBytes; + NMonitoring::TDeprecatedCounter& AcknowledgedMessages; + NMonitoring::TDeprecatedCounter& AcknowledgedBytes; + NMonitoring::TDeprecatedCounter& InflightMessages; + NMonitoring::TDeprecatedCounter& InflightBytes; + NMonitoring::TDeprecatedCounter& GrpcWriteBatchRequests; + NMonitoring::TDeprecatedCounter& GrpcInflightMessages; + NMonitoring::TDeprecatedCounter& GrpcInflightBytes; + NMonitoring::TDeprecatedCounter& GrpcCalls; + NMonitoring::TDeprecatedCounter& GrpcCallsInitialized; + NMonitoring::TDeprecatedCounter& DroppedMessages; + NMonitoring::TDeprecatedCounter& DroppedBytes; + NMonitoring::TDeprecatedCounter& ErrorsCount; + }; +} diff --git a/library/cpp/unified_agent_client/duration_counter.cpp b/library/cpp/unified_agent_client/duration_counter.cpp new file mode 100644 index 0000000000..118778a226 --- /dev/null +++ b/library/cpp/unified_agent_client/duration_counter.cpp @@ -0,0 +1,41 @@ +#include "duration_counter.h" + +namespace NUnifiedAgent { + using namespace NMonitoring; + + TDurationUsCounter::TDurationUsCounter(const TString& name, TDynamicCounters& owner) + : Counter(*owner.GetCounter(name, true)) + , ActiveTimers() + , Lock() + { + } + + NHPTimer::STime* TDurationUsCounter::Begin() { + with_lock (Lock) { + ActiveTimers.push_back(0); + auto& result = ActiveTimers.back(); + NHPTimer::GetTime(&result); + return &result; + } + } + + void TDurationUsCounter::End(NHPTimer::STime* startTime) { + with_lock (Lock) { + Counter += static_cast<ui64>(NHPTimer::GetTimePassed(startTime) * 1000000); + *startTime = 0; + while (!ActiveTimers.empty() && ActiveTimers.front() == 0) { + ActiveTimers.pop_front(); + } + } + } + + void TDurationUsCounter::Update() { + with_lock (Lock) { + for (auto& startTime : ActiveTimers) { + if (startTime != 0) { + Counter += static_cast<ui64>(NHPTimer::GetTimePassed(&startTime) * 1000000); + } + } + } + } +} diff --git a/library/cpp/unified_agent_client/duration_counter.h b/library/cpp/unified_agent_client/duration_counter.h new file mode 100644 index 0000000000..dbdfc22ed4 --- /dev/null +++ b/library/cpp/unified_agent_client/duration_counter.h @@ -0,0 +1,43 @@ +#pragma once + +#include <library/cpp/monlib/dynamic_counters/counters.h> + +#include <util/generic/deque.h> +#include <util/system/hp_timer.h> +#include <util/system/mutex.h> + +namespace NUnifiedAgent { + class TDurationUsCounter { + public: + class TScope { + public: + TScope(TDurationUsCounter& counter) + : Counter(counter) + , StartTime(Counter.Begin()) + { + } + + ~TScope() { + Counter.End(StartTime); + } + + private: + TDurationUsCounter& Counter; + NHPTimer::STime* StartTime; + }; + + public: + TDurationUsCounter(const TString& name, NMonitoring::TDynamicCounters& owner); + + NHPTimer::STime* Begin(); + + void End(NHPTimer::STime* startTime); + + void Update(); + + private: + NMonitoring::TDeprecatedCounter& Counter; + TDeque<NHPTimer::STime> ActiveTimers; + TAdaptiveLock Lock; + }; +} diff --git a/library/cpp/unified_agent_client/dynamic_counters_wrapper.h b/library/cpp/unified_agent_client/dynamic_counters_wrapper.h new file mode 100644 index 0000000000..cac4c6813d --- /dev/null +++ b/library/cpp/unified_agent_client/dynamic_counters_wrapper.h @@ -0,0 +1,34 @@ +#pragma once + +#include <library/cpp/monlib/dynamic_counters/counters.h> + +namespace NUnifiedAgent { + class TDynamicCountersWrapper: public TAtomicRefCount<TDynamicCountersWrapper> { + public: + explicit TDynamicCountersWrapper(const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters) + : Counters(counters) + { + } + + virtual ~TDynamicCountersWrapper() = default; + + const TIntrusivePtr<NMonitoring::TDynamicCounters>& Unwrap() const { + return Counters; + } + + protected: + NMonitoring::TDeprecatedCounter& GetCounter(const TString& value, bool derivative) { + return *Counters->GetCounter(value, derivative); + } + + private: + TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; + }; + + class TUpdatableCounters: public TDynamicCountersWrapper { + public: + using TDynamicCountersWrapper::TDynamicCountersWrapper; + + virtual void Update() = 0; + }; +} diff --git a/library/cpp/unified_agent_client/enum.h b/library/cpp/unified_agent_client/enum.h new file mode 100644 index 0000000000..b21e21acb0 --- /dev/null +++ b/library/cpp/unified_agent_client/enum.h @@ -0,0 +1,30 @@ +#pragma once + +#include <util/generic/serialized_enum.h> +#include <util/generic/vector.h> + +namespace NUnifiedAgent { + namespace NPrivate { + using TEnumNames = TVector<const TString*>; + + template <typename TEnum> + TEnumNames BuildEnumNames() { + const auto names = GetEnumNames<TEnum>(); + auto result = TEnumNames(names.size()); + size_t index = 0; + for (const auto& p: names) { + Y_VERIFY(static_cast<size_t>(p.first) == index); + result[index++] = &p.second; + } + return result; + } + + template <typename TEnum> + inline const auto EnumNames = BuildEnumNames<TEnum>(); + } + + template <typename TEnum, typename = std::enable_if_t<std::is_enum_v<TEnum>>> + inline const TString& NameOf(TEnum val) noexcept { + return *NPrivate::EnumNames<TEnum>[static_cast<size_t>(val)]; + } +} diff --git a/library/cpp/unified_agent_client/examples/ua_grpc_client/main.cpp b/library/cpp/unified_agent_client/examples/ua_grpc_client/main.cpp new file mode 100644 index 0000000000..a9eb423d13 --- /dev/null +++ b/library/cpp/unified_agent_client/examples/ua_grpc_client/main.cpp @@ -0,0 +1,122 @@ +#include <library/cpp/unified_agent_client/client.h> + +#include <library/cpp/getopt/opt.h> + +#include <util/string/split.h> + +using namespace NUnifiedAgent; + +class TOptions { +public: + TString Uri; + TString SharedSecretKey; + TString SessionId; + TString SessionMeta; + + TOptions(int argc, const char* argv[]) { + NLastGetopt::TOpts opts; + TString logPriorityStr; + + opts + .AddLongOption("uri") + .RequiredArgument() + .Required() + .StoreResult(&Uri); + opts + .AddLongOption("shared-secret-key") + .RequiredArgument() + .Optional() + .StoreResult(&SharedSecretKey); + opts + .AddLongOption("session-id") + .RequiredArgument() + .Optional() + .StoreResult(&SessionId); + opts + .AddLongOption("session-meta", "key-value pairs separated by comma, e.g. 'k1=v1,k2=v2'") + .RequiredArgument() + .Optional() + .StoreResult(&SessionMeta); + + opts.AddHelpOption(); + opts.AddVersionOption(); + NLastGetopt::TOptsParseResult res(&opts, argc, argv); + } +}; + +bool TryParseMeta(const TString& s, THashMap<TString, TString>& meta) { + for (auto& t: StringSplitter(s).Split(',')) { + TString key; + TString value; + if (!StringSplitter(t.Token()).Split('=').TryCollectInto(&key, &value)) { + Cout << "invalid meta, can't extract key-value pair from [" << t.Token() << "]" << Endl; + return false; + } + meta[key] = value; + } + return true; +} + +bool TryParseLine(const TString& line, TVector<TString>& lineItems) { + lineItems = StringSplitter(line).Split('|').ToList<TString>(); + Y_VERIFY(lineItems.size() >= 1); + if (lineItems.size() > 2) { + Cout << "invalid line format, expected 'k1=v1,k2=v2|payload' or just 'payload'" << Endl; + return false; + } + return true; +} + +int main(int argc, const char* argv[]) { + TOptions options(argc, argv); + + TClientSessionPtr sessionPtr; + { + TLog emptyLog; + auto clientParameters = TClientParameters(options.Uri).SetLog(emptyLog); + if (!options.SharedSecretKey.Empty()) { + clientParameters.SetSharedSecretKey(options.SharedSecretKey); + } + auto clientPtr = MakeClient(clientParameters); + auto sessionParameters = TSessionParameters(); + if (!options.SessionId.Empty()) { + sessionParameters.SetSessionId(options.SessionId); + } + if (!options.SessionMeta.empty()) { + THashMap<TString, TString> sessionMeta; + if (!TryParseMeta(options.SessionMeta, sessionMeta)) { + return -1; + } + sessionParameters.SetMeta(sessionMeta); + } + sessionPtr = clientPtr->CreateSession(sessionParameters); + } + + TString line; + while (true) { + Cin.ReadLine(line); + if (line.Empty()) { + break; + } + + TVector<TString> lineItems; + if (!TryParseLine(line, lineItems)) { + continue; + } + + TClientMessage clientMessage; + clientMessage.Payload = lineItems.back(); + if (lineItems.size() == 2) { + THashMap<TString, TString> messageMeta; + if (!TryParseMeta(lineItems[0], messageMeta)) { + continue; + } + clientMessage.Meta = std::move(messageMeta); + } + sessionPtr->Send(std::move(clientMessage)); + } + + sessionPtr->Close(); + + return 0; +} diff --git a/library/cpp/unified_agent_client/f_maybe.h b/library/cpp/unified_agent_client/f_maybe.h new file mode 100644 index 0000000000..7abd4c0aac --- /dev/null +++ b/library/cpp/unified_agent_client/f_maybe.h @@ -0,0 +1,23 @@ +#pragma once + +#include <util/generic/maybe.h> + +namespace NUnifiedAgent { + template <typename T> + using TFMaybe = TMaybe<T, ::NMaybe::TPolicyUndefinedFail>; + + template <class T> + inline constexpr TFMaybe<std::decay_t<T>> MakeFMaybe(T&& value) { + return TMaybe<std::decay_t<T>, ::NMaybe::TPolicyUndefinedFail>(std::forward<T>(value)); + } + + template <class T, class... TArgs> + inline constexpr TFMaybe<T> MakeFMaybe(TArgs&&... args) { + return TFMaybe<T>(typename TFMaybe<T>::TInPlace{}, std::forward<TArgs>(args)...); + } + + template <class T> + inline constexpr TFMaybe<std::decay_t<T>> MakeFMaybe(const TMaybe<T>& source) { + return source.Defined() ? MakeFMaybe(*source) : Nothing(); + } +} diff --git a/library/cpp/unified_agent_client/grpc_io.cpp b/library/cpp/unified_agent_client/grpc_io.cpp new file mode 100644 index 0000000000..6d237d75ec --- /dev/null +++ b/library/cpp/unified_agent_client/grpc_io.cpp @@ -0,0 +1,161 @@ +#include "grpc_io.h" + +#include <contrib/libs/grpc/src/core/lib/iomgr/executor.h> +#include <contrib/libs/grpc/src/core/lib/surface/completion_queue.h> +#include <contrib/libs/grpc/include/grpc/impl/codegen/log.h> + +#include <util/generic/yexception.h> +#include <util/string/cast.h> +#include <util/system/env.h> +#include <util/system/mutex.h> +#include <util/system/thread.h> + +namespace NUnifiedAgent { + namespace { + std::once_flag GrpcConfigured{}; + } + + TGrpcNotification::TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback) + : CompletionQueue(completionQueue) + , IOCallback(std::move(ioCallback)) + , Completion(MakeHolder<grpc_cq_completion>()) + , InQueue(false) + { + } + + TGrpcNotification::~TGrpcNotification() = default; + + void TGrpcNotification::Trigger() { + { + bool inQueue = false; + if (!InQueue.compare_exchange_strong(inQueue, true)) { + return; + } + } + grpc_core::ApplicationCallbackExecCtx callbackExecCtx; + grpc_core::ExecCtx execCtx; + IOCallback->Ref(); + Y_VERIFY(grpc_cq_begin_op(CompletionQueue.cq(), this)); + grpc_cq_end_op(CompletionQueue.cq(), this, nullptr, + [](void* self, grpc_cq_completion*) { + Y_VERIFY(static_cast<TGrpcNotification*>(self)->InQueue.exchange(false)); + }, + this, Completion.Get()); + } + + bool TGrpcNotification::FinalizeResult(void** tag, bool*) { + *tag = IOCallback.Get(); + return true; + } + + TGrpcTimer::TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback) + : CompletionQueue(completionQueue) + , IOCallback(std::move(ioCallback)) + , Alarm() + , AlarmIsSet(false) + , NextTriggerTime(Nothing()) + { + } + + void TGrpcTimer::Set(TInstant triggerTime) { + if (AlarmIsSet) { + NextTriggerTime = triggerTime; + Alarm.Cancel(); + } else { + AlarmIsSet = true; + Alarm.Set(&CompletionQueue, InstantToTimespec(triggerTime), Ref()); + } + } + + void TGrpcTimer::Cancel() { + NextTriggerTime.Clear(); + if (AlarmIsSet) { + Alarm.Cancel(); + } + } + + IIOCallback* TGrpcTimer::Ref() { + IOCallback->Ref(); + return this; + } + + void TGrpcTimer::OnIOCompleted(EIOStatus status) { + Y_VERIFY(AlarmIsSet); + if (NextTriggerTime) { + Alarm.Set(&CompletionQueue, InstantToTimespec(*NextTriggerTime), this); + NextTriggerTime.Clear(); + } else { + AlarmIsSet = false; + IOCallback->OnIOCompleted(status); + } + } + + TGrpcCompletionQueuePoller::TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue) + : Queue(queue) + , Thread() + { + } + + void TGrpcCompletionQueuePoller::Start() { + Thread = std::thread([this]() { + TThread::SetCurrentThreadName("ua_grpc_cq"); + void* tag; + bool ok; + while (Queue.Next(&tag, &ok)) { + try { + static_cast<IIOCallback*>(tag)->OnIOCompleted(ok ? EIOStatus::Ok : EIOStatus::Error); + } catch (...) { + Y_FAIL("unexpected exception [%s]", CurrentExceptionMessage().c_str()); + } + } + }); + } + + void TGrpcCompletionQueuePoller::Join() { + Thread.join(); + } + + TGrpcCompletionQueueHost::TGrpcCompletionQueueHost() + : CompletionQueue() + , Poller(CompletionQueue) + { + } + + void TGrpcCompletionQueueHost::Start() { + Poller.Start(); + } + + void TGrpcCompletionQueueHost::Stop() { + CompletionQueue.Shutdown(); + Poller.Join(); + } + + gpr_timespec InstantToTimespec(TInstant instant) { + gpr_timespec result; + result.clock_type = GPR_CLOCK_REALTIME; + result.tv_sec = static_cast<int64_t>(instant.Seconds()); + result.tv_nsec = instant.NanoSecondsOfSecond(); + return result; + } + + void EnsureGrpcConfigured() { + std::call_once(GrpcConfigured, []() { + const auto limitStr = GetEnv("UA_GRPC_EXECUTOR_THREADS_LIMIT"); + ui64 limit; + if (limitStr.Empty() || !TryFromString(limitStr, limit)) { + limit = 2; + } + grpc_core::Executor::SetThreadsLimit(limit); + }); + } + + void StartGrpcTracing() { + grpc_tracer_set_enabled("all", true); + gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); + } + + void FinishGrpcTracing() { + grpc_tracer_set_enabled("all", false); + gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR); + } +} diff --git a/library/cpp/unified_agent_client/grpc_io.h b/library/cpp/unified_agent_client/grpc_io.h new file mode 100644 index 0000000000..5f368a5943 --- /dev/null +++ b/library/cpp/unified_agent_client/grpc_io.h @@ -0,0 +1,141 @@ +#pragma once + +#include <library/cpp/unified_agent_client/async_joiner.h> +#include <library/cpp/unified_agent_client/f_maybe.h> + +#include <contrib/libs/grpc/include/grpcpp/alarm.h> +#include <contrib/libs/grpc/include/grpc++/grpc++.h> + +#include <thread> + +struct grpc_cq_completion; + +namespace NUnifiedAgent { + enum class EIOStatus { + Ok, + Error + }; + + class IIOCallback { + public: + virtual ~IIOCallback() = default; + + virtual IIOCallback* Ref() = 0; + + virtual void OnIOCompleted(EIOStatus status) = 0; + }; + + template<typename TCallback, typename TCounter> + class TIOCallback: public IIOCallback { + public: + explicit TIOCallback(TCallback&& callback, TCounter* counter) + : Callback(std::move(callback)) + , Counter(counter) + { + } + + IIOCallback* Ref() override { + Counter->Ref(); + return this; + } + + void OnIOCompleted(EIOStatus status) override { + Callback(status); + Counter->UnRef(); + } + + private: + TCallback Callback; + TCounter* Counter; + }; + + template<typename TCallback, typename TCounter> + THolder<IIOCallback> MakeIOCallback(TCallback&& callback, TCounter* counter) { + return MakeHolder<TIOCallback<TCallback, TCounter>>(std::move(callback), counter); + } + + template<typename TTarget, typename TCounter = TTarget> + THolder<IIOCallback> MakeIOCallback(TTarget* target, void (TTarget::*method)(EIOStatus), + TCounter* counter = nullptr) + { + return MakeIOCallback([target, method](EIOStatus status) { ((*target).*method)(status); }, + counter ? counter : target); + } + + class TGrpcNotification: private ::grpc::internal::CompletionQueueTag { + public: + TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback); + + ~TGrpcNotification(); + + void Trigger(); + + private: + bool FinalizeResult(void** tag, bool* status) override; + + private: + grpc::CompletionQueue& CompletionQueue; + THolder<IIOCallback> IOCallback; + THolder<grpc_cq_completion> Completion; + std::atomic<bool> InQueue; + }; + + class TGrpcTimer: private IIOCallback { + public: + TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback); + + void Set(TInstant triggerTime); + + void Cancel(); + + private: + IIOCallback* Ref() override; + + void OnIOCompleted(EIOStatus status) override; + + private: + grpc::CompletionQueue& CompletionQueue; + THolder<IIOCallback> IOCallback; + grpc::Alarm Alarm; + bool AlarmIsSet; + TFMaybe<TInstant> NextTriggerTime; + }; + + class TGrpcCompletionQueuePoller { + public: + explicit TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue); + + void Start(); + + void Join(); + + private: + grpc::CompletionQueue& Queue; + std::thread Thread; + }; + + class TGrpcCompletionQueueHost { + public: + TGrpcCompletionQueueHost(); + + void Start(); + + void Stop(); + + inline grpc::CompletionQueue& GetCompletionQueue() noexcept { + return CompletionQueue; + } + + private: + grpc::CompletionQueue CompletionQueue; + TGrpcCompletionQueuePoller Poller; + }; + + gpr_timespec InstantToTimespec(TInstant instant); + + void EnsureGrpcConfigured(); + + void StartGrpcTracing(); + + void FinishGrpcTracing(); +} diff --git a/library/cpp/unified_agent_client/grpc_status_code.cpp b/library/cpp/unified_agent_client/grpc_status_code.cpp new file mode 100644 index 0000000000..662bbbe7a5 --- /dev/null +++ b/library/cpp/unified_agent_client/grpc_status_code.cpp @@ -0,0 +1,56 @@ +#include <contrib/libs/grpc/include/grpcpp/impl/codegen/status_code_enum.h> + +#include <util/stream/output.h> + +namespace { + const char* GrpcStatusCodeToString(grpc::StatusCode statusCode) { + switch (statusCode) { + case grpc::OK: + return "OK"; + case grpc::CANCELLED: + return "CANCELLED"; + case grpc::UNKNOWN: + return "UNKNOWN"; + case grpc::INVALID_ARGUMENT: + return "INVALID_ARGUMENT"; + case grpc::DEADLINE_EXCEEDED: + return "DEADLINE_EXCEEDED"; + case grpc::NOT_FOUND: + return "NOT_FOUND"; + case grpc::ALREADY_EXISTS: + return "ALREADY_EXISTS"; + case grpc::PERMISSION_DENIED: + return "PERMISSION_DENIED"; + case grpc::UNAUTHENTICATED: + return "UNAUTHENTICATED"; + case grpc::RESOURCE_EXHAUSTED: + return "RESOURCE_EXHAUSTED"; + case grpc::FAILED_PRECONDITION: + return "FAILED_PRECONDITION"; + case grpc::ABORTED: + return "ABORTED"; + case grpc::OUT_OF_RANGE: + return "OUT_OF_RANGE"; + case grpc::UNIMPLEMENTED: + return "UNIMPLEMENTED"; + case grpc::INTERNAL: + return "INTERNAL"; + case grpc::UNAVAILABLE: + return "UNAVAILABLE"; + case grpc::DATA_LOSS: + return "DATA_LOSS"; + default: + return nullptr; + } + } +} + +template <> +void Out<grpc::StatusCode>(IOutputStream& o, grpc::StatusCode statusCode) { + const auto* s = GrpcStatusCodeToString(statusCode); + if (s == nullptr) { + o << "grpc::StatusCode [" << static_cast<int>(statusCode) << "]"; + } else { + o << s; + } +} diff --git a/library/cpp/unified_agent_client/helpers.cpp b/library/cpp/unified_agent_client/helpers.cpp new file mode 100644 index 0000000000..01908660f3 --- /dev/null +++ b/library/cpp/unified_agent_client/helpers.cpp @@ -0,0 +1,65 @@ +#include "helpers.h" +#include <util/string/ascii.h> + +namespace NUnifiedAgent::NPrivate { + bool IsUtf8(const THashMap<TString, TString>& meta) { + for (const auto& p : meta) { + if (!IsUtf(p.first) || !IsUtf(p.second)) { + return false; + } + } + return true; + } + + ResultReplacingNonUTF ReplaceNonUTF(TStringBuf message, char signBrokenSymbol, size_t maxSize) { + ResultReplacingNonUTF result; + if (maxSize == 0) { + result.IsTruncated = !message.empty(); + return result; + } + if (message.empty()) { + return result; + } + + auto currentPoint = reinterpret_cast<const unsigned char*>(&message[0]); + auto endPoint = currentPoint + message.size(); + + auto pushSignBroken = [&result, signBrokenSymbol]() { + if (result.Data.empty() || result.Data.back() != signBrokenSymbol) { + result.Data.push_back(signBrokenSymbol); + } + ++result.BrokenCount; + }; + + while (currentPoint < endPoint) { + wchar32 rune = 0; + size_t rune_len = 0; + auto statusRead = SafeReadUTF8Char(rune, rune_len, currentPoint, endPoint); + + if (statusRead == RECODE_OK) { + if (rune_len == 1 && !IsAsciiAlnum(*currentPoint) && !IsAsciiPunct(*currentPoint) && !IsAsciiSpace(*currentPoint)) { + ++currentPoint; + pushSignBroken(); + } else { + while (rune_len != 0) { + result.Data.push_back(*currentPoint); + ++currentPoint; + --rune_len; + } + } + } else if (statusRead == RECODE_BROKENSYMBOL) { + ++currentPoint; + pushSignBroken(); + } else { + pushSignBroken(); + break; + } + + if (result.Data.size() >= maxSize && currentPoint < endPoint) { + result.IsTruncated = true; + break; + } + } + return result; + } +} diff --git a/library/cpp/unified_agent_client/helpers.h b/library/cpp/unified_agent_client/helpers.h new file mode 100644 index 0000000000..9c8f727315 --- /dev/null +++ b/library/cpp/unified_agent_client/helpers.h @@ -0,0 +1,17 @@ +#pragma once + +#include "client.h" + +#include <util/charset/utf8.h> + +namespace NUnifiedAgent::NPrivate { + bool IsUtf8(const THashMap<TString, TString>& meta); + + struct ResultReplacingNonUTF { + bool IsTruncated{false}; + size_t BrokenCount{0}; + TString Data; + }; + + ResultReplacingNonUTF ReplaceNonUTF(TStringBuf message, char signBrokenSymbol = '?', size_t maxSize = std::numeric_limits<size_t>::max()); +} diff --git a/library/cpp/unified_agent_client/logger.cpp b/library/cpp/unified_agent_client/logger.cpp new file mode 100644 index 0000000000..abe049d7eb --- /dev/null +++ b/library/cpp/unified_agent_client/logger.cpp @@ -0,0 +1,131 @@ +#include "logger.h" + +#include <library/cpp/unified_agent_client/clock.h> +#include <library/cpp/unified_agent_client/helpers.h> + +#include <library/cpp/logger/log.h> + +#include <util/datetime/base.h> +#include <util/stream/str.h> +#include <util/system/getpid.h> +#include <util/system/thread.h> + +namespace NUnifiedAgent { + namespace { + TString FormatLogLine(ELogPriority logLevel, const TStringBuf message, const TString& scope) { + TString result; + { + TStringOutput output(result); + output << FormatIsoLocal(TClock::Now()) + << " " << GetPID() + << " " << TThread::CurrentThreadId() + << " " << logLevel; + if (!scope.Empty()) { + output << " " << scope; + } + output << " " << message << "\n"; + } + return result; + } + } + + TLogger::TThrottlerWithLock::TThrottlerWithLock(size_t rateLimitBytes) + : Throttler(rateLimitBytes, rateLimitBytes / 2) + , Lock() + { + } + + bool TLogger::TThrottlerWithLock::TryConsume(double tokens) { + with_lock(Lock) { + return Throttler.TryConsume(tokens); + } + } + + TLogger::TLogger(TLog& log, TFMaybe<size_t> rateLimitBytes) + : DefaultLogContext{log, log.IsNullLog() ? ELogPriority::TLOG_EMERG : log.FiltrationLevel()} + , TracingLogContexts() + , CurrentLogContext_() + , Errors(nullptr) + , DroppedBytes(nullptr) + , Throttler(rateLimitBytes.Defined() ? MakeHolder<TThrottlerWithLock>(*rateLimitBytes) : nullptr) + , Lock() + { + SetCurrentLogContext(DefaultLogContext); + } + + void TLogger::SetCurrentLogContext(TLogContext& logContext) { + CurrentLogContext_.store(logContext.Log.IsNullLog() ? nullptr : &logContext, std::memory_order_release); + } + + void TLogger::Log(TLog& log, ELogPriority logPriority, const TStringBuf message, const TString& scope) const { + try { + const auto logLine = FormatLogLine(logPriority, NUnifiedAgent::NPrivate::ReplaceNonUTF(message).Data, scope); + if (Throttler && &log == &DefaultLogContext.Log && !Throttler->TryConsume(logLine.size())) { + if (DroppedBytes) { + DroppedBytes->Add(logLine.size()); + } + return; + } + log.Write(logPriority, logLine); + } catch (...) { + } + } + + void TLogger::StartTracing(ELogPriority logPriority) noexcept { + with_lock(Lock) { + auto& logContext = GetOrCreateTracingLogContext(logPriority); + SetTracing(logContext, "started"); + } + } + + void TLogger::FinishTracing() noexcept { + with_lock(Lock) { + SetTracing(DefaultLogContext, "finished"); + } + } + + void TLogger::SetTracing(TLogContext& logContext, const char* action) { + // Lock must be held + + SetCurrentLogContext(logContext); + + Log(logContext.Log, + TLOG_INFO, + Sprintf("tracing %s, log priority is set to [%s]", + action, ToString(logContext.Priority).c_str()), + ""); + } + + auto TLogger::GetOrCreateTracingLogContext(ELogPriority logPriority) -> TLogContext& { + // Lock must be held + + for (const auto& c: TracingLogContexts) { + if (c->Priority == logPriority) { + return *c; + } + } + + auto newLogContext = MakeHolder<TLogContext>(); + newLogContext->Log = TLog("cerr", logPriority); + newLogContext->Priority = logPriority; + auto* result = newLogContext.Get(); + TracingLogContexts.push_back(std::move(newLogContext)); + return *result; + } + + TScopeLogger::TScopeLogger() + : Logger(nullptr) + , Scope() + , Errors(nullptr) + { + } + + TScopeLogger::TScopeLogger(TLogger* logger, + const TString& scope, + NMonitoring::TDeprecatedCounter* errors) + : Logger(logger) + , Scope(scope) + , Errors(errors) + { + } +} diff --git a/library/cpp/unified_agent_client/logger.h b/library/cpp/unified_agent_client/logger.h new file mode 100644 index 0000000000..d83cba92de --- /dev/null +++ b/library/cpp/unified_agent_client/logger.h @@ -0,0 +1,157 @@ +#pragma once + +#include <library/cpp/unified_agent_client/f_maybe.h> +#include <library/cpp/unified_agent_client/throttling.h> + +#include <library/cpp/logger/log.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> + +#include <util/generic/string.h> +#include <util/string/join.h> +#include <util/string/printf.h> +#include <util/system/file.h> + +#include <atomic> + +#define YLOG(logPriority, message, logger) \ + do { \ + const auto __logPriority = logPriority; \ + if (auto* log = logger.Accept(__logPriority, false); log != nullptr) { \ + logger.Log(*log, __logPriority, message); \ + } \ + } while (false) + +#define YLOG_EMERG(msg) YLOG(TLOG_EMERG, msg, Logger) +#define YLOG_ALERT(msg) YLOG(TLOG_ALERT, msg, Logger) +#define YLOG_CRIT(msg) YLOG(TLOG_CRIT, msg, Logger) +#define YLOG_ERR(msg) YLOG(TLOG_ERR, msg, Logger) +#define YLOG_WARNING(msg) YLOG(TLOG_WARNING, msg, Logger) +#define YLOG_NOTICE(msg) YLOG(TLOG_NOTICE, msg, Logger) +#define YLOG_INFO(msg) YLOG(TLOG_INFO, msg, Logger) +#define YLOG_DEBUG(msg) YLOG(TLOG_DEBUG, msg, Logger) +#define YLOG_RESOURCES(msg) YLOG(TLOG_RESOURCES , msg, Logger) + +#define YLOG_FATAL(msg) \ + YLOG(TLOG_CRIT, msg, Logger); \ + _Exit(1); + +namespace NUnifiedAgent { + class TScopeLogger; + + class TLogger { + public: + TLogger(TLog& log, TFMaybe<size_t> rateLimitBytes); + + void StartTracing(ELogPriority logPriority) noexcept; + + void FinishTracing() noexcept; + + inline TScopeLogger Child(const TString& v, NMonitoring::TDeprecatedCounter* errors = nullptr); + + inline void SetErrorsCounter(NMonitoring::TDeprecatedCounter* counter) noexcept { + Errors = counter; + } + + inline void SetDroppedBytesCounter(NMonitoring::TDeprecatedCounter* counter) noexcept { + DroppedBytes = counter; + } + + inline bool HasRateLimit() const noexcept { + return Throttler != nullptr; + } + + friend class TScopeLogger; + + private: + void Log(TLog& log, ELogPriority logPriority, const TStringBuf message, const TString& scope) const; + + inline TLog* Accept(ELogPriority logPriority, NMonitoring::TDeprecatedCounter* errors) const noexcept { + if ((logPriority <= TLOG_ERR) && (errors != nullptr)) { + ++(*errors); + } + auto* result = CurrentLogContext_.load(std::memory_order_acquire); + return result != nullptr && static_cast<int>(logPriority) <= static_cast<int>(result->Priority) + ? &result->Log + : nullptr; + } + + private: + struct TLogContext { + TLog Log; + ELogPriority Priority; + }; + + class TThrottlerWithLock { + public: + explicit TThrottlerWithLock(size_t rateLimitBytes); + + bool TryConsume(double tokens); + + private: + TThrottler Throttler; + TAdaptiveLock Lock; + }; + + private: + void SetCurrentLogContext(TLogContext& logContext); + + TLogContext& GetOrCreateTracingLogContext(ELogPriority logPriority); + + void SetTracing(TLogContext& logContext, const char* action); + + private: + TLogContext DefaultLogContext; + TVector<THolder<TLogContext>> TracingLogContexts; + std::atomic<TLogContext*> CurrentLogContext_; + NMonitoring::TDeprecatedCounter* Errors; + NMonitoring::TDeprecatedCounter* DroppedBytes; + const THolder<TThrottlerWithLock> Throttler; + TAdaptiveLock Lock; + }; + + class TScopeLogger { + public: + TScopeLogger(); + + inline void Log(TLog& log, ELogPriority logPriority, const TStringBuf message) const { + if (Logger) { + Logger->Log(log, logPriority, message, Scope); + } + } + + inline TLog* Accept(ELogPriority logPriority, bool silent) const noexcept { + return Logger ? Logger->Accept(logPriority, silent ? nullptr : Errors) : nullptr; + } + + inline TScopeLogger Child(const TString& v, NMonitoring::TDeprecatedCounter* errors = nullptr) { + return Logger + ? Logger->Child(Join('/', Scope, v), errors == nullptr ? Errors : errors) + : TScopeLogger(); + } + + inline TLogger* Unwrap() noexcept { + return Logger; + } + + friend class TLogger; + + private: + TScopeLogger(TLogger* logger, + const TString& scope, + NMonitoring::TDeprecatedCounter* errors); + + private: + TLogger* Logger; + TString Scope; + NMonitoring::TDeprecatedCounter* Errors; + }; + + inline TScopeLogger TLogger::Child(const TString& v, NMonitoring::TDeprecatedCounter* errors) { + return TScopeLogger(this, v, errors == nullptr ? Errors : errors); + } + + inline ELogPriority ToLogPriority(int level) noexcept { + const auto result = ClampVal(level, 0, static_cast<int>(TLOG_RESOURCES)); + return static_cast<ELogPriority>(result); + } +} diff --git a/library/cpp/unified_agent_client/proto/CMakeLists.darwin.txt b/library/cpp/unified_agent_client/proto/CMakeLists.darwin.txt new file mode 100644 index 0000000000..9642a192c4 --- /dev/null +++ b/library/cpp/unified_agent_client/proto/CMakeLists.darwin.txt @@ -0,0 +1,45 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-unified_agent_client-proto) +set_property(TARGET cpp-unified_agent_client-proto PROPERTY + PROTOC_EXTRA_OUTS .grpc.pb.cc .grpc.pb.h +) +target_link_libraries(cpp-unified_agent_client-proto PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + tools-enum_parser-enum_serialization_runtime + contrib-libs-protobuf +) +target_proto_messages(cpp-unified_agent_client-proto PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto/unified_agent.proto +) +generate_enum_serilization(cpp-unified_agent_client-proto + ${CMAKE_BINARY_DIR}/library/cpp/unified_agent_client/proto/unified_agent.pb.h + INCLUDE_HEADERS + library/cpp/unified_agent_client/proto/unified_agent.pb.h +) +target_proto_addincls(cpp-unified_agent_client-proto + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(cpp-unified_agent_client-proto + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +target_proto_plugin(cpp-unified_agent_client-proto + grpc_cpp + grpc_cpp +) diff --git a/library/cpp/unified_agent_client/proto/CMakeLists.linux-aarch64.txt b/library/cpp/unified_agent_client/proto/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..da20d2e479 --- /dev/null +++ b/library/cpp/unified_agent_client/proto/CMakeLists.linux-aarch64.txt @@ -0,0 +1,46 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-unified_agent_client-proto) +set_property(TARGET cpp-unified_agent_client-proto PROPERTY + PROTOC_EXTRA_OUTS .grpc.pb.cc .grpc.pb.h +) +target_link_libraries(cpp-unified_agent_client-proto PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + tools-enum_parser-enum_serialization_runtime + contrib-libs-protobuf +) +target_proto_messages(cpp-unified_agent_client-proto PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto/unified_agent.proto +) +generate_enum_serilization(cpp-unified_agent_client-proto + ${CMAKE_BINARY_DIR}/library/cpp/unified_agent_client/proto/unified_agent.pb.h + INCLUDE_HEADERS + library/cpp/unified_agent_client/proto/unified_agent.pb.h +) +target_proto_addincls(cpp-unified_agent_client-proto + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(cpp-unified_agent_client-proto + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +target_proto_plugin(cpp-unified_agent_client-proto + grpc_cpp + grpc_cpp +) diff --git a/library/cpp/unified_agent_client/proto/CMakeLists.linux.txt b/library/cpp/unified_agent_client/proto/CMakeLists.linux.txt new file mode 100644 index 0000000000..da20d2e479 --- /dev/null +++ b/library/cpp/unified_agent_client/proto/CMakeLists.linux.txt @@ -0,0 +1,46 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-unified_agent_client-proto) +set_property(TARGET cpp-unified_agent_client-proto PROPERTY + PROTOC_EXTRA_OUTS .grpc.pb.cc .grpc.pb.h +) +target_link_libraries(cpp-unified_agent_client-proto PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + tools-enum_parser-enum_serialization_runtime + contrib-libs-protobuf +) +target_proto_messages(cpp-unified_agent_client-proto PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto/unified_agent.proto +) +generate_enum_serilization(cpp-unified_agent_client-proto + ${CMAKE_BINARY_DIR}/library/cpp/unified_agent_client/proto/unified_agent.pb.h + INCLUDE_HEADERS + library/cpp/unified_agent_client/proto/unified_agent.pb.h +) +target_proto_addincls(cpp-unified_agent_client-proto + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(cpp-unified_agent_client-proto + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +target_proto_plugin(cpp-unified_agent_client-proto + grpc_cpp + grpc_cpp +) diff --git a/library/cpp/unified_agent_client/proto/CMakeLists.txt b/library/cpp/unified_agent_client/proto/CMakeLists.txt new file mode 100644 index 0000000000..5bb4faffb4 --- /dev/null +++ b/library/cpp/unified_agent_client/proto/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/library/cpp/unified_agent_client/proto/unified_agent.proto b/library/cpp/unified_agent_client/proto/unified_agent.proto new file mode 100644 index 0000000000..68efe35747 --- /dev/null +++ b/library/cpp/unified_agent_client/proto/unified_agent.proto @@ -0,0 +1,101 @@ +syntax = "proto3"; +import "google/protobuf/descriptor.proto"; + +package NUnifiedAgentProto; + +option java_package = "com.yandex.unified_agent"; +option go_package = "a.yandex-team.ru/library/cpp/unified_agent_client/proto;unifiedagent"; + +extend google.protobuf.FileOptions { + bool GenerateYaStyle = 66777; +} + +message Request { + message SessionMetaItem { + string name = 1; + string value = 2; + } + + message Initialize { + // Session_id provided by server, use it in case of reconnects. + string session_id = 1; + + // Session metadata + repeated SessionMetaItem meta = 2; + + string shared_secret_key = 3; + } + + message MessageMetaItem { + // Arbitrary key-value pairs. Can be used by agent filters to modify/filter messages + // or to route them to target outputs. + + // Meta items of all messages should be grouped by meta key, it's expected in the 'key' field. + // Meta values should be passed in the 'value' sequence, it corresponds to the payload + // sequence from DataBatch. If some messages don't have a meta with this key, the range of such messages + // can be passed via skip_start/skip_length sequences. + // For example, [{m:v1}, {}, {}, {m: v2}, {}, {m: v3}, {}, {}] can be represented as follows: + // key: 'm' + // value: ['v1', 'v2', 'v3'] + // skip_start: [1, 4] + // skip_length: [2, 1] + + string key = 1; + repeated string value = 2; + repeated uint32 skip_start = 3; + repeated uint32 skip_length = 4; + } + + message DataBatch { + repeated uint64 seq_no = 1; + repeated uint64 timestamp = 2; //microseconds + repeated bytes payload = 100; + repeated MessageMetaItem meta = 101; + } + + oneof request { + Initialize initialize = 1; + DataBatch data_batch = 2; + } +} + +message Response { + message Initialized { + // Session identifier for log and deduplication purposes. + string session_id = 1; + + // Application can skip all formed messages by seq_no upto last_seq_no - they are consumed by server. + uint64 last_seq_no = 2; + } + + message Ack { + uint64 seq_no = 1; + } + + oneof response { + Initialized initialized = 1; + Ack ack = 2; + } +} + +service UnifiedAgentService { + rpc Session(stream Request) returns (stream Response); +} + + +// dataflow: +// Request.initialize -> UnifiedAgent; +// specify session_id when this is a retry. Сlient can already have sesison_id from previous init response, +// or it can use some pregenerated sessionId for each session. +// Response.initializeded -> client; +// Request.entry -> UnifiedAgent; +// .... +// Response.ack -> client; +// when this record is consumed by UnifiedAgent with choosen garanties UnifiedAgent will send ack to client; +// client can forget about this log record now +// +// grpc finish session -> client; +// something went wrong; client must reconnect and retry all not acknowleged records +// +// Exactly once retries - when reconnect, client must provide previous session_id and same seq_no`s +// for records - only in this case UnifiedAgent can dedup. diff --git a/library/cpp/unified_agent_client/proto_weighing.cpp b/library/cpp/unified_agent_client/proto_weighing.cpp new file mode 100644 index 0000000000..7a532213ea --- /dev/null +++ b/library/cpp/unified_agent_client/proto_weighing.cpp @@ -0,0 +1,99 @@ +#include "proto_weighing.h" + +#include <google/protobuf/io/coded_stream.h> + +namespace NUnifiedAgent::NPW { + template <typename T> + inline size_t SizeOf(T value); + + using CodedOutputStream = google::protobuf::io::CodedOutputStream; + + template <> + inline size_t SizeOf(ui64 value) { + return CodedOutputStream::VarintSize64(value); + } + + template <> + inline size_t SizeOf(ui32 value) { + return CodedOutputStream::VarintSize32(value); + } + + template <> + inline size_t SizeOf(i64 value) { + return CodedOutputStream::VarintSize64(static_cast<google::protobuf::uint64>(value)); + } + + TFieldLink::TFieldLink(TLengthDelimited* container, bool repeated, size_t keySize) + : Container(container) + , OuterSize(0) + , Repeated(repeated) + , KeySize(keySize) + { + } + + void TFieldLink::SetValueSize(bool empty, size_t size) { + const auto newOuterSize = empty && !Repeated ? 0 : KeySize + static_cast<int>(size); + Container->IncSize(newOuterSize - OuterSize); + OuterSize = newOuterSize; + } + + TLengthDelimited::TLengthDelimited(const TFMaybe<TFieldLink>& link) + : Link(link) + , ByteSize(0) + { + } + + void TLengthDelimited::IncSize(int sizeDelta) { + ByteSize += sizeDelta; + if (Link) { + const auto byteSize = static_cast<ui32>(ByteSize); + Link->SetValueSize(false, byteSize + SizeOf(byteSize)); + } + } + + template <typename T> + void TRepeatedField<T>::Add(T value) { + IncSize(static_cast<int>(SizeOf(value))); + } + + template <typename T> + TNumberField<T>::TNumberField(const TFieldLink& link) + : Link(link) + { + } + + template <typename T> + void TNumberField<T>::SetValue(T value) { + Link.SetValueSize(value == 0, SizeOf(value)); + } + + template <typename T> + TFixedNumberField<T>::TFixedNumberField(const TFieldLink& link) + : Link(link) + { + } + + template <typename T> + void TFixedNumberField<T>::SetValue() { + Link.SetValueSize(false, sizeof(T)); + } + + TStringField::TStringField(const TFieldLink& link) + : Link(link) + { + } + + void TStringField::SetValue(const TString& value) { + Link.SetValueSize(value.Empty(), value.Size() + SizeOf(static_cast<ui32>(value.Size()))); + } + + template class TNumberField<ui64>; + template class TNumberField<ui32>; + template class TNumberField<i64>; + template class TFixedNumberField<ui64>; + template class TFixedNumberField<ui32>; + template class TFixedNumberField<i64>; + template class TRepeatedField<ui64>; + template class TRepeatedField<ui32>; + template class TRepeatedField<i64>; +} diff --git a/library/cpp/unified_agent_client/proto_weighing.h b/library/cpp/unified_agent_client/proto_weighing.h new file mode 100644 index 0000000000..47cf577e14 --- /dev/null +++ b/library/cpp/unified_agent_client/proto_weighing.h @@ -0,0 +1,138 @@ +#pragma once + +#include <library/cpp/unified_agent_client/f_maybe.h> + +#include <util/generic/deque.h> +#include <util/generic/string.h> + +namespace NUnifiedAgent::NPW { + class TLengthDelimited; + + class TFieldLink { + public: + TFieldLink(TLengthDelimited* container, bool repeated = false, size_t keySize = 1); + + void SetValueSize(bool empty, size_t size); + + private: + TLengthDelimited* Container; + int OuterSize; + bool Repeated; + size_t KeySize; + }; + + class TLengthDelimited { + public: + explicit TLengthDelimited(const TFMaybe<TFieldLink>& link = Nothing()); + + void IncSize(int sizeDelta); + + size_t ByteSizeLong() const { + return static_cast<size_t>(ByteSize); + } + + private: + TFMaybe<TFieldLink> Link; + int ByteSize; + }; + + using TMessage = TLengthDelimited; + + template <typename T> + class TRepeatedField: public TLengthDelimited { + public: + static_assert(std::is_same_v<T, ui32> || + std::is_same_v<T, ui64> || + std::is_same_v<T, i64>, + "type is not supported"); + + using TLengthDelimited::TLengthDelimited; + + void Add(T value); + }; + + template <typename T> + class TRepeatedPtrField { + public: + explicit TRepeatedPtrField(TMessage* message, size_t keySize = 1) + : Message(message) + , Children() + , KeySize(keySize) + { + } + + size_t GetSize() const { + return Children.size(); + } + + T& Get(size_t index) { + return Children[index]; + } + + T& Add() { + if constexpr (std::is_constructible<T, TFieldLink>::value) { + Children.emplace_back(TFieldLink(Message, true, KeySize)); + } else { + Children.emplace_back(Message); + } + return Children.back(); + } + + private: + TMessage* Message; + TDeque<T> Children; + size_t KeySize; + }; + + template <typename T> + class TNumberField { + public: + static_assert(std::is_same_v<T, ui32> || + std::is_same_v<T, ui64> || + std::is_same_v<T, i64>, + "type is not supported"); + + explicit TNumberField(const TFieldLink& link); + + void SetValue(T value); + + private: + TFieldLink Link; + }; + + template <typename T> + class TFixedNumberField { + public: + static_assert(std::is_same_v<T, ui32> || + std::is_same_v<T, ui64> || + std::is_same_v<T, i64>, + "type is not supported"); + + explicit TFixedNumberField(const TFieldLink& link); + + void SetValue(); + + private: + TFieldLink Link; + }; + + class TStringField { + public: + explicit TStringField(const TFieldLink& link); + + void SetValue(const TString& value); + + private: + TFieldLink Link; + }; + + extern template class TNumberField<ui64>; + extern template class TNumberField<ui32>; + extern template class TNumberField<i64>; + extern template class TFixedNumberField<ui64>; + extern template class TFixedNumberField<ui32>; + extern template class TFixedNumberField<i64>; + extern template class TRepeatedField<ui64>; + extern template class TRepeatedField<ui32>; + extern template class TRepeatedField<i64>; +} diff --git a/library/cpp/unified_agent_client/registrar.cpp b/library/cpp/unified_agent_client/registrar.cpp new file mode 100644 index 0000000000..41f6eb34ca --- /dev/null +++ b/library/cpp/unified_agent_client/registrar.cpp @@ -0,0 +1,8 @@ +#include "backend_creator.h" + +namespace NUnifiedAgent { + + ILogBackendCreator::TFactory::TRegistrator<NUnifiedAgent::TLogBackendCreator> TLogBackendCreator::Registrar("unified_agent"); + +} + diff --git a/library/cpp/unified_agent_client/throttling.cpp b/library/cpp/unified_agent_client/throttling.cpp new file mode 100644 index 0000000000..271f7b0e7e --- /dev/null +++ b/library/cpp/unified_agent_client/throttling.cpp @@ -0,0 +1,67 @@ +#include "throttling.h" + +#include <util/datetime/cputimer.h> + +namespace NUnifiedAgent { + TThrottler::TThrottler(double rate, TDuration updatePeriod) + : CyclesPerMillisecond(GetCyclesPerMillisecond()) + , UpdatePeriod(updatePeriod.MilliSeconds() * CyclesPerMillisecond) + , PeriodTokens(updatePeriod.SecondsFloat() * rate) + , AvailableTokens(0) + , ExpirationTime(0) + { + } + + TThrottler::TThrottler(double rate, double burst) + : TThrottler(rate, TDuration::Seconds(burst / rate)) + { + } + + void TThrottler::Consume(double& tokens, TFMaybe<TDuration>& nextCheckDelay) { + const auto updateTime = UpdateTokens(); + + if (tokens <= AvailableTokens) { + AvailableTokens -= tokens; + tokens = 0.0; + nextCheckDelay = Nothing(); + } else { + tokens -= AvailableTokens; + AvailableTokens = 0.0; + nextCheckDelay = TDuration::MicroSeconds((ExpirationTime - updateTime) * 1000 / CyclesPerMillisecond + 1); + } + } + + bool TThrottler::TryConsume(double tokens) { + UpdateTokens(); + + if (tokens > AvailableTokens) { + return false; + } + AvailableTokens -= tokens; + return true; + } + + void TThrottler::ConsumeAndWait(double tokens) { + TFMaybe<TDuration> nextCheckDelay; + while (true) { + Consume(tokens, nextCheckDelay); + if (!nextCheckDelay.Defined()) { + return; + } + Sleep(*nextCheckDelay); + } + } + + ui64 TThrottler::UpdateTokens() { + const auto updateTime = GetCycleCount(); + if (updateTime >= ExpirationTime) { + if (ExpirationTime == 0) { + ExpirationTime = updateTime + UpdatePeriod; + } else { + ExpirationTime += ((updateTime - ExpirationTime) / UpdatePeriod + 1) * UpdatePeriod; + } + AvailableTokens = PeriodTokens; + } + return updateTime; + } +} diff --git a/library/cpp/unified_agent_client/throttling.h b/library/cpp/unified_agent_client/throttling.h new file mode 100644 index 0000000000..1e5db1e8fa --- /dev/null +++ b/library/cpp/unified_agent_client/throttling.h @@ -0,0 +1,30 @@ +#pragma once + +#include <library/cpp/unified_agent_client/f_maybe.h> + +#include <util/datetime/base.h> + +namespace NUnifiedAgent { + class TThrottler { + public: + explicit TThrottler(double rate, TDuration updatePeriod = TDuration::MilliSeconds(100)); + + TThrottler(double rate, double burst); + + void Consume(double& tokens, TFMaybe<TDuration>& nextCheckDelay); + + bool TryConsume(double tokens); + + void ConsumeAndWait(double tokens); + + private: + ui64 UpdateTokens(); + + private: + ui64 CyclesPerMillisecond; + ui64 UpdatePeriod; + double PeriodTokens; + double AvailableTokens; + ui64 ExpirationTime; + }; +} diff --git a/library/cpp/unified_agent_client/variant.h b/library/cpp/unified_agent_client/variant.h new file mode 100644 index 0000000000..e261aa9af3 --- /dev/null +++ b/library/cpp/unified_agent_client/variant.h @@ -0,0 +1,18 @@ +#pragma once + +#include <variant> + +namespace NUnifiedAgent { + template<class... Ts> struct TOverloaded : Ts... { using Ts::operator()...; }; + template<class... Ts> TOverloaded(Ts...) -> TOverloaded<Ts...>; + + template <class T, class... U> + auto Visit(T&& variant, U&&... visitorOverloads) { + return std::visit(TOverloaded{std::forward<U>(visitorOverloads)...}, std::forward<T>(variant)); + } + + template <typename TTarget, typename... TSourceTypes> + auto CastTo(std::variant<TSourceTypes...>&& variant) { + return Visit(variant, [](auto& p) -> TTarget { return std::move(p); }); + } +} diff --git a/ydb/apps/ydbd/CMakeLists.darwin.txt b/ydb/apps/ydbd/CMakeLists.darwin.txt index 2ded59f519..dbfba1c967 100644 --- a/ydb/apps/ydbd/CMakeLists.darwin.txt +++ b/ydb/apps/ydbd/CMakeLists.darwin.txt @@ -59,7 +59,6 @@ target_link_options(ydbd PRIVATE CoreFoundation ) target_sources(ydbd PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/log_backend.cpp ${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/export.cpp ${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/sqs.cpp ${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/main.cpp diff --git a/ydb/apps/ydbd/CMakeLists.linux-aarch64.txt b/ydb/apps/ydbd/CMakeLists.linux-aarch64.txt index 4363019c93..3aeaceee3a 100644 --- a/ydb/apps/ydbd/CMakeLists.linux-aarch64.txt +++ b/ydb/apps/ydbd/CMakeLists.linux-aarch64.txt @@ -61,7 +61,6 @@ target_link_options(ydbd PRIVATE -ldl ) target_sources(ydbd PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/log_backend.cpp ${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/export.cpp ${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/sqs.cpp ${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/main.cpp diff --git a/ydb/apps/ydbd/CMakeLists.linux.txt b/ydb/apps/ydbd/CMakeLists.linux.txt index 19aea829d2..554dfec80f 100644 --- a/ydb/apps/ydbd/CMakeLists.linux.txt +++ b/ydb/apps/ydbd/CMakeLists.linux.txt @@ -63,7 +63,6 @@ target_link_options(ydbd PRIVATE -ldl ) target_sources(ydbd PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/log_backend.cpp ${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/export.cpp ${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/sqs.cpp ${CMAKE_SOURCE_DIR}/ydb/apps/ydbd/main.cpp diff --git a/ydb/apps/ydbd/log_backend.cpp b/ydb/apps/ydbd/log_backend.cpp deleted file mode 100644 index dc50353efa..0000000000 --- a/ydb/apps/ydbd/log_backend.cpp +++ /dev/null @@ -1,31 +0,0 @@ -#include "log_backend.h" -#include <ydb/core/base/counters.h> - -namespace NKikimr { - -TAutoPtr<TLogBackend> TAuditLogBackendFactory::CreateLogBackend( - const TKikimrRunConfig& runConfig, - NMonitoring::TDynamicCounterPtr) -{ - TAutoPtr<TLogBackend> logBackend; - if (!runConfig.AppConfig.HasAuditConfig()) - return logBackend; - - const auto& auditConfig = runConfig.AppConfig.GetAuditConfig(); - if (auditConfig.HasAuditFilePath()) { - const auto& filePath = auditConfig.GetAuditFilePath(); - try { - logBackend = new TFileLogBackend(filePath); - } catch (const TFileError& ex) { - Cerr << "TAuditLogBackendFactory: failed to open file '" << filePath << "': " << ex.what() << Endl; - exit(1); - } - } - - if (logBackend) { - return logBackend; - } - return NActors::CreateStderrBackend(); -} - -} // NKikimr diff --git a/ydb/apps/ydbd/log_backend.h b/ydb/apps/ydbd/log_backend.h deleted file mode 100644 index 6daab1cc13..0000000000 --- a/ydb/apps/ydbd/log_backend.h +++ /dev/null @@ -1,13 +0,0 @@ -#pragma once -#include <ydb/core/driver_lib/run/log_backend.h> - -namespace NKikimr { - -class TAuditLogBackendFactory : public TLogBackendFactory { -public: - virtual TAutoPtr<TLogBackend> CreateLogBackend( - const TKikimrRunConfig& runConfig, - NMonitoring::TDynamicCounterPtr counters) override; -}; - -} // NKikimr diff --git a/ydb/apps/ydbd/main.cpp b/ydb/apps/ydbd/main.cpp index 558efcaee1..3f24369b36 100644 --- a/ydb/apps/ydbd/main.cpp +++ b/ydb/apps/ydbd/main.cpp @@ -1,6 +1,5 @@ #include "sqs.h" #include "export.h" -#include "log_backend.h" #include <ydb/core/driver_lib/run/main.h> #include <ydb/core/security/ticket_parser.h> #include <ydb/core/ymq/actor/auth_factory.h> @@ -16,7 +15,6 @@ int main(int argc, char **argv) { SetupTerminateHandler(); auto factories = std::make_shared<NKikimr::TModuleFactories>(); - factories->AuditLogBackendFactory = std::make_shared<NKikimr::TAuditLogBackendFactory>(); factories->DataShardExportFactory = std::make_shared<TDataShardExportFactory>(); factories->CreateTicketParser = NKikimr::CreateTicketParser; factories->FolderServiceFactory = NKikimr::NFolderService::CreateMockFolderServiceActor; diff --git a/ydb/core/CMakeLists.txt b/ydb/core/CMakeLists.txt index 5064bfc71f..568ae75971 100644 --- a/ydb/core/CMakeLists.txt +++ b/ydb/core/CMakeLists.txt @@ -31,6 +31,7 @@ add_subdirectory(kesus) add_subdirectory(keyvalue) add_subdirectory(kqp) add_subdirectory(load_test) +add_subdirectory(log_backend) add_subdirectory(metering) add_subdirectory(mind) add_subdirectory(mon) diff --git a/ydb/core/driver_lib/run/CMakeLists.darwin.txt b/ydb/core/driver_lib/run/CMakeLists.darwin.txt index dc4f71dca1..91bdf6787a 100644 --- a/ydb/core/driver_lib/run/CMakeLists.darwin.txt +++ b/ydb/core/driver_lib/run/CMakeLists.darwin.txt @@ -65,6 +65,7 @@ target_link_libraries(run PUBLIC ydb-core-kqp core-kqp-rm_service ydb-core-load_test + ydb-core-log_backend ydb-core-metering ydb-core-mind core-mind-address_classification @@ -141,7 +142,6 @@ target_sources(run PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/dummy.cpp ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/factories.cpp ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/kikimr_services_initializers.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/log_backend.cpp ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/main.cpp ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/run.cpp ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/service_initializer.cpp diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt index 8df330ddbd..49d1f9a476 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt @@ -66,6 +66,7 @@ target_link_libraries(run PUBLIC ydb-core-kqp core-kqp-rm_service ydb-core-load_test + ydb-core-log_backend ydb-core-metering ydb-core-mind core-mind-address_classification @@ -142,7 +143,6 @@ target_sources(run PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/dummy.cpp ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/factories.cpp ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/kikimr_services_initializers.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/log_backend.cpp ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/main.cpp ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/run.cpp ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/service_initializer.cpp diff --git a/ydb/core/driver_lib/run/CMakeLists.linux.txt b/ydb/core/driver_lib/run/CMakeLists.linux.txt index 8df330ddbd..49d1f9a476 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux.txt @@ -66,6 +66,7 @@ target_link_libraries(run PUBLIC ydb-core-kqp core-kqp-rm_service ydb-core-load_test + ydb-core-log_backend ydb-core-metering ydb-core-mind core-mind-address_classification @@ -142,7 +143,6 @@ target_sources(run PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/dummy.cpp ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/factories.cpp ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/kikimr_services_initializers.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/log_backend.cpp ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/main.cpp ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/run.cpp ${CMAKE_SOURCE_DIR}/ydb/core/driver_lib/run/service_initializer.cpp diff --git a/ydb/core/driver_lib/run/factories.h b/ydb/core/driver_lib/run/factories.h index f905621ce3..22e6b5f45b 100644 --- a/ydb/core/driver_lib/run/factories.h +++ b/ydb/core/driver_lib/run/factories.h @@ -1,5 +1,4 @@ #pragma once -#include "log_backend.h" #include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_devicemode.h> #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/tx/datashard/export_iface.h> @@ -31,10 +30,6 @@ namespace NKikimr { // A way to parameterize YDB binary, we do it via a set of factories struct TModuleFactories { - // A way to parameterize log backend - std::shared_ptr<ILogBackendFactory> LogBackendFactory; - // - std::shared_ptr<ILogBackendFactory> AuditLogBackendFactory; // A backend factory for Query Replay std::shared_ptr<NKqp::IQueryReplayBackendFactory> QueryReplayBackendFactory; // diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 4281556e7b..2036c12d5b 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -49,6 +49,8 @@ #include <ydb/core/grpc_services/grpc_mon.h> #include <ydb/core/grpc_services/grpc_request_proxy.h> +#include <ydb/core/log_backend/log_backend.h> + #include <ydb/core/kesus/proxy/proxy.h> #include <ydb/core/kesus/tablet/tablet.h> @@ -2359,12 +2361,9 @@ TAuditWriterInitializer::TAuditWriterInitializer(const TKikimrRunConfig &runConf void TAuditWriterInitializer::InitializeServices(TActorSystemSetup* setup, const TAppData* appData) { - TAutoPtr<TLogBackend> fileBackend; - if (Factories && Factories->AuditLogBackendFactory) { - fileBackend = Factories->AuditLogBackendFactory->CreateLogBackend(KikimrRunConfig, appData->Counters); - } + auto fileBackend = CreateAuditLogBackendWithUnifiedAgent(KikimrRunConfig, appData->Counters); if (!fileBackend) - return; + return; const auto format = Config.GetAuditConfig().GetFormat(); auto actor = NAudit::CreateAuditWriter(std::move(fileBackend), format); diff --git a/ydb/core/driver_lib/run/log_backend.cpp b/ydb/core/driver_lib/run/log_backend.cpp deleted file mode 100644 index 3a33ab6cac..0000000000 --- a/ydb/core/driver_lib/run/log_backend.cpp +++ /dev/null @@ -1,32 +0,0 @@ -#include "log_backend.h" - -namespace NKikimr { - -TAutoPtr<TLogBackend> TLogBackendFactory::CreateLogBackendFromLogConfig(const TKikimrRunConfig& runConfig) { - TAutoPtr<TLogBackend> logBackend; - if (runConfig.AppConfig.HasLogConfig()) { - const auto& logConfig = runConfig.AppConfig.GetLogConfig(); - if (logConfig.HasSysLog() && logConfig.GetSysLog()) { - const TString& service = logConfig.GetSysLogService(); - logBackend = NActors::CreateSysLogBackend(service ? service : "KIKIMR", false, true); - } else if (logConfig.HasBackendFileName()) { - logBackend = NActors::CreateFileBackend(logConfig.GetBackendFileName()); - } - } - return logBackend; -} - -TAutoPtr<TLogBackend> TLogBackendFactory::CreateLogBackend( - const TKikimrRunConfig& runConfig, - ::NMonitoring::TDynamicCounterPtr counters) { - Y_UNUSED(counters); - TAutoPtr<TLogBackend> logBackend = CreateLogBackendFromLogConfig(runConfig); - if (logBackend) { - return logBackend; - } else { - return NActors::CreateStderrBackend(); - } -} - -} // NKikimr - diff --git a/ydb/core/driver_lib/run/log_backend.h b/ydb/core/driver_lib/run/log_backend.h deleted file mode 100644 index d641a0fd9d..0000000000 --- a/ydb/core/driver_lib/run/log_backend.h +++ /dev/null @@ -1,29 +0,0 @@ -#pragma once - -#include "config.h" -#include <library/cpp/logger/backend.h> - -namespace NKikimr { - -// Interface for LogBackend creation -class ILogBackendFactory { -public: - virtual TAutoPtr<TLogBackend> CreateLogBackend( - const TKikimrRunConfig& runConfig, - ::NMonitoring::TDynamicCounterPtr counters) = 0; - - virtual ~ILogBackendFactory() {} -}; - -// Default implementation if ILogBackendFactory -class TLogBackendFactory : public ILogBackendFactory { -protected: - TAutoPtr<TLogBackend> CreateLogBackendFromLogConfig(const TKikimrRunConfig& runConfig); - -public: - virtual TAutoPtr<TLogBackend> CreateLogBackend( - const TKikimrRunConfig& runConfig, - ::NMonitoring::TDynamicCounterPtr counters) override; -}; - -} // NKikimr diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index c25ff2807c..1f73ffea88 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -32,6 +32,7 @@ #include <ydb/core/formats/clickhouse_block.h> #include <ydb/core/grpc_services/grpc_request_proxy.h> #include <ydb/core/grpc_services/grpc_mon.h> +#include <ydb/core/log_backend/log_backend.h> #include <ydb/core/mon/sync_http_mon.h> #include <ydb/core/mon/async_http_mon.h> #include <ydb/core/mon/crossref.h> @@ -1089,13 +1090,8 @@ void TKikimrRunner::InitializeAppData(const TKikimrRunConfig& runConfig) void TKikimrRunner::InitializeLogSettings(const TKikimrRunConfig& runConfig) { - if (ModuleFactories && ModuleFactories->LogBackendFactory) { - auto logBackend = ModuleFactories->LogBackendFactory->CreateLogBackend(runConfig, Counters); - LogBackend.reset(logBackend.Release()); - } else { - auto logBackend = TLogBackendFactory().CreateLogBackend(runConfig, Counters); - LogBackend.reset(logBackend.Release()); - } + auto logBackend = CreateLogBackendWithUnifiedAgent(runConfig, Counters); + LogBackend.reset(logBackend.Release()); if (!runConfig.AppConfig.HasLogConfig()) return; diff --git a/ydb/core/log_backend/CMakeLists.darwin.txt b/ydb/core/log_backend/CMakeLists.darwin.txt new file mode 100644 index 0000000000..94831d48f3 --- /dev/null +++ b/ydb/core/log_backend/CMakeLists.darwin.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ydb-core-log_backend) +target_compile_options(ydb-core-log_backend PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ydb-core-log_backend PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-unified_agent_client + ydb-core-base +) +target_sources(ydb-core-log_backend PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/log_backend/log_backend.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/log_backend/log_backend_build.cpp +) diff --git a/ydb/core/log_backend/CMakeLists.linux-aarch64.txt b/ydb/core/log_backend/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..57ef405c9b --- /dev/null +++ b/ydb/core/log_backend/CMakeLists.linux-aarch64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ydb-core-log_backend) +target_compile_options(ydb-core-log_backend PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ydb-core-log_backend PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-unified_agent_client + ydb-core-base +) +target_sources(ydb-core-log_backend PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/log_backend/log_backend.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/log_backend/log_backend_build.cpp +) diff --git a/ydb/core/log_backend/CMakeLists.linux.txt b/ydb/core/log_backend/CMakeLists.linux.txt new file mode 100644 index 0000000000..57ef405c9b --- /dev/null +++ b/ydb/core/log_backend/CMakeLists.linux.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ydb-core-log_backend) +target_compile_options(ydb-core-log_backend PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ydb-core-log_backend PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-unified_agent_client + ydb-core-base +) +target_sources(ydb-core-log_backend PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/log_backend/log_backend.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/log_backend/log_backend_build.cpp +) diff --git a/ydb/core/log_backend/CMakeLists.txt b/ydb/core/log_backend/CMakeLists.txt new file mode 100644 index 0000000000..5bb4faffb4 --- /dev/null +++ b/ydb/core/log_backend/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/core/log_backend/log_backend.cpp b/ydb/core/log_backend/log_backend.cpp new file mode 100644 index 0000000000..7a5b970986 --- /dev/null +++ b/ydb/core/log_backend/log_backend.cpp @@ -0,0 +1,66 @@ +#include "log_backend.h" +#include "log_backend_build.h" +#include <ydb/core/base/counters.h> + +namespace NKikimr { + +TAutoPtr<TLogBackend> CreateLogBackendWithUnifiedAgent( + const TKikimrRunConfig& runConfig, + NMonitoring::TDynamicCounterPtr counters) +{ + if (runConfig.AppConfig.HasLogConfig()) { + const auto& logConfig = runConfig.AppConfig.GetLogConfig(); + TAutoPtr<TLogBackend> logBackend = TLogBackendBuildHelper::CreateLogBackendFromLogConfig(logConfig); + if (logConfig.HasUAClientConfig()) { + const auto& uaClientConfig = logConfig.GetUAClientConfig(); + auto uaCounters = GetServiceCounters(counters, "utils")->GetSubgroup("subsystem", "ua_client"); + auto logName = uaClientConfig.GetLogName(); + TAutoPtr<TLogBackend> uaLogBackend = TLogBackendBuildHelper::CreateLogBackendFromUAClientConfig(uaClientConfig, uaCounters, logName); + logBackend = logBackend ? NActors::CreateCompositeLogBackend({logBackend, uaLogBackend}) : uaLogBackend; + } + if (logBackend) { + return logBackend; + } + } + + return NActors::CreateStderrBackend(); +} + +TAutoPtr<TLogBackend> CreateAuditLogBackendWithUnifiedAgent( + const TKikimrRunConfig& runConfig, + NMonitoring::TDynamicCounterPtr counters) +{ + TAutoPtr<TLogBackend> logBackend; + if (!runConfig.AppConfig.HasAuditConfig()) + return logBackend; + + const auto& auditConfig = runConfig.AppConfig.GetAuditConfig(); + if (auditConfig.HasAuditFilePath()) { + const auto& filePath = auditConfig.GetAuditFilePath(); + try { + logBackend = new TFileLogBackend(filePath); + } catch (const TFileError& ex) { + Cerr << "TAuditLogBackendFactoryWithUnifiedAgent: failed to open file '" << filePath << "': " << ex.what() << Endl; + exit(1); + } + } + + if (auditConfig.GetUnifiedAgentEnable() && runConfig.AppConfig.HasLogConfig() && runConfig.AppConfig.GetLogConfig().HasUAClientConfig()) { + const auto& logConfig = runConfig.AppConfig.GetLogConfig(); + const auto& uaClientConfig = logConfig.GetUAClientConfig(); + auto uaCounters = GetServiceCounters(counters, "utils")->GetSubgroup("subsystem", "ua_client"); + auto logName = runConfig.AppConfig.GetAuditConfig().HasLogName() + ? runConfig.AppConfig.GetAuditConfig().GetLogName() + : uaClientConfig.GetLogName(); + TAutoPtr<TLogBackend> uaLogBackend = TLogBackendBuildHelper::CreateLogBackendFromUAClientConfig(uaClientConfig, uaCounters, logName); + logBackend = logBackend ? NActors::CreateCompositeLogBackend({logBackend, uaLogBackend}) : uaLogBackend; + } + + if (logBackend) { + return logBackend; + } + return NActors::CreateStderrBackend(); +} + +} // NKikimr + diff --git a/ydb/core/log_backend/log_backend.h b/ydb/core/log_backend/log_backend.h new file mode 100644 index 0000000000..f9de92f39b --- /dev/null +++ b/ydb/core/log_backend/log_backend.h @@ -0,0 +1,18 @@ +#pragma once + +#include <ydb/core/driver_lib/run/config.h> + +#include <library/cpp/logger/backend.h> + +namespace NKikimr { + +TAutoPtr<TLogBackend> CreateLogBackendWithUnifiedAgent( + const TKikimrRunConfig& runConfig, + NMonitoring::TDynamicCounterPtr counters); + +TAutoPtr<TLogBackend> CreateAuditLogBackendWithUnifiedAgent( + const TKikimrRunConfig& runConfig, + NMonitoring::TDynamicCounterPtr counters); + +} // NKikimr + diff --git a/ydb/core/log_backend/log_backend_build.cpp b/ydb/core/log_backend/log_backend_build.cpp new file mode 100644 index 0000000000..0bdd98ffeb --- /dev/null +++ b/ydb/core/log_backend/log_backend_build.cpp @@ -0,0 +1,51 @@ +#include "log_backend_build.h" +#include <util/system/getpid.h> +#include <library/cpp/actors/core/log.h> + +namespace NKikimr { + +TAutoPtr<TLogBackend> TLogBackendBuildHelper::CreateLogBackendFromLogConfig(const NKikimrConfig::TLogConfig& logConfig, const TString& defaultIdent) { + TAutoPtr<TLogBackend> logBackend; + if (logConfig.HasSysLog() && logConfig.GetSysLog()) { + const TString& service = logConfig.GetSysLogService(); + logBackend = NActors::CreateSysLogBackend(service ? service : defaultIdent, false, true); + } else if (logConfig.HasBackendFileName()) { + logBackend = NActors::CreateFileBackend(logConfig.GetBackendFileName()); + } + return logBackend; +} + +TAutoPtr<TLogBackend> TLogBackendBuildHelper::CreateLogBackendFromUAClientConfig(const NKikimrConfig::TUAClientConfig& uaClientConfig, NMonitoring::TDynamicCounterPtr uaCounters, const TString& logName) { + auto parameters = NUnifiedAgent::TClientParameters(uaClientConfig.GetUri()) + .SetCounters(uaCounters) + .SetMaxInflightBytes(uaClientConfig.GetMaxInflightBytes()); + if (uaClientConfig.HasSharedSecretKey()) { + parameters.SetSharedSecretKey(uaClientConfig.GetSharedSecretKey()); + } + if (uaClientConfig.HasGrpcReconnectDelayMs()) { + parameters.SetGrpcReconnectDelay(TDuration::MilliSeconds(uaClientConfig.GetGrpcReconnectDelayMs())); + } + if (uaClientConfig.HasGrpcSendDelayMs()) { + parameters.SetGrpcSendDelay(TDuration::MilliSeconds(uaClientConfig.GetGrpcSendDelayMs())); + } + if (uaClientConfig.HasGrpcMaxMessageSize()) { + parameters.SetGrpcMaxMessageSize(uaClientConfig.GetGrpcMaxMessageSize()); + } + if (uaClientConfig.HasClientLogFile()) { + TLog log(uaClientConfig.GetClientLogFile(), + static_cast<ELogPriority>(uaClientConfig.GetClientLogPriority())); + parameters.SetLog(log); + } + + auto sessionParameters = NUnifiedAgent::TSessionParameters(); + sessionParameters.Meta.ConstructInPlace(); + (*sessionParameters.Meta)["_pid"] = ToString(GetPID()); + if (logName) { + (*sessionParameters.Meta)["_log_name"] = logName; + } + + TAutoPtr<TLogBackend> uaLogBackend = MakeLogBackend(parameters, sessionParameters).Release(); + return uaLogBackend; +} + +} // NKikimr diff --git a/ydb/core/log_backend/log_backend_build.h b/ydb/core/log_backend/log_backend_build.h new file mode 100644 index 0000000000..ab672bb819 --- /dev/null +++ b/ydb/core/log_backend/log_backend_build.h @@ -0,0 +1,21 @@ +#pragma once + +#include <ydb/core/base/counters.h> +#include <ydb/core/protos/config.pb.h> +#include <library/cpp/logger/backend.h> +#include <library/cpp/unified_agent_client/backend.h> +#include <util/generic/ptr.h> + +namespace NKikimr { + +class TLogBackendBuildHelper { +public: + static TAutoPtr<TLogBackend> CreateLogBackendFromLogConfig( + const NKikimrConfig::TLogConfig& logConfig, + const TString& defaultIdent = "KIKIMR"); + static TAutoPtr<TLogBackend> CreateLogBackendFromUAClientConfig( + const NKikimrConfig::TUAClientConfig& uaClientConfig, + NMonitoring::TDynamicCounterPtr uaCounters, + const TString& logName); +}; +} |