aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/unified_agent_client/client.h
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 10:54:08 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 12:28:07 +0300
commitfc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch)
treec15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/unified_agent_client/client.h
parent8a749596d40e91c896a1907afcd108d9221fbde1 (diff)
downloadydb-e9cbe5c5cf67db853d223fd365c9f05b695f7b96.tar.gz
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/unified_agent_client/client.h')
-rw-r--r--library/cpp/unified_agent_client/client.h256
1 files changed, 256 insertions, 0 deletions
diff --git a/library/cpp/unified_agent_client/client.h b/library/cpp/unified_agent_client/client.h
new file mode 100644
index 0000000000..62e1210803
--- /dev/null
+++ b/library/cpp/unified_agent_client/client.h
@@ -0,0 +1,256 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/counters.h>
+
+#include <library/cpp/logger/log.h>
+#include <library/cpp/threading/future/future.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/hash.h>
+#include <util/generic/maybe.h>
+#include <util/generic/string.h>
+
+namespace NUnifiedAgent {
+ struct TClientParameters {
+ // uri format https://github.com/grpc/grpc/blob/master/doc/naming.md
+ // for example: unix:///unified_agent for unix domain sockets or localhost:12345 for tcp
+ explicit TClientParameters(const TString& uri);
+
+ // Simple way to protect against writing to unintended/invalid Unified Agent endpoint.
+ // Must correspond to 'shared_secret_key' grpc input parameter
+ // (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6333542#L219),
+ // session would end with error otherwise.
+ //
+ // Default: not set
+ TClientParameters& SetSharedSecretKey(const TString& sharedSecretKey) {
+ SharedSecretKey = sharedSecretKey;
+ return *this;
+ }
+
+ // Max bytes count that have been received by client session but not acknowledged yet.
+ // When exceeded, new messages will be discarded, an error message
+ // will be written to the TLog instance and drop counter will be incremented.
+ //
+ // Default: 10 mb
+ TClientParameters& SetMaxInflightBytes(size_t maxInflightBytes) {
+ MaxInflightBytes = maxInflightBytes;
+ return *this;
+ }
+
+ // TLog instance for client library's own logs.
+ //
+ // Default: TLoggerOperator<TGlobalLog>::Log()
+ TClientParameters& SetLog(TLog& log) {
+ Log = log;
+ return *this;
+ }
+
+ // Throttle client library log by rate limit in bytes, excess will be discarded.
+ //
+ // Default: not set
+ TClientParameters& SetLogRateLimit(size_t bytesPerSec) {
+ LogRateLimitBytes = bytesPerSec;
+ return *this;
+ }
+
+ // Try to establish new grpc session if the current one become broken.
+ // Session may break either due to agent unavailability, or the agent itself may
+ // reject new session creation if it does not satisfy certain
+ // conditions - shared_secret_key does not match, the session creation rate has been
+ // exceeded, invalid session metadata has been used and so on.
+ // Attempts to establish a grpc session will continue indefinitely.
+ //
+ // Default: 50 millis
+ TClientParameters& SetGrpcReconnectDelay(TDuration delay) {
+ GrpcReconnectDelay = delay;
+ return *this;
+ }
+
+ // Grpc usually writes data to the socket faster than it comes from the client.
+ // This means that it's possible that each TClientMessage would be sent in it's own grpc message.
+ // This is expensive in terms of cpu, since grpc makes at least one syscall
+ // for each message on the sender and receiver sides.
+ // To avoid a large number of syscalls, the client holds incoming messages
+ // in internal buffer in hope of being able to assemble bigger grpc batch.
+ // This parameter sets the timeout for this delay - from IClientSession::Send
+ // call to the actual sending of the corresponding grpc message.
+ //
+ // Default: 10 millis.
+ TClientParameters& SetGrpcSendDelay(TDuration delay) {
+ GrpcSendDelay = delay;
+ return *this;
+ }
+
+ // Client library sends messages to grpc in batches, this parameter
+ // establishes upper limit on the size of single batch in bytes.
+ // If you increase this value, don't forget to adjust max_receive_message_size (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6661788#L185)
+ // in grpc input config, it must be grater than GrpcMaxMessageSize.
+ //
+ // Default: 1 mb
+ TClientParameters& SetGrpcMaxMessageSize(size_t size) {
+ GrpcMaxMessageSize = size;
+ return *this;
+ }
+
+ // Enable forks handling in client library.
+ // Multiple threads and concurrent forks are all supported is this regime.
+ //
+ // Default: false
+ TClientParameters& SetEnableForkSupport(bool value) {
+ EnableForkSupport = value;
+ return *this;
+ }
+
+ // Client library counters.
+ // App can set this to some leaf of it's TDynamicCounters tree.
+ // Actual provided counters are listed in TClientCounters.
+ //
+ // Default: not set
+ TClientParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) {
+ return SetCounters(MakeIntrusive<TClientCounters>(counters));
+ }
+
+ TClientParameters& SetCounters(const TIntrusivePtr<TClientCounters>& counters) {
+ Counters = counters;
+ return *this;
+ }
+
+ public:
+ static const size_t DefaultMaxInflightBytes;
+ static const size_t DefaultGrpcMaxMessageSize;
+ static const TDuration DefaultGrpcSendDelay;
+
+ public:
+ TString Uri;
+ TMaybe<TString> SharedSecretKey;
+ size_t MaxInflightBytes;
+ TLog Log;
+ TMaybe<size_t> LogRateLimitBytes;
+ TDuration GrpcReconnectDelay;
+ TDuration GrpcSendDelay;
+ bool EnableForkSupport;
+ size_t GrpcMaxMessageSize;
+ TIntrusivePtr<TClientCounters> Counters;
+ };
+
+ struct TSessionParameters {
+ TSessionParameters();
+
+ // Session unique identifier.
+ // It's guaranteed that for messages with the same sessionId relative
+ // ordering of the messages will be preserved at all processing stages
+ // in library, in Unified Agent and in other systems that respect ordering (e.g., Logbroker)
+ //
+ // Default: generated automatically by Unified Agent.
+ TSessionParameters& SetSessionId(const TString& sessionId) {
+ SessionId = sessionId;
+ return *this;
+ }
+
+ // Session metadata as key-value set.
+ // Can be used by agent filters and outputs for validation/routing/enrichment/etc.
+ //
+ // Default: not set
+ TSessionParameters& SetMeta(const THashMap<TString, TString>& meta) {
+ Meta = meta;
+ return *this;
+ }
+
+ // Session counters.
+ // Actual provided counters are listed in TClientSessionCounters.
+ //
+ // Default: A single common for all sessions subgroup of client TDynamicCounters instance
+ // with label ('session': 'default').
+ TSessionParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) {
+ return SetCounters(MakeIntrusive<TClientSessionCounters>(counters));
+ }
+
+ TSessionParameters& SetCounters(const TIntrusivePtr<TClientSessionCounters>& counters) {
+ Counters = counters;
+ return *this;
+ }
+
+ // Max bytes count that have been received by client session but not acknowledged yet.
+ // When exceeded, new messages will be discarded, an error message
+ // will be written to the TLog instance and drop counter will be incremented.
+ //
+ // Default: value from client settings
+ TSessionParameters& SetMaxInflightBytes(size_t maxInflightBytes) {
+ MaxInflightBytes = maxInflightBytes;
+ return *this;
+ }
+
+ public:
+ TMaybe<TString> SessionId;
+ TMaybe<THashMap<TString, TString>> Meta;
+ TIntrusivePtr<TClientSessionCounters> Counters;
+ TMaybe<size_t> MaxInflightBytes;
+ };
+
+ // Message data to be sent to unified agent.
+ struct TClientMessage {
+ // Opaque message payload.
+ TString Payload;
+
+ // Message metadata as key-value set.
+ // Can be used by agent filters and outputs for validation/routing/enrichment/etc.
+ //
+ // Default: not set
+ TMaybe<THashMap<TString, TString>> Meta{};
+
+ // Message timestamp.
+ //
+ // Default: time the client library has received this instance of TClientMessage.
+ TMaybe<TInstant> Timestamp{};
+ };
+
+ // Message size as it is accounted in byte-related metrics (ReceivedBytes, InflightBytes, etc).
+ size_t SizeOf(const TClientMessage& message);
+
+ class IClientSession: public TAtomicRefCount<IClientSession> {
+ public:
+ virtual ~IClientSession() = default;
+
+ // Places the message into send queue. Actual grpc call may occur later asynchronously,
+ // based on settings GrpcSendDelay and GrpcMaxMessageSize.
+ // A message can be discarded if the limits defined by the GrpcMaxMessageSize and MaxInflightBytes
+ // settings are exceeded, or if the Close method has already been called.
+ // In this case an error message will be written to the TLog instance
+ // and drop counter will be incremented.
+ virtual void Send(TClientMessage&& message) = 0;
+
+ void Send(const TClientMessage& message) {
+ Send(TClientMessage(message));
+ }
+
+ // Waits until either all current inflight messages are
+ // acknowledged or the specified deadline is reached.
+ // Upon the deadline grpc connection would be forcefully dropped (via grpc::ClientContext::TryCancel).
+ virtual NThreading::TFuture<void> CloseAsync(TInstant deadline) = 0;
+
+ void Close(TInstant deadline) {
+ CloseAsync(deadline).Wait();
+ }
+
+ void Close(TDuration timeout = TDuration::Seconds(3)) {
+ Close(Now() + timeout);
+ }
+ };
+ using TClientSessionPtr = TIntrusivePtr<IClientSession>;
+
+ class IClient: public TAtomicRefCount<IClient> {
+ public:
+ virtual ~IClient() = default;
+
+ virtual TClientSessionPtr CreateSession(const TSessionParameters& parameters = {}) = 0;
+
+ virtual void StartTracing(ELogPriority) {
+ }
+
+ virtual void FinishTracing() {
+ }
+ };
+ using TClientPtr = TIntrusivePtr<IClient>;
+
+ TClientPtr MakeClient(const TClientParameters& parameters);
+}