|
|
#pragma once
#include <library/cpp/unified_agent_client/counters.h>
#include <library/cpp/logger/log.h>
#include <library/cpp/threading/future/future.h>
#include <util/datetime/base.h>
#include <util/generic/hash.h>
#include <util/generic/maybe.h>
#include <util/generic/string.h>
namespace NUnifiedAgent {
struct TClientParameters {
    // Creates parameters for the Unified Agent endpoint addressed by uri.
    // The uri follows grpc naming rules (https://github.com/grpc/grpc/blob/master/doc/naming.md),
    // e.g. unix:///unified_agent for a unix domain socket or localhost:12345 for tcp.
    explicit TClientParameters(const TString& uri);

    // A simple guard against writing to an unintended/invalid Unified Agent endpoint.
    // The key has to match the 'shared_secret_key' parameter of the grpc input
    // (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6333542#L219);
    // if it does not, the session ends with an error.
    //
    // Default: not set
    TClientParameters& SetSharedSecretKey(const TString& sharedSecretKey) {
        SharedSecretKey = sharedSecretKey;
        return *this;
    }

    // Upper bound on the number of bytes that a client session has received
    // but that have not been acknowledged yet.
    // Once the bound is exceeded, new messages are discarded, an error is written
    // to the TLog instance and the drop counter is incremented.
    //
    // Default: 10 mb
    TClientParameters& SetMaxInflightBytes(size_t maxInflightBytes) {
        MaxInflightBytes = maxInflightBytes;
        return *this;
    }

    // TLog instance that receives the client library's own logs.
    //
    // Default: TLoggerOperator<TGlobalLog>::Log()
    TClientParameters& SetLog(TLog log) {
        Log = std::move(log);
        return *this;
    }

    // Rate limit (in bytes per second) for the client library's log;
    // everything above the limit is discarded.
    //
    // Default: not set
    TClientParameters& SetLogRateLimit(size_t bytesPerSec) {
        LogRateLimitBytes = bytesPerSec;
        return *this;
    }

    // Delay before trying to establish a new grpc session after the current one breaks.
    // A session may break because the agent is unavailable, or because the agent
    // rejects session creation when certain conditions are not met:
    // shared_secret_key does not match, the session creation rate has been exceeded,
    // invalid session metadata has been used and so on.
    // Attempts to establish a grpc session continue indefinitely.
    //
    // Default: 50 millis
    TClientParameters& SetGrpcReconnectDelay(TDuration delay) {
        GrpcReconnectDelay = delay;
        return *this;
    }

    // Grpc usually writes data to the socket faster than the client produces it,
    // so every TClientMessage could end up in its own grpc message.
    // That is expensive in terms of cpu: grpc makes at least one syscall per message
    // on both the sender and the receiver side.
    // To avoid a large number of syscalls the client keeps incoming messages
    // in an internal buffer, hoping to assemble a bigger grpc batch.
    // This parameter is the timeout of that buffering: the time from the
    // IClientSession::Send call to the actual sending of the corresponding grpc message.
    //
    // Default: 10 millis.
    TClientParameters& SetGrpcSendDelay(TDuration delay) {
        GrpcSendDelay = delay;
        return *this;
    }

    // The client library hands messages to grpc in batches; this is the upper limit
    // on the size of a single batch in bytes.
    // When increasing this value, also adjust max_receive_message_size
    // (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6661788#L185)
    // in the grpc input config: it must be greater than GrpcMaxMessageSize.
    //
    // Default: 1 mb
    TClientParameters& SetGrpcMaxMessageSize(size_t size) {
        GrpcMaxMessageSize = size;
        return *this;
    }

    // Turns on fork handling in the client library.
    // Multiple threads and concurrent forks are all supported in this mode.
    //
    // Default: false
    TClientParameters& SetEnableForkSupport(bool value) {
        EnableForkSupport = value;
        return *this;
    }

    // Client library counters.
    // An application can pass some leaf of its TDynamicCounters tree here.
    // The counters actually provided are listed in TClientCounters.
    //
    // Default: not set
    TClientParameters& SetCounters(const TIntrusivePtr<TClientCounters>& counters) {
        Counters = counters;
        return *this;
    }

    // Same as above, but wraps the given dynamic counters group into a new TClientCounters.
    TClientParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) {
        Counters = MakeIntrusive<TClientCounters>(counters);
        return *this;
    }

    // Library defaults for the corresponding settings (values are defined outside this header).
    static const size_t DefaultMaxInflightBytes;
    static const size_t DefaultGrpcMaxMessageSize;
    static const TDuration DefaultGrpcSendDelay;

    // Current settings; see the setter of the same name for the meaning of each field.
    TString Uri;
    TMaybe<TString> SharedSecretKey;
    size_t MaxInflightBytes;
    TLog Log;
    TMaybe<size_t> LogRateLimitBytes;
    TDuration GrpcReconnectDelay;
    TDuration GrpcSendDelay;
    bool EnableForkSupport;
    size_t GrpcMaxMessageSize;
    TIntrusivePtr<TClientCounters> Counters;
};
struct TSessionParameters {
    // Defined out of line.
    TSessionParameters();

    // Unique identifier of the session.
    // For messages sharing the same sessionId the relative order is guaranteed
    // to be preserved at all processing stages: in the library, in Unified Agent
    // and in other systems that respect ordering (e.g., Logbroker).
    //
    // Default: generated automatically by Unified Agent.
    TSessionParameters& SetSessionId(TString sessionId) {
        SessionId = std::move(sessionId);
        return *this;
    }

    // Session metadata as a key-value set.
    // Agent filters and outputs can use it for validation/routing/enrichment/etc.
    //
    // Default: not set
    TSessionParameters& SetMeta(THashMap<TString, TString> meta) {
        Meta = std::move(meta);
        return *this;
    }

    // Session counters.
    // The counters actually provided are listed in TClientSessionCounters.
    //
    // Default: one subgroup of the client's TDynamicCounters instance, common to
    // all sessions, with label ('session': 'default').
    TSessionParameters& SetCounters(const TIntrusivePtr<TClientSessionCounters>& counters) {
        Counters = counters;
        return *this;
    }

    // Same as above, but wraps the given dynamic counters group into a new TClientSessionCounters.
    TSessionParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) {
        Counters = MakeIntrusive<TClientSessionCounters>(counters);
        return *this;
    }

    // Upper bound on the number of bytes that the client session has received
    // but that have not been acknowledged yet.
    // Once the bound is exceeded, new messages are discarded, an error is written
    // to the TLog instance and the drop counter is incremented.
    //
    // Default: value from client settings
    TSessionParameters& SetMaxInflightBytes(size_t maxInflightBytes) {
        MaxInflightBytes = maxInflightBytes;
        return *this;
    }

    // Current settings; see the setter of the same name for the meaning of each field.
    TMaybe<TString> SessionId;
    TMaybe<THashMap<TString, TString>> Meta;
    TIntrusivePtr<TClientSessionCounters> Counters;
    TMaybe<size_t> MaxInflightBytes;
};
// A single message to be sent to Unified Agent.
struct TClientMessage {
    // Opaque payload of the message.
    TString Payload;

    // Message metadata as a key-value set.
    // Agent filters and outputs can use it for validation/routing/enrichment/etc.
    //
    // Default: not set
    TMaybe<THashMap<TString, TString>> Meta{};

    // Timestamp of the message.
    //
    // Default: the time at which the client library received this TClientMessage instance.
    TMaybe<TInstant> Timestamp{};
};
// Returns the size of the message as it is accounted in byte-related metrics
// (ReceivedBytes, InflightBytes, etc).
// NOTE(review): the exact accounting formula lives in the implementation and is
// not visible from this header.
size_t SizeOf(const TClientMessage& message);
// Reference-counted interface of a session used to send messages to Unified Agent.
class IClientSession: public TAtomicRefCount<IClientSession> {
public:
    virtual ~IClientSession() = default;

    // Puts the message into the send queue. The actual grpc call may happen later,
    // asynchronously, depending on the GrpcSendDelay and GrpcMaxMessageSize settings.
    // The message can be discarded if the limits set by GrpcMaxMessageSize and
    // MaxInflightBytes are exceeded, or if Close has already been called.
    // In that case an error is written to the TLog instance
    // and the drop counter is incremented.
    virtual void Send(TClientMessage&& message) = 0;

    // Convenience overload: sends a copy of the message, leaving the caller's instance intact.
    void Send(const TClientMessage& message) {
        TClientMessage copy(message);
        Send(std::move(copy));
    }

    // Completes when either all currently inflight messages are acknowledged
    // or the given deadline is reached.
    // At the deadline the grpc connection is forcefully dropped (via grpc::ClientContext::TryCancel).
    virtual NThreading::TFuture<void> CloseAsync(TInstant deadline) = 0;

    // Blocking variant of CloseAsync: waits for the returned future.
    void Close(TInstant deadline) {
        auto closed = CloseAsync(deadline);
        closed.Wait();
    }

    // Blocking close with a deadline of "current time + timeout".
    void Close(TDuration timeout = TDuration::Seconds(3)) {
        const TInstant deadline = Now() + timeout;
        Close(deadline);
    }
};
// Intrusive smart pointer to a client session.
using TClientSessionPtr = TIntrusivePtr<IClientSession>;
// Reference-counted interface of a Unified Agent client; the source of sessions.
class IClient: public TAtomicRefCount<IClient> {
public:
    virtual ~IClient() = default;

    // Creates a new session configured with the given parameters
    // (default-constructed parameters when none are passed).
    virtual TClientSessionPtr CreateSession(const TSessionParameters& parameters = {}) = 0;

    // Tracing hooks. The base implementations do nothing; implementations may override them.
    // NOTE(review): the meaning of the priority argument is defined by the
    // overriding implementation and is not visible from this header.
    virtual void StartTracing(ELogPriority) {}
    virtual void FinishTracing() {}
};
// Intrusive smart pointer to a client.
using TClientPtr = TIntrusivePtr<IClient>;
// Creates a client configured with the given parameters.
// NOTE(review): whether the grpc connection is established eagerly or lazily
// is not visible from this header — confirm against the implementation.
TClientPtr MakeClient(const TClientParameters& parameters);
}
|