diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 10:54:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 12:28:07 +0300 |
commit | fc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch) | |
tree | c15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/unified_agent_client/client.h | |
parent | 8a749596d40e91c896a1907afcd108d9221fbde1 (diff) | |
download | ydb-e9cbe5c5cf67db853d223fd365c9f05b695f7b96.tar.gz |
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/unified_agent_client/client.h')
-rw-r--r-- | library/cpp/unified_agent_client/client.h | 256 |
1 files changed, 256 insertions, 0 deletions
diff --git a/library/cpp/unified_agent_client/client.h b/library/cpp/unified_agent_client/client.h new file mode 100644 index 0000000000..62e1210803 --- /dev/null +++ b/library/cpp/unified_agent_client/client.h @@ -0,0 +1,256 @@ +#pragma once + +#include <library/cpp/unified_agent_client/counters.h> + +#include <library/cpp/logger/log.h> +#include <library/cpp/threading/future/future.h> + +#include <util/datetime/base.h> +#include <util/generic/hash.h> +#include <util/generic/maybe.h> +#include <util/generic/string.h> + +namespace NUnifiedAgent { + struct TClientParameters { + // uri format https://github.com/grpc/grpc/blob/master/doc/naming.md + // for example: unix:///unified_agent for unix domain sockets or localhost:12345 for tcp + explicit TClientParameters(const TString& uri); + + // Simple way to protect against writing to unintended/invalid Unified Agent endpoint. + // Must correspond to 'shared_secret_key' grpc input parameter + // (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6333542#L219), + // session would end with error otherwise. + // + // Default: not set + TClientParameters& SetSharedSecretKey(const TString& sharedSecretKey) { + SharedSecretKey = sharedSecretKey; + return *this; + } + + // Max bytes count that have been received by client session but not acknowledged yet. + // When exceeded, new messages will be discarded, an error message + // will be written to the TLog instance and drop counter will be incremented. + // + // Default: 10 mb + TClientParameters& SetMaxInflightBytes(size_t maxInflightBytes) { + MaxInflightBytes = maxInflightBytes; + return *this; + } + + // TLog instance for client library's own logs. + // + // Default: TLoggerOperator<TGlobalLog>::Log() + TClientParameters& SetLog(TLog& log) { + Log = log; + return *this; + } + + // Throttle client library log by rate limit in bytes, excess will be discarded. + // + // Default: not set + TClientParameters& SetLogRateLimit(size_t bytesPerSec) { + LogRateLimitBytes = bytesPerSec; + return *this; + } + + // Try to establish new grpc session if the current one become broken. + // Session may break either due to agent unavailability, or the agent itself may + // reject new session creation if it does not satisfy certain + // conditions - shared_secret_key does not match, the session creation rate has been + // exceeded, invalid session metadata has been used and so on. + // Attempts to establish a grpc session will continue indefinitely. + // + // Default: 50 millis + TClientParameters& SetGrpcReconnectDelay(TDuration delay) { + GrpcReconnectDelay = delay; + return *this; + } + + // Grpc usually writes data to the socket faster than it comes from the client. + // This means that it's possible that each TClientMessage would be sent in it's own grpc message. + // This is expensive in terms of cpu, since grpc makes at least one syscall + // for each message on the sender and receiver sides. + // To avoid a large number of syscalls, the client holds incoming messages + // in internal buffer in hope of being able to assemble bigger grpc batch. + // This parameter sets the timeout for this delay - from IClientSession::Send + // call to the actual sending of the corresponding grpc message. + // + // Default: 10 millis. + TClientParameters& SetGrpcSendDelay(TDuration delay) { + GrpcSendDelay = delay; + return *this; + } + + // Client library sends messages to grpc in batches, this parameter + // establishes upper limit on the size of single batch in bytes. + // If you increase this value, don't forget to adjust max_receive_message_size (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6661788#L185) + // in grpc input config, it must be grater than GrpcMaxMessageSize. + // + // Default: 1 mb + TClientParameters& SetGrpcMaxMessageSize(size_t size) { + GrpcMaxMessageSize = size; + return *this; + } + + // Enable forks handling in client library. + // Multiple threads and concurrent forks are all supported is this regime. + // + // Default: false + TClientParameters& SetEnableForkSupport(bool value) { + EnableForkSupport = value; + return *this; + } + + // Client library counters. + // App can set this to some leaf of it's TDynamicCounters tree. + // Actual provided counters are listed in TClientCounters. + // + // Default: not set + TClientParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) { + return SetCounters(MakeIntrusive<TClientCounters>(counters)); + } + + TClientParameters& SetCounters(const TIntrusivePtr<TClientCounters>& counters) { + Counters = counters; + return *this; + } + + public: + static const size_t DefaultMaxInflightBytes; + static const size_t DefaultGrpcMaxMessageSize; + static const TDuration DefaultGrpcSendDelay; + + public: + TString Uri; + TMaybe<TString> SharedSecretKey; + size_t MaxInflightBytes; + TLog Log; + TMaybe<size_t> LogRateLimitBytes; + TDuration GrpcReconnectDelay; + TDuration GrpcSendDelay; + bool EnableForkSupport; + size_t GrpcMaxMessageSize; + TIntrusivePtr<TClientCounters> Counters; + }; + + struct TSessionParameters { + TSessionParameters(); + + // Session unique identifier. + // It's guaranteed that for messages with the same sessionId relative + // ordering of the messages will be preserved at all processing stages + // in library, in Unified Agent and in other systems that respect ordering (e.g., Logbroker) + // + // Default: generated automatically by Unified Agent. + TSessionParameters& SetSessionId(const TString& sessionId) { + SessionId = sessionId; + return *this; + } + + // Session metadata as key-value set. + // Can be used by agent filters and outputs for validation/routing/enrichment/etc. + // + // Default: not set + TSessionParameters& SetMeta(const THashMap<TString, TString>& meta) { + Meta = meta; + return *this; + } + + // Session counters. + // Actual provided counters are listed in TClientSessionCounters. + // + // Default: A single common for all sessions subgroup of client TDynamicCounters instance + // with label ('session': 'default'). + TSessionParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) { + return SetCounters(MakeIntrusive<TClientSessionCounters>(counters)); + } + + TSessionParameters& SetCounters(const TIntrusivePtr<TClientSessionCounters>& counters) { + Counters = counters; + return *this; + } + + // Max bytes count that have been received by client session but not acknowledged yet. + // When exceeded, new messages will be discarded, an error message + // will be written to the TLog instance and drop counter will be incremented. + // + // Default: value from client settings + TSessionParameters& SetMaxInflightBytes(size_t maxInflightBytes) { + MaxInflightBytes = maxInflightBytes; + return *this; + } + + public: + TMaybe<TString> SessionId; + TMaybe<THashMap<TString, TString>> Meta; + TIntrusivePtr<TClientSessionCounters> Counters; + TMaybe<size_t> MaxInflightBytes; + }; + + // Message data to be sent to unified agent. + struct TClientMessage { + // Opaque message payload. + TString Payload; + + // Message metadata as key-value set. + // Can be used by agent filters and outputs for validation/routing/enrichment/etc. + // + // Default: not set + TMaybe<THashMap<TString, TString>> Meta{}; + + // Message timestamp. + // + // Default: time the client library has received this instance of TClientMessage. + TMaybe<TInstant> Timestamp{}; + }; + + // Message size as it is accounted in byte-related metrics (ReceivedBytes, InflightBytes, etc). + size_t SizeOf(const TClientMessage& message); + + class IClientSession: public TAtomicRefCount<IClientSession> { + public: + virtual ~IClientSession() = default; + + // Places the message into send queue. Actual grpc call may occur later asynchronously, + // based on settings GrpcSendDelay and GrpcMaxMessageSize. + // A message can be discarded if the limits defined by the GrpcMaxMessageSize and MaxInflightBytes + // settings are exceeded, or if the Close method has already been called. + // In this case an error message will be written to the TLog instance + // and drop counter will be incremented. + virtual void Send(TClientMessage&& message) = 0; + + void Send(const TClientMessage& message) { + Send(TClientMessage(message)); + } + + // Waits until either all current inflight messages are + // acknowledged or the specified deadline is reached. + // Upon the deadline grpc connection would be forcefully dropped (via grpc::ClientContext::TryCancel). + virtual NThreading::TFuture<void> CloseAsync(TInstant deadline) = 0; + + void Close(TInstant deadline) { + CloseAsync(deadline).Wait(); + } + + void Close(TDuration timeout = TDuration::Seconds(3)) { + Close(Now() + timeout); + } + }; + using TClientSessionPtr = TIntrusivePtr<IClientSession>; + + class IClient: public TAtomicRefCount<IClient> { + public: + virtual ~IClient() = default; + + virtual TClientSessionPtr CreateSession(const TSessionParameters& parameters = {}) = 0; + + virtual void StartTracing(ELogPriority) { + } + + virtual void FinishTracing() { + } + }; + using TClientPtr = TIntrusivePtr<IClient>; + + TClientPtr MakeClient(const TClientParameters& parameters); +} |