diff options
author | hor911 <hor911@ydb.tech> | 2023-02-09 12:40:11 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-02-09 12:40:11 +0300 |
commit | 24689527cd888aa8a640ecb5077e656b3520d373 (patch) | |
tree | a613ff4cd9567b7113e8376a17f8b85897a42790 /library/cpp/unified_agent_client/client_impl.h | |
parent | 8642d3642932f03663ba7d2d9670707c192207fd (diff) | |
download | ydb-24689527cd888aa8a640ecb5077e656b3520d373.tar.gz |
Log backend move
Diffstat (limited to 'library/cpp/unified_agent_client/client_impl.h')
-rw-r--r-- | library/cpp/unified_agent_client/client_impl.h | 364 |
1 files changed, 364 insertions, 0 deletions
diff --git a/library/cpp/unified_agent_client/client_impl.h b/library/cpp/unified_agent_client/client_impl.h new file mode 100644 index 0000000000..6adadf92e3 --- /dev/null +++ b/library/cpp/unified_agent_client/client_impl.h @@ -0,0 +1,364 @@ +#pragma once + +#include <library/cpp/unified_agent_client/client.h> +#include <library/cpp/unified_agent_client/client_proto_weighing.h> +#include <library/cpp/unified_agent_client/counters.h> +#include <library/cpp/unified_agent_client/logger.h> +#include <library/cpp/unified_agent_client/variant.h> +#include <library/cpp/unified_agent_client/proto/unified_agent.grpc.pb.h> +#include <library/cpp/unified_agent_client/grpc_io.h> + +#include <library/cpp/logger/global/global.h> + +#include <util/generic/deque.h> +#include <util/system/mutex.h> + +namespace NUnifiedAgent::NPrivate { + class TClientSession; + class TGrpcCall; + class TForkProtector; + + class TClient: public IClient { + public: + explicit TClient(const TClientParameters& parameters, std::shared_ptr<TForkProtector> forkProtector); + + ~TClient() override; + + TClientSessionPtr CreateSession(const TSessionParameters& parameters) override; + + void StartTracing(ELogPriority logPriority) override; + + void FinishTracing() override; + + inline const TIntrusivePtr<TClientCounters>& GetCounters() const noexcept { + return Counters; + } + + inline NUnifiedAgentProto::UnifiedAgentService::Stub& GetStub() noexcept { + return *Stub; + } + + TScopeLogger CreateSessionLogger(); + + inline const TClientParameters& GetParameters() const noexcept { + return Parameters; + } + + inline grpc::CompletionQueue& GetCompletionQueue() noexcept { + return ActiveCompletionQueue->GetCompletionQueue(); + } + + void RegisterSession(TClientSession* session); + + void UnregisterSession(TClientSession* session); + + void PreFork(); + + void PostForkParent(); + + void PostForkChild(); + + void EnsureStarted(); + + private: + void EnsureStartedNoLock(); + + void EnsureStoppedNoLock(); + + private: + const TClientParameters Parameters; + std::shared_ptr<TForkProtector> ForkProtector; + TIntrusivePtr<TClientCounters> Counters; + TLog Log; + TLogger MainLogger; + TScopeLogger Logger; + std::shared_ptr<grpc::Channel> Channel; + std::unique_ptr<NUnifiedAgentProto::UnifiedAgentService::Stub> Stub; + THolder<TGrpcCompletionQueueHost> ActiveCompletionQueue; + std::atomic<size_t> SessionLogLabel; + TVector<TClientSession*> ActiveSessions; + bool Started; + bool Destroyed; + TAdaptiveLock Lock; + static std::atomic<ui64> Id; + }; + + class TForkProtector { + public: + TForkProtector(); + + void Register(TClient& client); + + void Unregister(TClient& client); + + static std::shared_ptr<TForkProtector> Get(bool createIfNotExists); + + private: + static void PreFork(); + + static void PostForkParent(); + + static void PostForkChild(); + + private: + TVector<TClient*> Clients; + grpc::GrpcLibraryCodegen GrpcInitializer; + bool Enabled; + TAdaptiveLock Lock; + + static std::weak_ptr<TForkProtector> Instance; + static TMutex InstanceLock; + static bool SubscribedToForks; + }; + + class TClientSession: public IClientSession { + public: + TClientSession(const TIntrusivePtr<TClient>& client, const TSessionParameters& parameters); + + ~TClientSession(); + + void Send(TClientMessage&& message) override; + + NThreading::TFuture<void> CloseAsync(TInstant deadline) override; + + inline TClient& GetClient() noexcept { + return *Client; + } + + inline TScopeLogger& GetLogger() noexcept { + return Logger; + } + + inline TClientSessionCounters& GetCounters() noexcept { + return *Counters; + } + + inline TAsyncJoiner& GetAsyncJoiner() noexcept { + return AsyncJoiner; + } + + void PrepareInitializeRequest(NUnifiedAgentProto::Request& target); + + void PrepareWriteBatchRequest(NUnifiedAgentProto::Request& target); + + void Acknowledge(ui64 seqNo); + + void OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo); + + void OnGrpcCallFinished(); + + NThreading::TFuture<void> PreFork(); + + void PostForkParent(); + + void PostForkChild(); + + void SetAgentMaxReceiveMessage(size_t); + + private: + enum class EPollingStatus { + Active, + Inactive + }; + + struct TCloseRequestedEvent { + TInstant Deadline; + }; + + struct TMessageReceivedEvent { + TClientMessage Message; + size_t Size; + }; + + struct TPurgeWriteQueueStats { + size_t PurgedMessages{}; + size_t PurgedBytes{}; + }; + + using TEvent = std::variant<TCloseRequestedEvent, TMessageReceivedEvent>; + + public: + struct TPendingMessage { + TClientMessage Message; + size_t Size; + bool Skipped; + }; + + class TRequestBuilder { + public: + struct TAddResult; + + public: + TRequestBuilder(NUnifiedAgentProto::Request &target, size_t RequestPayloadLimitBytes, + TFMaybe<size_t> serializedRequestLimitBytes); + + TAddResult TryAddMessage(const TPendingMessage& message, size_t seqNo); + + void ResetCounters(); + + inline size_t GetSerializedRequestSize() const { + return SerializedRequestSize; + } + + inline size_t GetRequestPayloadSize() const { + return RequestPayloadSize; + } + + public: + struct TAddResult { + bool LimitExceeded; + size_t NewRequestPayloadSize; // == actual value, if !LimitExceeded + size_t NewSerializedRequestSize; // == actual value, if !LimitExceeded + }; + + private: + struct TMetaItemBuilder { + size_t ItemIndex; + size_t ValueIndex{0}; + }; + + private: + NUnifiedAgentProto::Request& Target; + TFMaybe<NPW::TRequest> PwTarget; + THashMap<TString, TMetaItemBuilder> MetaItems; + size_t RequestPayloadSize; + size_t RequestPayloadLimitBytes; + size_t SerializedRequestSize; + TFMaybe<size_t> SerializedRequestLimitBytes; + bool CountersInvalid; + }; + + private: + void MakeGrpcCall(); + + void DoClose(); + + void BeginClose(TInstant deadline); + + void Poll(); + + TPurgeWriteQueueStats PurgeWriteQueue(); + + void DoStart(); + + private: + TAsyncJoiner AsyncJoiner; + TIntrusivePtr<TClient> Client; + TFMaybe<TString> OriginalSessionId; + TFMaybe<TString> SessionId; + TFMaybe<THashMap<TString, TString>> Meta; + TScopeLogger Logger; + bool CloseStarted; + bool ForcedCloseStarted; + bool Closed; + bool ForkInProgressLocal; + bool Started; + NThreading::TPromise<void> ClosePromise; + TIntrusivePtr<TGrpcCall> ActiveGrpcCall; + TDeque<TPendingMessage> WriteQueue; + size_t TrimmedCount; + size_t NextIndex; + TFMaybe<ui64> AckSeqNo; + TInstant PollerLastEventTimestamp; + TIntrusivePtr<TClientSessionCounters> Counters; + THolder<TGrpcTimer> MakeGrpcCallTimer; + THolder<TGrpcTimer> ForceCloseTimer; + THolder<TGrpcTimer> PollTimer; + ui64 GrpcInflightMessages; + ui64 GrpcInflightBytes; + + std::atomic<size_t> InflightBytes; + bool CloseRequested; + size_t EventsBatchSize; + EPollingStatus PollingStatus; + THolder<TGrpcNotification> EventNotification; + bool EventNotificationTriggered; + TVector<TEvent> EventsBatch; + TVector<TEvent> SecondaryEventsBatch; + std::atomic<bool> ForkInProgress; + TAdaptiveLock Lock; + size_t MaxInflightBytes; + TFMaybe<size_t> AgentMaxReceiveMessage; + }; + + class TGrpcCall final: public TAtomicRefCount<TGrpcCall> { + public: + explicit TGrpcCall(TClientSession& session); + + void Start(); + + ~TGrpcCall(); + + void BeginClose(bool force); + + void Poison(); + + void NotifyMessageAdded(); + + inline bool Initialized() const { + return Initialized_; + } + + inline bool ReuseSessions() const { + return ReuseSessions_; + } + + private: + void EndAccept(EIOStatus status); + + void EndRead(EIOStatus status); + + void EndWrite(EIOStatus status); + + void EndFinish(EIOStatus status); + + void EndWritesDone(EIOStatus); + + void ScheduleWrite(); + + void BeginWritesDone(); + + bool CheckHasError(EIOStatus status, const char* method); + + void SetError(const TString& error); + + void EnsureFinishStarted(); + + void BeginRead(); + + void BeginWrite(); + + void ScheduleFinishOnError(); + + private: + TClientSession& Session; + TAsyncJoinerToken AsyncJoinerToken; + THolder<IIOCallback> AcceptTag; + THolder<IIOCallback> ReadTag; + THolder<IIOCallback> WriteTag; + THolder<IIOCallback> WritesDoneTag; + THolder<IIOCallback> FinishTag; + TScopeLogger Logger; + bool AcceptPending; + bool Initialized_; + bool ReadPending; + bool ReadsDone; + bool WritePending; + bool WritesBlocked; + bool WritesDonePending; + bool WritesDone; + bool ErrorOccured; + bool FinishRequested; + bool FinishStarted; + bool FinishDone; + bool Cancelled; + bool Poisoned; + bool PoisonPillSent; + bool ReuseSessions_; + grpc::Status FinishStatus; + grpc::ClientContext ClientContext; + std::unique_ptr<grpc::ClientAsyncReaderWriter<NUnifiedAgentProto::Request, NUnifiedAgentProto::Response>> ReaderWriter; + NUnifiedAgentProto::Request Request; + NUnifiedAgentProto::Response Response; + }; +} |