diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 10:54:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 12:28:07 +0300 |
commit | fc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch) | |
tree | c15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library/cpp/unified_agent_client/client_impl.cpp | |
parent | 8a749596d40e91c896a1907afcd108d9221fbde1 (diff) | |
download | ydb-fc1cffcfa7f0497a1f97b384a24bcbf23362f3be.tar.gz |
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library/cpp/unified_agent_client/client_impl.cpp')
-rw-r--r-- | library/cpp/unified_agent_client/client_impl.cpp | 1274 |
1 files changed, 1274 insertions, 0 deletions
diff --git a/library/cpp/unified_agent_client/client_impl.cpp b/library/cpp/unified_agent_client/client_impl.cpp new file mode 100644 index 0000000000..4db98120fd --- /dev/null +++ b/library/cpp/unified_agent_client/client_impl.cpp @@ -0,0 +1,1274 @@ +#include "client_impl.h" +#include "helpers.h" + +#include <contrib/libs/grpc/include/grpc/grpc.h> +#include <contrib/libs/grpc/src/core/lib/gpr/string.h> +#include <contrib/libs/grpc/src/core/lib/gprpp/fork.h> +#include <contrib/libs/grpc/src/core/lib/iomgr/executor.h> + +#include <util/charset/utf8.h> +#include <util/generic/size_literals.h> +#include <util/system/env.h> + +using namespace NThreading; +using namespace NMonitoring; + +namespace NUnifiedAgent::NPrivate { + std::shared_ptr<grpc::Channel> CreateChannel(const grpc::string& target) { + grpc::ChannelArguments args; + args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE); + args.SetMaxReceiveMessageSize(Max<int>()); + args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 60000); + args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000); + args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100); + args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 200); + args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0); + args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); + args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 5000); + args.SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000); + args.SetInt(GRPC_ARG_TCP_READ_CHUNK_SIZE, 1024*1024); + return grpc::CreateCustomChannel(target, grpc::InsecureChannelCredentials(), args); + } + + void AddMeta(NUnifiedAgentProto::Request_Initialize& init, const TString& name, const TString& value) { + auto* metaItem = init.MutableMeta()->Add(); + metaItem->SetName(name); + metaItem->SetValue(value); + } + + std::atomic<ui64> TClient::Id{0}; + + TClient::TClient(const TClientParameters& parameters, std::shared_ptr<TForkProtector> forkProtector) + : Parameters(parameters) + , ForkProtector(forkProtector) + , Counters(parameters.Counters ? parameters.Counters : MakeIntrusive<TClientCounters>()) + , Log(parameters.Log) + , MainLogger(Log, MakeFMaybe(Parameters.LogRateLimitBytes)) + , Logger(MainLogger.Child(Sprintf("ua_%lu", Id.fetch_add(1)))) + , Channel(nullptr) + , Stub(nullptr) + , ActiveCompletionQueue(nullptr) + , SessionLogLabel(0) + , ActiveSessions() + , Started(false) + , Destroyed(false) + , Lock() + { + MainLogger.SetDroppedBytesCounter(&Counters->ClientLogDroppedBytes); + + if (ForkProtector != nullptr) { + ForkProtector->Register(*this); + } + + EnsureStarted(); + + YLOG_INFO(Sprintf("created, uri [%s]", Parameters.Uri.c_str())); + } + + TClient::~TClient() { + with_lock(Lock) { + Y_VERIFY(ActiveSessions.empty(), "active sessions found"); + + EnsureStoppedNoLock(); + + Destroyed = true; + } + + if (ForkProtector != nullptr) { + ForkProtector->Unregister(*this); + } + + YLOG_INFO(Sprintf("destroyed, uri [%s]", Parameters.Uri.c_str())); + } + + TClientSessionPtr TClient::CreateSession(const TSessionParameters& parameters) { + return MakeIntrusive<TClientSession>(this, parameters); + } + + void TClient::StartTracing(ELogPriority logPriority) { + MainLogger.StartTracing(logPriority); + StartGrpcTracing(); + YLOG_INFO("tracing started"); + } + + void TClient::FinishTracing() { + FinishGrpcTracing(); + MainLogger.FinishTracing(); + YLOG_INFO("tracing finished"); + } + + void TClient::RegisterSession(TClientSession* session) { + with_lock(Lock) { + ActiveSessions.push_back(session); + } + } + + void TClient::UnregisterSession(TClientSession* session) { + with_lock(Lock) { + const auto it = Find(ActiveSessions, session); + Y_VERIFY(it != ActiveSessions.end()); + ActiveSessions.erase(it); + } + } + + void TClient::PreFork() { + YLOG_INFO("pre fork started"); + + Lock.Acquire(); + + auto futures = TVector<TFuture<void>>(Reserve(ActiveSessions.size())); + for (auto* s: ActiveSessions) { + futures.push_back(s->PreFork()); + } + YLOG_INFO("waiting for sessions"); + WaitAll(futures).Wait(); + + EnsureStoppedNoLock(); + + YLOG_INFO("shutdown grpc executor"); + grpc_core::Executor::SetThreadingAll(false); + + YLOG_INFO("pre fork finished"); + } + + void TClient::PostForkParent() { + YLOG_INFO("post fork parent started"); + + if (!Destroyed) { + EnsureStartedNoLock(); + } + Lock.Release(); + + for (auto* s: ActiveSessions) { + s->PostForkParent(); + } + + YLOG_INFO("post fork parent finished"); + } + + void TClient::PostForkChild() { + YLOG_INFO("post fork child started"); + + Lock.Release(); + + for (auto* s: ActiveSessions) { + s->PostForkChild(); + } + + YLOG_INFO("post fork child finished"); + } + + void TClient::EnsureStarted() { + with_lock(Lock) { + EnsureStartedNoLock(); + } + } + + void TClient::EnsureStartedNoLock() { + // Lock must be held + + if (Started) { + return; + } + + Channel = CreateChannel(Parameters.Uri); + Stub = NUnifiedAgentProto::UnifiedAgentService::NewStub(Channel); + ActiveCompletionQueue = MakeHolder<TGrpcCompletionQueueHost>(); + ActiveCompletionQueue->Start(); + + Started = true; + } + + void TClient::EnsureStoppedNoLock() { + // Lock must be held + + if (!Started) { + return; + } + + YLOG_INFO("stopping"); + ActiveCompletionQueue->Stop(); + ActiveCompletionQueue = nullptr; + Stub = nullptr; + Channel = nullptr; + YLOG_INFO("stopped"); + + Started = false; + } + + TScopeLogger TClient::CreateSessionLogger() { + return Logger.Child(ToString(SessionLogLabel.fetch_add(1))); + } + + TForkProtector::TForkProtector() + : Clients() + , GrpcInitializer() + , Enabled(grpc_core::Fork::Enabled()) + , Lock() + { + } + + void TForkProtector::Register(TClient& client) { + if (!Enabled) { + return; + } + + Y_VERIFY(grpc_is_initialized()); + Y_VERIFY(grpc_core::Fork::Enabled()); + + with_lock(Lock) { + Clients.push_back(&client); + } + } + + void TForkProtector::Unregister(TClient& client) { + if (!Enabled) { + return; + } + + with_lock(Lock) { + const auto it = Find(Clients, &client); + Y_VERIFY(it != Clients.end()); + Clients.erase(it); + } + } + + std::shared_ptr<TForkProtector> TForkProtector::Get(bool createIfNotExists) { + with_lock(InstanceLock) { + auto result = Instance.lock(); + if (!result && createIfNotExists) { + SetEnv("GRPC_ENABLE_FORK_SUPPORT", "true"); + result = std::make_shared<TForkProtector>(); + if (!result->Enabled) { + TLog log("cerr"); + TLogger logger(log, Nothing()); + auto scopeLogger = logger.Child("ua client"); + YLOG(TLOG_WARNING, + "Grpc is already initialized, can't enable fork support. " + "If forks are possible, please set environment variable GRPC_ENABLE_FORK_SUPPORT to 'true'. " + "If not, you can suppress this warning by setting EnableForkSupport " + "to false when creating the ua client.", + scopeLogger); + } else if (!SubscribedToForks) { + SubscribedToForks = true; + #ifdef _unix_ + pthread_atfork( + &TForkProtector::PreFork, + &TForkProtector::PostForkParent, + &TForkProtector::PostForkChild); + #endif + } + + Instance = result; + } + return result; + } + } + + void TForkProtector::PreFork() { + auto self = Get(false); + if (!self) { + return; + } + self->Lock.Acquire(); + for (auto* c : self->Clients) { + c->PreFork(); + } + } + + void TForkProtector::PostForkParent() { + auto self = Get(false); + if (!self) { + return; + } + for (auto* c : self->Clients) { + c->PostForkParent(); + } + self->Lock.Release(); + } + + void TForkProtector::PostForkChild() { + auto self = Get(false); + if (!self) { + return; + } + for (auto* c : self->Clients) { + c->PostForkChild(); + } + self->Lock.Release(); + } + + std::weak_ptr<TForkProtector> TForkProtector::Instance{}; + TMutex TForkProtector::InstanceLock{}; + bool TForkProtector::SubscribedToForks{false}; + + TClientSession::TClientSession(const TIntrusivePtr<TClient>& client, const TSessionParameters& parameters) + : AsyncJoiner() + , Client(client) + , OriginalSessionId(MakeFMaybe(parameters.SessionId)) + , SessionId(OriginalSessionId) + , Meta(MakeFMaybe(parameters.Meta)) + , Logger(Client->CreateSessionLogger()) + , CloseStarted(false) + , ForcedCloseStarted(false) + , Closed(false) + , ForkInProgressLocal(false) + , Started(false) + , ClosePromise() + , ActiveGrpcCall(nullptr) + , WriteQueue() + , TrimmedCount(0) + , NextIndex(0) + , AckSeqNo(Nothing()) + , PollerLastEventTimestamp() + , Counters(parameters.Counters ? parameters.Counters : Client->GetCounters()->GetDefaultSessionCounters()) + , MakeGrpcCallTimer(nullptr) + , ForceCloseTimer(nullptr) + , PollTimer(nullptr) + , GrpcInflightMessages(0) + , GrpcInflightBytes(0) + , InflightBytes(0) + , CloseRequested(false) + , EventsBatchSize(0) + , PollingStatus(EPollingStatus::Inactive) + , EventNotification(nullptr) + , EventNotificationTriggered(false) + , EventsBatch() + , SecondaryEventsBatch() + , ForkInProgress(false) + , Lock() + , MaxInflightBytes( + parameters.MaxInflightBytes.GetOrElse(Client->GetParameters().MaxInflightBytes)) + , AgentMaxReceiveMessage(Nothing()) { + if (Meta.Defined() && !IsUtf8(*Meta)) { + throw std::runtime_error("session meta contains non UTF-8 characters"); + } + Y_ENSURE(!(Client->GetParameters().EnableForkSupport && SessionId.Defined()), + "explicit session id is not supported with forks"); + Client->RegisterSession(this); + + with_lock(Lock) { + DoStart(); + } + } + + TFuture<void> TClientSession::PreFork() { + YLOG_INFO("pre fork started"); + + Lock.Acquire(); + + YLOG_INFO("triggering event notification"); + if (!EventNotificationTriggered) { + EventNotificationTriggered = true; + EventNotification->Trigger(); + } + + YLOG_INFO("setting 'fork in progress' flag"); + ForkInProgress.store(true); + + if (!Started) { + ClosePromise.TrySetValue(); + } + YLOG_INFO("pre fork finished"); + return ClosePromise.GetFuture(); + } + + void TClientSession::PostForkParent() { + YLOG_INFO("post fork parent started"); + ForkInProgress.store(false); + ForkInProgressLocal = false; + Started = false; + + if (!CloseRequested) { + DoStart(); + + YLOG_INFO("triggering event notification"); + EventNotificationTriggered = true; + EventNotification->Trigger(); + } + + Lock.Release(); + + YLOG_INFO("post fork parent finished"); + } + + void TClientSession::PostForkChild() { + YLOG_INFO("post fork child started"); + ForkInProgress.store(false); + ForkInProgressLocal = false; + Started = false; + + SessionId.Clear(); + TrimmedCount = 0; + NextIndex = 0; + AckSeqNo.Clear(); + PurgeWriteQueue(); + EventsBatch.clear(); + SecondaryEventsBatch.clear(); + EventsBatchSize = 0; + + Lock.Release(); + + YLOG_INFO("post fork child finished"); + } + + void TClientSession::SetAgentMaxReceiveMessage(size_t newValue) { + AgentMaxReceiveMessage = newValue; + } + + void TClientSession::DoStart() { + // Lock must be held + + Y_VERIFY(!Started); + YLOG_INFO("starting"); + + Client->EnsureStarted(); + + MakeGrpcCallTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(), + MakeIOCallback([this](EIOStatus status) { + if (status == EIOStatus::Error) { + return; + } + MakeGrpcCall(); + }, &AsyncJoiner)); + ForceCloseTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(), + MakeIOCallback([this](EIOStatus status) { + if (status == EIOStatus::Error) { + return; + } + YLOG_INFO("ForceCloseTimer"); + BeginClose(TInstant::Zero()); + }, &AsyncJoiner)); + PollTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(), + MakeIOCallback([this](EIOStatus status) { + if (status == EIOStatus::Error) { + return; + } + Poll(); + }, &AsyncJoiner)); + EventNotification = MakeHolder<TGrpcNotification>(Client->GetCompletionQueue(), + MakeIOCallback([this](EIOStatus status) { + Y_VERIFY(status == EIOStatus::Ok); + Poll(); + }, &AsyncJoiner)); + + CloseStarted = false; + ForcedCloseStarted = false; + Closed = false; + ClosePromise = NewPromise(); + EventNotificationTriggered = false; + PollerLastEventTimestamp = Now(); + PollingStatus = EPollingStatus::Inactive; + + ++Client->GetCounters()->ActiveSessionsCount; + MakeGrpcCallTimer->Set(Now()); + YLOG_INFO(Sprintf("started, sessionId [%s]", OriginalSessionId.GetOrElse("").c_str())); + + Started = true; + } + + void TClientSession::MakeGrpcCall() { + if (Closed) { + YLOG_INFO("MakeGrpcCall, session already closed"); + return; + } + Y_VERIFY(!ForcedCloseStarted); + Y_VERIFY(!ActiveGrpcCall); + ActiveGrpcCall = MakeIntrusive<TGrpcCall>(*this); + ActiveGrpcCall->Start(); + ++Counters->GrpcCalls; + if (CloseStarted) { + ActiveGrpcCall->BeginClose(false); + } + } + + TClientSession::~TClientSession() { + Close(TInstant::Zero()); + AsyncJoiner.Join().Wait(); + Client->UnregisterSession(this); + YLOG_INFO("destroyed"); + } + + void TClientSession::Send(TClientMessage&& message) { + const auto messageSize = SizeOf(message); + ++Counters->ReceivedMessages; + Counters->ReceivedBytes += messageSize; + if (messageSize > Client->GetParameters().GrpcMaxMessageSize) { + YLOG_ERR(Sprintf("message size [%lu] is greater than max grpc message size [%lu], message dropped", + messageSize, Client->GetParameters().GrpcMaxMessageSize)); + ++Counters->DroppedMessages; + Counters->DroppedBytes += messageSize; + ++Counters->ErrorsCount; + return; + } + if (message.Meta.Defined() && !IsUtf8(*message.Meta)) { + YLOG_ERR("message meta contains non UTF-8 characters, message dropped"); + ++Counters->DroppedMessages; + Counters->DroppedBytes += messageSize; + ++Counters->ErrorsCount; + return; + } + if (!message.Timestamp.Defined()) { + message.Timestamp = TInstant::Now(); + } + ++Counters->InflightMessages; + Counters->InflightBytes += messageSize; + { + auto g = Guard(Lock); + + if (!Started) { + DoStart(); + } + + if (CloseRequested) { + g.Release(); + YLOG_ERR(Sprintf("session is closing, message dropped, [%lu] bytes", messageSize)); + --Counters->InflightMessages; + Counters->InflightBytes -= messageSize; + ++Counters->DroppedMessages; + Counters->DroppedBytes += messageSize; + ++Counters->ErrorsCount; + return; + } + if (InflightBytes.load() + messageSize > MaxInflightBytes) { + g.Release(); + YLOG_ERR(Sprintf("max inflight of [%lu] bytes reached, [%lu] bytes dropped", + MaxInflightBytes, messageSize)); + --Counters->InflightMessages; + Counters->InflightBytes -= messageSize; + ++Counters->DroppedMessages; + Counters->DroppedBytes += messageSize; + ++Counters->ErrorsCount; + return; + } + InflightBytes.fetch_add(messageSize); + EventsBatch.push_back(TMessageReceivedEvent{std::move(message), messageSize}); + EventsBatchSize += messageSize; + if ((PollingStatus == EPollingStatus::Inactive || + EventsBatchSize >= Client->GetParameters().GrpcMaxMessageSize) && + !EventNotificationTriggered) + { + EventNotificationTriggered = true; + EventNotification->Trigger(); + } + } + } + + TFuture<void> TClientSession::CloseAsync(TInstant deadline) { + YLOG_INFO(Sprintf("close, deadline [%s]", ToString(deadline).c_str())); + if (!ClosePromise.GetFuture().HasValue()) { + with_lock(Lock) { + if (!Started) { + return MakeFuture(); + } + + CloseRequested = true; + + EventsBatch.push_back(TCloseRequestedEvent{deadline}); + if (!EventNotificationTriggered) { + EventNotificationTriggered = true; + EventNotification->Trigger(); + } + } + } + return ClosePromise.GetFuture(); + } + + void TClientSession::BeginClose(TInstant deadline) { + if (Closed) { + return; + } + if (!CloseStarted) { + CloseStarted = true; + YLOG_INFO("close started"); + } + const auto force = deadline == TInstant::Zero(); + if (force && !ForcedCloseStarted) { + ForcedCloseStarted = true; + YLOG_INFO("forced close started"); + } + if (!ActiveGrpcCall && (ForcedCloseStarted || WriteQueue.empty())) { + DoClose(); + } else { + if (!force) { + ForceCloseTimer->Set(deadline); + } + if (ActiveGrpcCall) { + ActiveGrpcCall->BeginClose(ForcedCloseStarted); + } + } + } + + void TClientSession::Poll() { + if (ForkInProgressLocal) { + return; + } + + const auto now = Now(); + const auto sendDelay = Client->GetParameters().GrpcSendDelay; + const auto oldPollingStatus = PollingStatus; + + { + if (!Lock.TryAcquire()) { + TSpinWait sw; + + while (Lock.IsLocked() || !Lock.TryAcquire()) { + if (ForkInProgress.load()) { + YLOG_INFO("poller 'fork in progress' signal received, stopping session"); + ForkInProgressLocal = true; + if (!ActiveGrpcCall || !ActiveGrpcCall->Initialized()) { + BeginClose(TInstant::Max()); + } else if (ActiveGrpcCall->ReuseSessions()) { + ActiveGrpcCall->Poison(); + BeginClose(TInstant::Max()); + } else { + BeginClose(TInstant::Zero()); + } + return; + } + sw.Sleep(); + } + } + + if (!EventsBatch.empty()) { + DoSwap(EventsBatch, SecondaryEventsBatch); + EventsBatchSize = 0; + PollerLastEventTimestamp = now; + } + const auto needNextPollStep = sendDelay != TDuration::Zero() && + !CloseRequested && + (now - PollerLastEventTimestamp) < 10 * sendDelay; + PollingStatus = needNextPollStep ? EPollingStatus::Active : EPollingStatus::Inactive; + EventNotificationTriggered = false; + + Lock.Release(); + } + + if (PollingStatus == EPollingStatus::Active) { + PollTimer->Set(now + sendDelay); + } + if (PollingStatus != oldPollingStatus) { + YLOG_DEBUG(Sprintf("poller %s", PollingStatus == EPollingStatus::Active ? "started" : "stopped")); + } + if (auto& batch = SecondaryEventsBatch; !batch.empty()) { + auto closeIt = FindIf(batch, [](const auto& e) { + return std::holds_alternative<TCloseRequestedEvent>(e); + }); + + if (auto it = begin(batch); it != closeIt) { + Y_VERIFY(!CloseStarted); + do { + auto& e = std::get<TMessageReceivedEvent>(*it++); + WriteQueue.push_back({std::move(e.Message), e.Size, false}); + } while (it != closeIt); + if (ActiveGrpcCall) { + ActiveGrpcCall->NotifyMessageAdded(); + } + } + + for (auto endIt = end(batch); closeIt != endIt; ++closeIt) { + const auto& e = std::get<TCloseRequestedEvent>(*closeIt); + BeginClose(e.Deadline); + } + + batch.clear(); + } + }; + + void TClientSession::PrepareInitializeRequest(NUnifiedAgentProto::Request& target) { + auto& initializeMessage = *target.MutableInitialize(); + if (SessionId.Defined()) { + initializeMessage.SetSessionId(*SessionId); + } + if (Client->GetParameters().SharedSecretKey.Defined()) { + initializeMessage.SetSharedSecretKey(*Client->GetParameters().SharedSecretKey); + } + if (Meta.Defined()) { + for (const auto& p: *Meta) { + AddMeta(initializeMessage, p.first, p.second); + } + } + if (!Meta.Defined() || Meta->find("_reusable") == Meta->end()) { + AddMeta(initializeMessage, "_reusable", "true"); + } + } + + TClientSession::TRequestBuilder::TRequestBuilder(NUnifiedAgentProto::Request& target, size_t RequestPayloadLimitBytes, + TFMaybe<size_t> serializedRequestLimitBytes) + : Target(target) + , PwTarget(MakeFMaybe<NPW::TRequest>()) + , MetaItems() + , RequestPayloadSize(0) + , RequestPayloadLimitBytes(RequestPayloadLimitBytes) + , SerializedRequestSize(0) + , SerializedRequestLimitBytes(serializedRequestLimitBytes) + , CountersInvalid(false) + { + } + + void TClientSession::TRequestBuilder::ResetCounters() { + RequestPayloadSize = 0; + SerializedRequestSize = 0; + PwTarget.Clear(); + PwTarget.ConstructInPlace(); + CountersInvalid = false; + } + + TClientSession::TRequestBuilder::TAddResult TClientSession::TRequestBuilder::TryAddMessage( + const TPendingMessage& message, size_t seqNo) { + Y_VERIFY(!CountersInvalid); + { + // add item to pwRequest to increase calculated size + PwTarget->DataBatch.SeqNo.Add(seqNo); + PwTarget->DataBatch.Timestamp.Add(message.Message.Timestamp->MicroSeconds()); + PwTarget->DataBatch.Payload.Add().SetValue(message.Message.Payload); + if (message.Message.Meta.Defined()) { + for (const auto &m: *message.Message.Meta) { + TMetaItemBuilder *metaItemBuilder = nullptr; + { + auto it = MetaItems.find(m.first); + if (it == MetaItems.end()) { + PwTarget->DataBatch.Meta.Add().Key.SetValue(m.first); + } else { + metaItemBuilder = &it->second; + } + } + size_t metaItemIdx = (metaItemBuilder != nullptr) ? metaItemBuilder->ItemIndex : + PwTarget->DataBatch.Meta.GetSize() - 1; + auto &pwMetaItem = PwTarget->DataBatch.Meta.Get(metaItemIdx); + pwMetaItem.Value.Add().SetValue(m.second); + const auto index = Target.GetDataBatch().SeqNoSize(); + if ((metaItemBuilder != nullptr && metaItemBuilder->ValueIndex != index) || + (metaItemBuilder == nullptr && index != 0)) { + const auto valueIdx = (metaItemBuilder) ? metaItemBuilder->ValueIndex : 0; + pwMetaItem.SkipStart.Add(valueIdx); + pwMetaItem.SkipLength.Add(index - valueIdx); + } + } + } + } + const auto newSerializedRequestSize = PwTarget->ByteSizeLong(); + const auto newPayloadSize = RequestPayloadSize + message.Size; + if ((SerializedRequestLimitBytes.Defined() && newSerializedRequestSize > *SerializedRequestLimitBytes) || + newPayloadSize > RequestPayloadLimitBytes) { + CountersInvalid = true; + return {true, newPayloadSize, newSerializedRequestSize}; + } + + { + // add item to the real request + auto& batch = *Target.MutableDataBatch(); + batch.AddSeqNo(seqNo); + batch.AddTimestamp(message.Message.Timestamp->MicroSeconds()); + batch.AddPayload(message.Message.Payload); + if (message.Message.Meta.Defined()) { + for (const auto &m: *message.Message.Meta) { + TMetaItemBuilder *metaItemBuilder; + { + auto it = MetaItems.find(m.first); + if (it == MetaItems.end()) { + batch.AddMeta()->SetKey(m.first); + auto insertResult = MetaItems.insert({m.first, {batch.MetaSize() - 1}}); + Y_VERIFY(insertResult.second); + metaItemBuilder = &insertResult.first->second; + } else { + metaItemBuilder = &it->second; + } + } + auto *metaItem = batch.MutableMeta(metaItemBuilder->ItemIndex); + metaItem->AddValue(m.second); + const auto index = batch.SeqNoSize() - 1; + if (metaItemBuilder->ValueIndex != index) { + metaItem->AddSkipStart(metaItemBuilder->ValueIndex); + metaItem->AddSkipLength(index - metaItemBuilder->ValueIndex); + } + metaItemBuilder->ValueIndex = index + 1; + } + } + SerializedRequestSize = newSerializedRequestSize; + RequestPayloadSize = newPayloadSize; + } + + return {false, newPayloadSize, newSerializedRequestSize}; + } + + void TClientSession::PrepareWriteBatchRequest(NUnifiedAgentProto::Request& target) { + Y_VERIFY(AckSeqNo.Defined()); + TRequestBuilder requestBuilder(target, Client->GetParameters().GrpcMaxMessageSize, AgentMaxReceiveMessage); + const auto startIndex = NextIndex - TrimmedCount; + for (size_t i = startIndex; i < WriteQueue.size(); ++i) { + auto& queueItem = WriteQueue[i]; + if (queueItem.Skipped) { + NextIndex++; + continue; + } + + const auto addResult = requestBuilder.TryAddMessage(queueItem, *AckSeqNo + i + 1); + const auto serializedLimitToLog = AgentMaxReceiveMessage.Defined() ? *AgentMaxReceiveMessage : 0; + if (addResult.LimitExceeded && target.GetDataBatch().SeqNoSize() == 0) { + YLOG_ERR(Sprintf("single serialized message is too large [%lu] > [%lu], dropping it", + addResult.NewSerializedRequestSize, serializedLimitToLog)); + queueItem.Skipped = true; + ++Counters->DroppedMessages; + Counters->DroppedBytes += queueItem.Size; + ++Counters->ErrorsCount; + NextIndex++; + requestBuilder.ResetCounters(); + continue; + } + if (addResult.LimitExceeded) { + YLOG_DEBUG(Sprintf( + "batch limit exceeded: [%lu] > [%lu] (limit for serialized batch)" + "OR [%lu] > [%lu] (limit for raw batch)", + addResult.NewSerializedRequestSize, serializedLimitToLog, + addResult.NewRequestPayloadSize, Client->GetParameters().GrpcMaxMessageSize)); + break; + } + + NextIndex++; + } + const auto messagesCount = target.GetDataBatch().SeqNoSize(); + if (messagesCount == 0) { + return; + } + Y_VERIFY(requestBuilder.GetSerializedRequestSize() == target.ByteSizeLong(), + "failed to calculate size for message [%s]", target.ShortDebugString().c_str()); + GrpcInflightMessages += messagesCount; + GrpcInflightBytes += requestBuilder.GetRequestPayloadSize(); + YLOG_DEBUG(Sprintf("new write batch, [%lu] messages, [%lu] bytes, first seq_no [%lu], serialized size [%lu]", + messagesCount, requestBuilder.GetRequestPayloadSize(), + *target.GetDataBatch().GetSeqNo().begin(), requestBuilder.GetSerializedRequestSize())); + ++Counters->GrpcWriteBatchRequests; + Counters->GrpcInflightMessages += messagesCount; + Counters->GrpcInflightBytes += requestBuilder.GetRequestPayloadSize(); + } + + void TClientSession::Acknowledge(ui64 seqNo) { + size_t messagesCount = 0; + size_t bytesCount = 0; + size_t skippedMessagesCount = 0; + size_t skippedBytesCount = 0; + + if (AckSeqNo.Defined()) { + while (!WriteQueue.empty() && ((*AckSeqNo < seqNo) || WriteQueue.front().Skipped)) { + if (WriteQueue.front().Skipped) { + skippedMessagesCount++; + skippedBytesCount += WriteQueue.front().Size; + } else { + ++messagesCount; + bytesCount += WriteQueue.front().Size; + } + ++(*AckSeqNo); + WriteQueue.pop_front(); + ++TrimmedCount; + } + } + if (!AckSeqNo.Defined() || seqNo > *AckSeqNo) { + AckSeqNo = seqNo; + } + + Counters->AcknowledgedMessages += messagesCount; + Counters->AcknowledgedBytes += bytesCount; + Counters->InflightMessages -= (messagesCount + skippedMessagesCount); + Counters->InflightBytes -= (bytesCount + skippedBytesCount); + InflightBytes.fetch_sub(bytesCount); + Counters->GrpcInflightMessages -= messagesCount; + Counters->GrpcInflightBytes -= bytesCount; + GrpcInflightMessages -= messagesCount; + GrpcInflightBytes -= bytesCount; + + YLOG_DEBUG(Sprintf("ack [%lu], [%lu] messages, [%lu] bytes", seqNo, messagesCount, bytesCount)); + } + + void TClientSession::OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo) { + SessionId = sessionId; + Acknowledge(lastSeqNo); + NextIndex = TrimmedCount; + ++Counters->GrpcCallsInitialized; + Counters->GrpcInflightMessages -= GrpcInflightMessages; + Counters->GrpcInflightBytes -= GrpcInflightBytes; + GrpcInflightMessages = 0; + GrpcInflightBytes = 0; + YLOG_INFO(Sprintf("grpc call initialized, session_id [%s], last_seq_no [%lu]", + sessionId.c_str(), lastSeqNo)); + } + + void TClientSession::OnGrpcCallFinished() { + Y_VERIFY(!Closed); + Y_VERIFY(ActiveGrpcCall); + ActiveGrpcCall = nullptr; + if (CloseStarted && (ForcedCloseStarted || WriteQueue.empty())) { + DoClose(); + } else { + const auto reconnectTime = TInstant::Now() + Client->GetParameters().GrpcReconnectDelay; + MakeGrpcCallTimer->Set(reconnectTime); + YLOG_INFO(Sprintf("grpc call delayed until [%s]", reconnectTime.ToString().c_str())); + } + } + + auto TClientSession::PurgeWriteQueue() -> TPurgeWriteQueueStats { + size_t bytesCount = 0; + for (const auto& m: WriteQueue) { + bytesCount += m.Size; + } + auto result = TPurgeWriteQueueStats{WriteQueue.size(), bytesCount}; + + Counters->DroppedMessages += WriteQueue.size(); + Counters->DroppedBytes += bytesCount; + Counters->InflightMessages -= WriteQueue.size(); + Counters->InflightBytes -= bytesCount; + Counters->GrpcInflightMessages -= GrpcInflightMessages; + Counters->GrpcInflightBytes -= GrpcInflightBytes; + + InflightBytes.fetch_sub(bytesCount); + GrpcInflightMessages = 0; + GrpcInflightBytes = 0; + WriteQueue.clear(); + + return result; + } + + void TClientSession::DoClose() { + Y_VERIFY(CloseStarted); + Y_VERIFY(!Closed); + Y_VERIFY(!ClosePromise.HasValue()); + MakeGrpcCallTimer->Cancel(); + ForceCloseTimer->Cancel(); + PollTimer->Cancel(); + if (!ForkInProgressLocal && WriteQueue.size() > 0) { + const auto stats = PurgeWriteQueue(); + ++Counters->ErrorsCount; + YLOG_ERR(Sprintf("DoClose, dropped [%lu] messages, [%lu] bytes", + stats.PurgedMessages, stats.PurgedBytes)); + } + --Client->GetCounters()->ActiveSessionsCount; + Closed = true; + ClosePromise.SetValue(); + YLOG_INFO("session closed"); + } + + TGrpcCall::TGrpcCall(TClientSession& session) + : Session(session) + , AsyncJoinerToken(&Session.GetAsyncJoiner()) + , AcceptTag(MakeIOCallback(this, &TGrpcCall::EndAccept)) + , ReadTag(MakeIOCallback(this, &TGrpcCall::EndRead)) + , WriteTag(MakeIOCallback(this, &TGrpcCall::EndWrite)) + , WritesDoneTag(MakeIOCallback(this, &TGrpcCall::EndWritesDone)) + , FinishTag(MakeIOCallback(this, &TGrpcCall::EndFinish)) + , Logger(session.GetLogger().Child("grpc")) + , AcceptPending(false) + , Initialized_(false) + , ReadPending(false) + , ReadsDone(false) + , WritePending(false) + , WritesBlocked(false) + , WritesDonePending(false) + , WritesDone(false) + , ErrorOccured(false) + , FinishRequested(false) + , FinishStarted(false) + , FinishDone(false) + , Cancelled(false) + , Poisoned(false) + , PoisonPillSent(false) + , ReuseSessions_(false) + , FinishStatus() + , ClientContext() + , ReaderWriter(nullptr) + , Request() + , Response() + { + } + + void TGrpcCall::Start() { + AcceptPending = true; + auto& client = Session.GetClient(); + ReaderWriter = client.GetStub().AsyncSession(&ClientContext, + &client.GetCompletionQueue(), + AcceptTag->Ref()); + YLOG_INFO("AsyncSession started"); + } + + TGrpcCall::~TGrpcCall() { + YLOG_INFO("destroyed"); + } + + void TGrpcCall::EnsureFinishStarted() { + if (!FinishStarted) { + FinishStarted = true; + ReaderWriter->Finish(&FinishStatus, FinishTag->Ref()); + YLOG_INFO("Finish started"); + } + } + + bool TGrpcCall::CheckHasError(EIOStatus status, const char* method) { + if (status == EIOStatus::Error) { + SetError(Sprintf("%s %s", method, ToString(status).c_str())); + return true; + } + if (ErrorOccured) { + ScheduleFinishOnError(); + return true; + } + return false; + } + + void TGrpcCall::SetError(const TString& error) { + if (!Cancelled) { + YLOG_ERR(error); + ++Session.GetCounters().ErrorsCount; + } + ErrorOccured = true; + ScheduleFinishOnError(); + } + + void TGrpcCall::ScheduleFinishOnError() { + if (!AcceptPending && !WritePending && !WritesDonePending) { + EnsureFinishStarted(); + } + } + + void TGrpcCall::BeginClose(bool force) { + if (force) { + if (!Cancelled) { + Cancelled = true; + ClientContext.TryCancel(); + SetError("forced close"); + } + return; + } + YLOG_INFO(Sprintf("Close Initialized [%d], AcceptPending [%d], " + "WritePending [%d], FinishRequested [%d], " + "ErrorOccured [%d]", + static_cast<int>(Initialized_), + static_cast<int>(AcceptPending), + static_cast<int>(WritePending), + static_cast<int>(FinishRequested), + static_cast<int>(ErrorOccured))); + if (ErrorOccured || FinishRequested) { + return; + } + FinishRequested = true; + if (!Initialized_ || WritePending) { + return; + } + WritesBlocked = true; + BeginWritesDone(); + } + + void TGrpcCall::Poison() { + Poisoned = true; + NotifyMessageAdded(); + } + + void TGrpcCall::NotifyMessageAdded() { + if (WritePending || !Initialized_ || ErrorOccured || FinishRequested) { + return; + } + ScheduleWrite(); + } + + void TGrpcCall::ScheduleWrite() { + Request.Clear(); + if (!Poisoned) { + Session.PrepareWriteBatchRequest(Request); + } else if (!PoisonPillSent) { + PoisonPillSent = true; + auto& batch = *Request.mutable_data_batch(); + batch.AddSeqNo(std::numeric_limits<::google::protobuf::uint64>::max()); + batch.AddTimestamp(Now().MicroSeconds()); + batch.AddPayload(""); + YLOG_INFO("poison pill sent"); + } + if (Request.GetDataBatch().GetSeqNo().empty()) { + if (FinishRequested) { + WritesBlocked = true; + BeginWritesDone(); + } + return; + } + + BeginWrite(); + } + + void TGrpcCall::EndAccept(EIOStatus status) { + Y_VERIFY(AcceptPending); + AcceptPending = false; + if (CheckHasError(status, "EndAccept")) { + return; + } + BeginRead(); + Request.Clear(); + Session.PrepareInitializeRequest(Request); + BeginWrite(); + } + + void TGrpcCall::EndRead(EIOStatus status) { + ReadPending = false; + if (FinishDone) { + Session.OnGrpcCallFinished(); + return; + } + if (!ErrorOccured && status == EIOStatus::Error && WritesBlocked) { + Y_VERIFY(!WritePending); + YLOG_INFO("EndRead ReadsDone"); + ReadsDone = true; + if (WritesDone) { + EnsureFinishStarted(); + return; + } + return; + } + if (CheckHasError(status, "EndRead")) { + return; + } + if (!Initialized_) { + const auto metadata = ClientContext.GetServerInitialMetadata(); + { + const auto it = metadata.find("ua-reuse-sessions"); + if (it != metadata.end() && it->second == "true") { + ReuseSessions_ = true; + } + } + { + const auto it = metadata.find("ua-max-receive-message-size"); + if (it != metadata.end()) { + Session.SetAgentMaxReceiveMessage(FromString<size_t>(TString{it->second.begin(), it->second.end()})); + } + } + + if (Response.response_case() != NUnifiedAgentProto::Response::kInitialized) { + SetError(Sprintf("EndRead while initializing, unexpected response_case [%d]", + static_cast<int>(Response.response_case()))); + return; + } + Session.OnGrpcCallInitialized(Response.GetInitialized().GetSessionId(), + Response.GetInitialized().GetLastSeqNo()); + Initialized_ = true; + if (!WritePending) { + ScheduleWrite(); + } + } else { + if (Response.response_case() != NUnifiedAgentProto::Response::kAck) { + SetError(Sprintf("EndRead unexpected response_case [%d]", + static_cast<int>(Response.response_case()))); + return; + } + Session.Acknowledge(Response.GetAck().GetSeqNo()); + } + BeginRead(); + } + + void TGrpcCall::EndWrite(EIOStatus status) { + WritePending = false; + if (CheckHasError(status, "EndWrite")) { + return; + } + if (!Initialized_) { + return; + } + ScheduleWrite(); + } + + void TGrpcCall::EndFinish(EIOStatus status) { + FinishDone = true; + const auto finishStatus = status == EIOStatus::Error + ? grpc::Status(grpc::UNKNOWN, "finish error") + : FinishStatus; + YLOG(finishStatus.ok() || Cancelled || Poisoned ? TLOG_INFO : TLOG_ERR, + Sprintf("EndFinish, code [%s], message [%s]", + ToString(finishStatus.error_code()).c_str(), + finishStatus.error_message().c_str()), + Logger); + if (!finishStatus.ok() && !Cancelled) { + ++Session.GetCounters().ErrorsCount; + } + if (!ReadPending) { + Session.OnGrpcCallFinished(); + } + } + + void TGrpcCall::EndWritesDone(EIOStatus status) { + YLOG_INFO(Sprintf("EndWritesDone [%s]", ToString(status).c_str())); + Y_VERIFY(!WritePending && !WritesDone && WritesDonePending); + WritesDonePending = false; + WritesDone = true; + if (CheckHasError(status, "EndWriteDone")) { + return; + } + if (ReadsDone) { + EnsureFinishStarted(); + } + } + + void TGrpcCall::BeginWritesDone() { + WritesDonePending = true; + ReaderWriter->WritesDone(WritesDoneTag->Ref()); + YLOG_INFO("WritesDone started"); + } + + void TGrpcCall::BeginRead() { + ReadPending = true; + Response.Clear(); + ReaderWriter->Read(&Response, ReadTag->Ref()); + YLOG_DEBUG("Read started"); + } + + void TGrpcCall::BeginWrite() { + WritePending = true; + ReaderWriter->Write(Request, WriteTag->Ref()); + YLOG_DEBUG("Write started"); + } +} + +namespace NUnifiedAgent { + size_t SizeOf(const TClientMessage& message) { + auto result = message.Payload.Size() + sizeof(TInstant); + if (message.Meta.Defined()) { + for (const auto& m: *message.Meta) { + result += m.first.Size() + m.second.Size(); + } + } + return result; + } + + TClientParameters::TClientParameters(const TString& uri) + : Uri(uri) + , SharedSecretKey(Nothing()) + , MaxInflightBytes(DefaultMaxInflightBytes) + , Log(TLoggerOperator<TGlobalLog>::Log()) + , LogRateLimitBytes(Nothing()) + , GrpcReconnectDelay(TDuration::MilliSeconds(50)) + , GrpcSendDelay(DefaultGrpcSendDelay) + , EnableForkSupport(false) + , GrpcMaxMessageSize(DefaultGrpcMaxMessageSize) + , Counters(nullptr) + { + } + + TSessionParameters::TSessionParameters() + : SessionId(Nothing()) + , Meta(Nothing()) + , Counters(nullptr) + , MaxInflightBytes() + { + } + + const size_t TClientParameters::DefaultMaxInflightBytes = 10_MB; + const size_t TClientParameters::DefaultGrpcMaxMessageSize = 1_MB; + const TDuration TClientParameters::DefaultGrpcSendDelay = TDuration::MilliSeconds(10); + + TClientPtr MakeClient(const TClientParameters& parameters) { + if (!grpc_is_initialized()) { + EnsureGrpcConfigured(); + } + + std::shared_ptr<NPrivate::TForkProtector> forkProtector{}; +#ifdef _unix_ + if (parameters.EnableForkSupport) { + forkProtector = NPrivate::TForkProtector::Get(true); + } +#endif + return MakeIntrusive<NPrivate::TClient>(parameters, forkProtector); + } +} |