diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 10:54:08 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-03-31 12:28:07 +0300 |
commit | fc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch) | |
tree | c15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library | |
parent | 8a749596d40e91c896a1907afcd108d9221fbde1 (diff) | |
download | ydb-e9cbe5c5cf67db853d223fd365c9f05b695f7b96.tar.gz |
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library')
74 files changed, 4459 insertions, 304 deletions
diff --git a/library/cpp/CMakeLists.darwin.txt b/library/cpp/CMakeLists.darwin.txt index 74f3a1430a..50514bafce 100644 --- a/library/cpp/CMakeLists.darwin.txt +++ b/library/cpp/CMakeLists.darwin.txt @@ -82,6 +82,7 @@ add_subdirectory(time_provider) add_subdirectory(timezone_conversion) add_subdirectory(tld) add_subdirectory(unicode) +add_subdirectory(unified_agent_client) add_subdirectory(uri) add_subdirectory(xml) add_subdirectory(yaml) diff --git a/library/cpp/CMakeLists.linux-aarch64.txt b/library/cpp/CMakeLists.linux-aarch64.txt index 7ccca0159a..2bc7249205 100644 --- a/library/cpp/CMakeLists.linux-aarch64.txt +++ b/library/cpp/CMakeLists.linux-aarch64.txt @@ -81,6 +81,7 @@ add_subdirectory(time_provider) add_subdirectory(timezone_conversion) add_subdirectory(tld) add_subdirectory(unicode) +add_subdirectory(unified_agent_client) add_subdirectory(uri) add_subdirectory(xml) add_subdirectory(yaml) diff --git a/library/cpp/CMakeLists.linux.txt b/library/cpp/CMakeLists.linux.txt index 74f3a1430a..50514bafce 100644 --- a/library/cpp/CMakeLists.linux.txt +++ b/library/cpp/CMakeLists.linux.txt @@ -82,6 +82,7 @@ add_subdirectory(time_provider) add_subdirectory(timezone_conversion) add_subdirectory(tld) add_subdirectory(unicode) +add_subdirectory(unified_agent_client) add_subdirectory(uri) add_subdirectory(xml) add_subdirectory(yaml) diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h index 8051f5ee57..cd2cfda1bb 100644 --- a/library/cpp/actors/core/actorsystem.h +++ b/library/cpp/actors/core/actorsystem.h @@ -122,7 +122,21 @@ namespace NActors { } ui32 GetThreads(ui32 poolId) const { - return Executors ? 
Executors[poolId]->GetThreads() : CpuManager.GetThreads(poolId); + auto result = GetThreadsOptional(poolId); + Y_VERIFY(result, "undefined pool id: %" PRIu32, (ui32)poolId); + return *result; + } + + std::optional<ui32> GetThreadsOptional(const ui32 poolId) const { + if (Y_LIKELY(Executors)) { + if (Y_LIKELY(poolId < ExecutorsCount)) { + return Executors[poolId]->GetDefaultThreadCount(); + } else { + return {}; + } + } else { + return CpuManager.GetThreadsOptional(poolId); + } } }; diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h index 0bf4b871d7..650b1f39f5 100644 --- a/library/cpp/actors/core/config.h +++ b/library/cpp/actors/core/config.h @@ -128,10 +128,10 @@ namespace NActors { Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId); } - ui32 GetThreads(ui32 poolId) const { + std::optional<ui32> GetThreadsOptional(ui32 poolId) const { for (const auto& p : Basic) { if (p.PoolId == poolId) { - return p.Threads; + return p.DefaultThreadCount; } } for (const auto& p : IO) { @@ -144,7 +144,13 @@ namespace NActors { return p.Concurrency ? 
p.Concurrency : UnitedWorkers.CpuCount; } } - Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId); + return {}; + } + + ui32 GetThreads(ui32 poolId) const { + auto result = GetThreadsOptional(poolId); + Y_VERIFY(result, "undefined pool id: %" PRIu32, (ui32)poolId); + return *result; } }; diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h index f39415c7e2..c7c85e61fd 100644 --- a/library/cpp/actors/core/executor_pool.h +++ b/library/cpp/actors/core/executor_pool.h @@ -10,6 +10,11 @@ namespace NActors { struct TWorkerContext; class ISchedulerCookie; + struct TCpuConsumption { + double ConsumedUs = 0; + double BookedUs = 0; + }; + class IExecutorPool : TNonCopyable { public: const ui32 PoolId; @@ -131,14 +136,9 @@ namespace NActors { return false; } - virtual double GetThreadConsumedUs(i16 threadIdx) { - Y_UNUSED(threadIdx); - return 0.0; - } - - virtual double GetThreadBookedUs(i16 threadIdx) { + virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) { Y_UNUSED(threadIdx); - return 0.0; + return TCpuConsumption{0.0, 0.0}; } }; diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp index 0c984f8fb0..de04105991 100644 --- a/library/cpp/actors/core/executor_pool_basic.cpp +++ b/library/cpp/actors/core/executor_pool_basic.cpp @@ -334,6 +334,8 @@ namespace NActors { poolStats.MaxUtilizationTime = RelaxedLoad(&MaxUtilizationAccumulator) / (i64)(NHPTimer::GetCyclesPerSecond() / 1000); poolStats.WrongWakenedThreadCount = RelaxedLoad(&WrongWakenedThreadCount); poolStats.CurrentThreadCount = RelaxedLoad(&ThreadCount); + poolStats.DefaultThreadCount = DefaultThreadCount; + poolStats.MaxThreadCount = MaxThreadCount; if (Harmonizer) { TPoolHarmonizedStats stats = Harmonizer->GetPoolStats(PoolId); poolStats.IsNeedy = stats.IsNeedy; @@ -342,6 +344,7 @@ namespace NActors { poolStats.IncreasingThreadsByNeedyState = stats.IncreasingThreadsByNeedyState; 
poolStats.DecreasingThreadsByStarvedState = stats.DecreasingThreadsByStarvedState; poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState; + poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount; } statsCopy.resize(PoolThreads + 1); @@ -490,24 +493,14 @@ namespace NActors { return false; } - double TBasicExecutorPool::GetThreadConsumedUs(i16 threadIdx) { + TCpuConsumption TBasicExecutorPool::GetThreadCpuConsumption(i16 threadIdx) { if ((ui32)threadIdx >= PoolThreads) { - return 0; + return {0.0, 0.0}; } TThreadCtx& threadCtx = Threads[threadIdx]; TExecutorThreadStats stats; threadCtx.Thread->GetCurrentStats(stats); - return Ts2Us(stats.ElapsedTicks); - } - - double TBasicExecutorPool::GetThreadBookedUs(i16 threadIdx) { - if ((ui32)threadIdx >= PoolThreads) { - return 0; - } - TThreadCtx& threadCtx = Threads[threadIdx]; - TExecutorThreadStats stats; - threadCtx.Thread->GetCurrentStats(stats); - return stats.CpuNs / 1000.0; + return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs)}; } i16 TBasicExecutorPool::GetBlockingThreadCount() const { diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h index cd94a998f1..813f91dc9a 100644 --- a/library/cpp/actors/core/executor_pool_basic.h +++ b/library/cpp/actors/core/executor_pool_basic.h @@ -153,8 +153,7 @@ namespace NActors { i16 GetMinThreadCount() const override; i16 GetMaxThreadCount() const override; bool IsThreadBeingStopped(i16 threadIdx) const override; - double GetThreadConsumedUs(i16 threadIdx) override; - double GetThreadBookedUs(i16 threadIdx) override; + TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) override; i16 GetBlockingThreadCount() const override; i16 GetPriority() const override; diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp index 6361bc6662..f96f65931a 100644 --- a/library/cpp/actors/core/executor_pool_basic_ut.cpp +++ 
b/library/cpp/actors/core/executor_pool_basic_ut.cpp @@ -339,7 +339,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) { UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0); - //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuNs, 0); // depends on total duration of test, so undefined + //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuUs, 0); // depends on total duration of test, so undefined UNIT_ASSERT(stats[0].ElapsedTicks > 0); UNIT_ASSERT(stats[0].ParkedTicks > 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0); diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp index 133e9c5f2a..a7c7399d73 100644 --- a/library/cpp/actors/core/executor_pool_united_ut.cpp +++ b/library/cpp/actors/core/executor_pool_united_ut.cpp @@ -171,7 +171,7 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) { //UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0); // depends on execution time and system load, so may be non-zero UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0); UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0); - //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuNs, 0); // depends on total duration of test, so undefined + //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuUs, 0); // depends on total duration of test, so undefined UNIT_ASSERT(stats[0].ElapsedTicks > 0); //UNIT_ASSERT(stats[0].ParkedTicks == 0); // per-pool parked time does not make sense for united pools UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0); diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp index f318d8909c..e2fd0c5f24 100644 --- a/library/cpp/actors/core/harmonizer.cpp +++ b/library/cpp/actors/core/harmonizer.cpp @@ -121,6 +121,7 @@ struct TPoolInfo { TAtomic IncreasingThreadsByNeedyState = 0; TAtomic DecreasingThreadsByStarvedState = 0; TAtomic DecreasingThreadsByHoggishState = 0; + TAtomic 
PotentialMaxThreadCount = 0; bool IsBeingStopped(i16 threadIdx); double GetBooked(i16 threadIdx); @@ -169,9 +170,10 @@ double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) { void TPoolInfo::PullStats(ui64 ts) { for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) { TThreadInfo &threadInfo = ThreadInfo[threadIdx]; - threadInfo.Consumed.Register(ts, Pool->GetThreadConsumedUs(threadIdx)); + TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx); + threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs); LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History)); - threadInfo.Booked.Register(ts, Pool->GetThreadBookedUs(threadIdx)); + threadInfo.Booked.Register(ts, cpuConsumption.BookedUs); LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History)); } } @@ -236,7 +238,7 @@ void THarmonizer::PullStats(ui64 ts) { } Y_FORCE_INLINE bool IsStarved(double consumed, double booked) { - return Max(consumed, booked) > 0.1 && consumed < booked * 0.7; + return consumed < booked * 0.7; } Y_FORCE_INLINE bool IsHoggish(double booked, ui16 currentThreadCount) { @@ -293,35 +295,43 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish); } double budget = total - Max(booked, lastSecondBooked); + i16 budgetInt = static_cast<i16>(Max(budget, 0.0)); if (budget < -0.1) { isStarvedPresent = true; } + for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) { + TPoolInfo& pool = Pools[poolIdx]; + AtomicSet(pool.PotentialMaxThreadCount, Min(pool.MaxThreadCount, budgetInt)); + } double overbooked = consumed - booked; if (isStarvedPresent) { - // last_starved_at_consumed_value = сумма по всем пулам consumed; - // TODO(cthulhu): использовать как лимит планвно 
устремлять этот лимит к total, - // использовать вместо total - if (beingStopped && beingStopped >= overbooked) { - // do nothing - } else { - TStackVec<size_t> reorder; - for (size_t i = 0; i < Pools.size(); ++i) { - reorder.push_back(i); - } - for (ui16 poolIdx : PriorityOrder) { - TPoolInfo &pool = Pools[poolIdx]; - i64 threadCount = pool.GetThreadCount(); - if (threadCount > pool.DefaultThreadCount) { - pool.SetThreadCount(threadCount - 1); - AtomicIncrement(pool.DecreasingThreadsByStarvedState); - overbooked--; - LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); - if (overbooked < 1) { - break; - } - } - } - } + // last_starved_at_consumed_value = сумма по всем пулам consumed; + // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total, + // использовать вместо total + if (beingStopped && beingStopped >= overbooked) { + // do nothing + } else { + TStackVec<size_t> reorder; + for (size_t i = 0; i < Pools.size(); ++i) { + reorder.push_back(i); + } + for (ui16 poolIdx : PriorityOrder) { + TPoolInfo &pool = Pools[poolIdx]; + i64 threadCount = pool.GetThreadCount(); + while (threadCount > pool.DefaultThreadCount) { + pool.SetThreadCount(threadCount - 1); + AtomicIncrement(pool.DecreasingThreadsByStarvedState); + overbooked--; + LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount); + if (overbooked < 1) { + break; + } + } + if (overbooked < 1) { + break; + } + } + } } else { for (size_t needyPoolIdx : needyPools) { TPoolInfo &pool = Pools[needyPoolIdx]; @@ -422,6 +432,7 @@ TPoolHarmonizedStats THarmonizer::GetPoolStats(i16 poolId) const { .IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)), .DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)), .DecreasingThreadsByHoggishState = 
static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)), + .PotentialMaxThreadCount = static_cast<i16>(RelaxedLoad(&pool.PotentialMaxThreadCount)), .IsNeedy = static_cast<bool>(flags & 1), .IsStarved = static_cast<bool>(flags & 2), .IsHoggish = static_cast<bool>(flags & 4), diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h index 61f13e43ac..bc6b938fe8 100644 --- a/library/cpp/actors/core/harmonizer.h +++ b/library/cpp/actors/core/harmonizer.h @@ -10,6 +10,7 @@ namespace NActors { ui64 IncreasingThreadsByNeedyState = 0; ui64 DecreasingThreadsByStarvedState = 0; ui64 DecreasingThreadsByHoggishState = 0; + i16 PotentialMaxThreadCount = 0; bool IsNeedy = false; bool IsStarved = false; bool IsHoggish = false; diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h index 38629e2aa1..4c664a964a 100644 --- a/library/cpp/actors/core/mon_stats.h +++ b/library/cpp/actors/core/mon_stats.h @@ -65,6 +65,9 @@ namespace NActors { ui64 DecreasingThreadsByHoggishState = 0; i16 WrongWakenedThreadCount = 0; i16 CurrentThreadCount = 0; + i16 PotentialMaxThreadCount = 0; + i16 DefaultThreadCount = 0; + i16 MaxThreadCount = 0; bool IsNeedy = false; bool IsStarved = false; bool IsHoggish = false; @@ -76,7 +79,8 @@ namespace NActors { ui64 PreemptedEvents = 0; // Number of events experienced hard preemption ui64 NonDeliveredEvents = 0; ui64 EmptyMailboxActivation = 0; - ui64 CpuNs = 0; // nanoseconds thread was executing on CPU (accounts for preemtion) + ui64 CpuUs = 0; // microseconds thread was executing on CPU (accounts for preemtion) + ui64 SafeElapsedTicks = 0; ui64 WorstActivationTimeUs = 0; NHPTimer::STime ElapsedTicks = 0; NHPTimer::STime ParkedTicks = 0; @@ -120,7 +124,8 @@ namespace NActors { PreemptedEvents += RelaxedLoad(&other.PreemptedEvents); NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents); EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation); - CpuNs += 
RelaxedLoad(&other.CpuNs); + CpuUs += RelaxedLoad(&other.CpuUs); + SafeElapsedTicks += RelaxedLoad(&other.SafeElapsedTicks); RelaxedStore( &WorstActivationTimeUs, std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs))); diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h index 2179771fb6..c3a2947df1 100644 --- a/library/cpp/actors/core/worker_context.h +++ b/library/cpp/actors/core/worker_context.h @@ -137,7 +137,8 @@ namespace NActors { } void UpdateThreadTime() { - RelaxedStore(&WorkerStats.CpuNs, ThreadCPUTime() * 1000); + RelaxedStore(&WorkerStats.SafeElapsedTicks, (ui64)RelaxedLoad(&WorkerStats.ElapsedTicks)); + RelaxedStore(&WorkerStats.CpuUs, ThreadCPUTime()); } #else void GetCurrentStats(TExecutorThreadStats&) const {} diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h index b1217b1d63..d80951827d 100644 --- a/library/cpp/actors/helpers/pool_stats_collector.h +++ b/library/cpp/actors/helpers/pool_stats_collector.h @@ -126,6 +126,9 @@ private: NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount; NMonitoring::TDynamicCounters::TCounterPtr WrongWakenedThreadCount; NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCount; + NMonitoring::TDynamicCounters::TCounterPtr PotentialMaxThreadCount; + NMonitoring::TDynamicCounters::TCounterPtr DefaultThreadCount; + NMonitoring::TDynamicCounters::TCounterPtr MaxThreadCount; NMonitoring::TDynamicCounters::TCounterPtr IsNeedy; NMonitoring::TDynamicCounters::TCounterPtr IsStarved; NMonitoring::TDynamicCounters::TCounterPtr IsHoggish; @@ -178,6 +181,9 @@ private: MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true); WrongWakenedThreadCount = PoolGroup->GetCounter("WrongWakenedThreadCount", true); CurrentThreadCount = PoolGroup->GetCounter("CurrentThreadCount", false); + PotentialMaxThreadCount = 
PoolGroup->GetCounter("PotentialMaxThreadCount", false); + DefaultThreadCount = PoolGroup->GetCounter("DefaultThreadCount", false); + MaxThreadCount = PoolGroup->GetCounter("MaxThreadCount", false); IsNeedy = PoolGroup->GetCounter("IsNeedy", false); IsStarved = PoolGroup->GetCounter("IsStarved", false); IsHoggish = PoolGroup->GetCounter("IsHoggish", false); @@ -211,7 +217,7 @@ private: *NonDeliveredEvents = stats.NonDeliveredEvents; *DestroyedActors = stats.PoolDestroyedActors; *EmptyMailboxActivation = stats.EmptyMailboxActivation; - *CpuMicrosec = stats.CpuNs / 1000; + *CpuMicrosec = stats.CpuUs; *ElapsedMicrosec = ::NHPTimer::GetSeconds(stats.ElapsedTicks)*1000000; *ParkedMicrosec = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000; *ActorRegistrations = stats.PoolActorRegistrations; @@ -222,6 +228,9 @@ private: *MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount; *WrongWakenedThreadCount = poolStats.WrongWakenedThreadCount; *CurrentThreadCount = poolStats.CurrentThreadCount; + *PotentialMaxThreadCount = poolStats.PotentialMaxThreadCount; + *DefaultThreadCount = poolStats.DefaultThreadCount; + *MaxThreadCount = poolStats.MaxThreadCount; *IsNeedy = poolStats.IsNeedy; *IsStarved = poolStats.IsStarved; *IsHoggish = poolStats.IsHoggish; diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index 43f376038b..b1b8ae0c75 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -52,9 +52,6 @@ namespace NActors { EvProcessPingRequest, EvGetSecureSocket, EvSecureSocket, - HandshakeBrokerTake, - HandshakeBrokerFree, - HandshakeBrokerPermit, //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // nonlocal messages; their indices must be preserved in order to work properly while doing rolling update @@ -101,18 +98,6 @@ namespace NActors { } }; - struct TEvHandshakeBrokerTake: public 
TEventLocal<TEvHandshakeBrokerTake, ui32(ENetwork::HandshakeBrokerTake)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerTake, "Network: TEvHandshakeBrokerTake") - }; - - struct TEvHandshakeBrokerFree: public TEventLocal<TEvHandshakeBrokerFree, ui32(ENetwork::HandshakeBrokerFree)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerFree, "Network: TEvHandshakeBrokerFree") - }; - - struct TEvHandshakeBrokerPermit: public TEventLocal<TEvHandshakeBrokerPermit, ui32(ENetwork::HandshakeBrokerPermit)> { - DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerPermit, "Network: TEvHandshakeBrokerPermit") - }; - struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> { DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk") TEvHandshakeAsk(const TActorId& self, diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h deleted file mode 100644 index 70a7cb91dc..0000000000 --- a/library/cpp/actors/interconnect/handshake_broker.h +++ /dev/null @@ -1,78 +0,0 @@ -#pragma once - -#include <library/cpp/actors/core/actor.h> - -#include <deque> - -namespace NActors { - static constexpr ui32 DEFAULT_INFLIGHT = 100; - - class THandshakeBroker : public TActor<THandshakeBroker> { - private: - std::deque<TActorId> Waiting; - ui32 Capacity; - - void Handle(TEvHandshakeBrokerTake::TPtr &ev) { - if (Capacity > 0) { - Capacity -= 1; - Send(ev->Sender, new TEvHandshakeBrokerPermit()); - } else { - Waiting.push_back(ev->Sender); - } - } - - void Handle(TEvHandshakeBrokerFree::TPtr& ev) { - Y_UNUSED(ev); - if (Capacity == 0 && !Waiting.empty()) { - Send(Waiting.front(), new TEvHandshakeBrokerPermit()); - Waiting.pop_front(); - } else { - Capacity += 1; - } - } - - void PassAway() override { - while (!Waiting.empty()) { - Send(Waiting.front(), new TEvHandshakeBrokerPermit()); - Waiting.pop_front(); - } - TActor::PassAway(); - } - - public: - THandshakeBroker(ui32 inflightLimit = DEFAULT_INFLIGHT) - : 
TActor(&TThis::StateFunc) - , Capacity(inflightLimit) - { - } - - static constexpr char ActorName[] = "HANDSHAKE_BROKER_ACTOR"; - - STFUNC(StateFunc) { - Y_UNUSED(ctx); - switch(ev->GetTypeRewrite()) { - hFunc(TEvHandshakeBrokerTake, Handle); - hFunc(TEvHandshakeBrokerFree, Handle); - cFunc(TEvents::TSystem::Poison, PassAway); - } - } - - void Bootstrap() { - Become(&TThis::StateFunc); - }; - }; - - inline IActor* CreateHandshakeBroker() { - return new THandshakeBroker(); - } - - inline TActorId MakeHandshakeBrokerOutId() { - char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'}; - return TActorId(0, TStringBuf(std::begin(x), std::end(x))); - } - - inline TActorId MakeHandshakeBrokerInId() { - char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'r', 'I', 'n'}; - return TActorId(0, TStringBuf(std::begin(x), std::end(x))); - } -}; diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index a9c6b1dd11..dc651f3762 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -1,5 +1,4 @@ #include "interconnect_handshake.h" -#include "handshake_broker.h" #include "interconnect_tcp_proxy.h" #include <library/cpp/actors/core/actor_coroutine.h> @@ -97,13 +96,8 @@ namespace NActors { THashMap<ui32, TInstant> LastLogNotice; const TDuration MuteDuration = TDuration::Seconds(15); TInstant Deadline; - TActorId HandshakeBroker; public: - static constexpr IActor::EActivityType ActorActivityType() { - return IActor::INTERCONNECT_HANDSHAKE; - } - THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params) : TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors @@ -119,7 +113,6 @@ namespace NActors { Y_VERIFY(SelfVirtualId); Y_VERIFY(SelfVirtualId.NodeId()); 
Y_VERIFY(PeerNodeId); - HandshakeBroker = MakeHandshakeBrokerOutId(); } THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) @@ -135,7 +128,6 @@ namespace NActors { } else { PeerAddr.clear(); } - HandshakeBroker = MakeHandshakeBrokerInId(); } void UpdatePrefix() { @@ -145,64 +137,45 @@ namespace NActors { void Run() override { UpdatePrefix(); - bool isBrokerActive = false; - - if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) { - isBrokerActive = true; - WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit"); + // set up overall handshake process timer + TDuration timeout = Common->Settings.Handshake; + if (timeout == TDuration::Zero()) { + timeout = DEFAULT_HANDSHAKE_TIMEOUT; } + timeout += ResolveTimeout * 2; + Deadline = Now() + timeout; + Schedule(Deadline, new TEvents::TEvWakeup); try { - // set up overall handshake process timer - TDuration timeout = Common->Settings.Handshake; - if (timeout == TDuration::Zero()) { - timeout = DEFAULT_HANDSHAKE_TIMEOUT; - } - timeout += ResolveTimeout * 2; - Deadline = Now() + timeout; - Schedule(Deadline, new TEvents::TEvWakeup); - - try { - if (Socket) { - PerformIncomingHandshake(); - } else { - PerformOutgoingHandshake(); - } - - // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings - if (ProgramInfo) { - if (Params.Encryption) { - EstablishSecureConnection(); - } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) { - Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required"); - } - } - } catch (const TExHandshakeFailed&) { - ProgramInfo.Clear(); + if (Socket) { + PerformIncomingHandshake(); + } else { + PerformOutgoingHandshake(); } + // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings if (ProgramInfo) { - LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded"); 
- Y_VERIFY(NextPacketFromPeer); - if (PollerToken) { - Y_VERIFY(PollerToken->RefCount() == 1); - PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor + if (Params.Encryption) { + EstablishSecureConnection(); + } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) { + Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required"); } - SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId, - *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); - } - } catch (const TDtorException&) { - throw; // we can't use actor system when handling this exception - } catch (...) { - if (isBrokerActive) { - Send(HandshakeBroker, new TEvHandshakeBrokerFree()); } - throw; + } catch (const TExHandshakeFailed&) { + ProgramInfo.Clear(); } - if (isBrokerActive) { - Send(HandshakeBroker, new TEvHandshakeBrokerFree()); + if (ProgramInfo) { + LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded"); + Y_VERIFY(NextPacketFromPeer); + if (PollerToken) { + Y_VERIFY(PollerToken->RefCount() == 1); + PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor + } + SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId, + *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params))); } + Socket.Reset(); } @@ -1022,11 +995,12 @@ namespace NActors { const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params) { return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), self, peer, nodeId, nextPacket, - std::move(peerHostName), std::move(params))); + std::move(peerHostName), std::move(params)), IActor::INTERCONNECT_HANDSHAKE); } IActor* CreateIncomingHandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) { 
- return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), std::move(socket))); + return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), std::move(socket)), + IActor::INTERCONNECT_HANDSHAKE); } } diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index a8c505d94d..fdf035499f 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -39,7 +39,7 @@ namespace NActors { SetPrefix(Sprintf("InputSession %s [node %" PRIu32 "]", SelfId().ToString().data(), NodeId)); Become(&TThis::WorkingState, DeadPeerTimeout, new TEvCheckDeadPeer); LOG_DEBUG_IC_SESSION("ICIS01", "InputSession created"); - LastReceiveTimestamp = TActivationContext::Now(); + LastReceiveTimestamp = TActivationContext::Monotonic(); ReceiveData(); } @@ -437,7 +437,7 @@ namespace NActors { } } - LastReceiveTimestamp = TActivationContext::Now(); + LastReceiveTimestamp = TActivationContext::Monotonic(); return true; } @@ -473,7 +473,7 @@ namespace NActors { } void TInputSessionTCP::HandleCheckDeadPeer() { - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); if (now >= LastReceiveTimestamp + DeadPeerTimeout) { ReceiveData(); if (Socket && now >= LastReceiveTimestamp + DeadPeerTimeout) { @@ -481,7 +481,7 @@ namespace NActors { DestroySession(TDisconnectReason::DeadPeer()); } } - Schedule(LastReceiveTimestamp + DeadPeerTimeout - now, new TEvCheckDeadPeer); + Schedule(LastReceiveTimestamp + DeadPeerTimeout, new TEvCheckDeadPeer); } void TInputSessionTCP::HandlePingResponse(TDuration passed) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp index 7e2d8ccb94..b4cc263a4c 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp +++ 
b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp @@ -40,7 +40,6 @@ namespace NActors { SetPrefix(Sprintf("Proxy %s [node %" PRIu32 "]", SelfId().ToString().data(), PeerNodeId)); SwitchToInitialState(); - PassAwayTimestamp = TActivationContext::Now() + TDuration::Seconds(15); LOG_INFO_IC("ICP01", "ready to work"); } @@ -563,7 +562,7 @@ namespace NActors { ValidateEvent(ev, "EnqueueSessionEvent"); const ui32 size = ev->GetSize(); PendingSessionEventsSize += size; - PendingSessionEvents.emplace_back(TActivationContext::Now() + Common->Settings.MessagePendingTimeout, size, ev); + PendingSessionEvents.emplace_back(TActivationContext::Monotonic() + Common->Settings.MessagePendingTimeout, size, ev); ScheduleCleanupEventQueue(); CleanupEventQueue(); } @@ -810,7 +809,7 @@ namespace NActors { if (!CleanupEventQueueScheduled && PendingSessionEvents) { // apply batching at 50 ms granularity - Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Now()), new TEvCleanupEventQueue); + Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Monotonic()), new TEvCleanupEventQueue); CleanupEventQueueScheduled = true; } } @@ -827,7 +826,7 @@ namespace NActors { void TInterconnectProxyTCP::CleanupEventQueue() { ICPROXY_PROFILED; - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); while (PendingSessionEvents) { TPendingSessionEvent& ev = PendingSessionEvents.front(); if (now >= ev.Deadline || PendingSessionEventsSize > Common->Settings.MessagePendingSize) { diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h index 023e5bd1ee..b750e278e1 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h @@ -175,11 +175,18 @@ namespace NActors { Become(std::forward<TArgs>(args)...); 
Y_VERIFY(!Terminated || CurrentStateFunc() == &TThis::HoldByError); // ensure we never escape this state if (CurrentStateFunc() != &TThis::PendingActivation) { - PassAwayTimestamp = TInstant::Max(); + PassAwayTimestamp = TMonotonic::Max(); + } else if (DynamicPtr) { + PassAwayTimestamp = TActivationContext::Monotonic() + TDuration::Seconds(15); + if (!PassAwayScheduled) { + TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), + {}, nullptr, 0)); + PassAwayScheduled = true; + } } } - TInstant PassAwayTimestamp; + TMonotonic PassAwayTimestamp; bool PassAwayScheduled = false; void SwitchToInitialState() { @@ -189,17 +196,18 @@ namespace NActors { " PendingIncomingHandshakeEvents# %zu State# %s", LogPrefix.data(), PendingSessionEvents.size(), PendingIncomingHandshakeEvents.size(), State); SwitchToState(__LINE__, "PendingActivation", &TThis::PendingActivation); - if (DynamicPtr && !PassAwayScheduled && PassAwayTimestamp != TInstant::Max()) { - TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), - {}, nullptr, 0)); - PassAwayScheduled = true; - } } void HandlePassAwayIfNeeded() { Y_VERIFY(PassAwayScheduled); - if (PassAwayTimestamp != TInstant::Max()) { + const TMonotonic now = TActivationContext::Monotonic(); + if (now >= PassAwayTimestamp) { PassAway(); + } else if (PassAwayTimestamp != TMonotonic::Max()) { + TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(), + {}, nullptr, 0)); + } else { + PassAwayScheduled = false; } } @@ -387,11 +395,11 @@ namespace NActors { // hold all events before connection is established struct TPendingSessionEvent { - TInstant Deadline; + TMonotonic Deadline; ui32 Size; THolder<IEventHandle> Event; - TPendingSessionEvent(TInstant deadline, ui32 size, TAutoPtr<IEventHandle> event) + TPendingSessionEvent(TMonotonic deadline, ui32 size, TAutoPtr<IEventHandle> event) : Deadline(deadline) , Size(size) , 
Event(event) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index feb55a16ad..18df8e42ff 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -232,7 +232,7 @@ namespace NActors { CloseOnIdleWatchdog.Arm(SelfId()); // reset activity timestamps - LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now(); + LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Monotonic(); LOG_INFO_IC_SESSION("ICS10", "traffic start"); @@ -315,7 +315,7 @@ namespace NActors { bool needConfirm = false; // update activity timer for dead peer checker - LastInputActivityTimestamp = TActivationContext::Now(); + LastInputActivityTimestamp = TActivationContext::Monotonic(); if (msg.NumDataBytes) { UnconfirmedBytes += msg.NumDataBytes; @@ -326,7 +326,7 @@ namespace NActors { } // reset payload watchdog that controls close-on-idle behaviour - LastPayloadActivityTimestamp = TActivationContext::Now(); + LastPayloadActivityTimestamp = TActivationContext::Monotonic(); CloseOnIdleWatchdog.Reset(); } @@ -654,7 +654,7 @@ namespace NActors { void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) { if (period != TDuration::Max()) { - const TInstant when = TActivationContext::Now() + period; + const TMonotonic when = TActivationContext::Monotonic() + period; if (when < ForcePacketTimestamp) { ForcePacketTimestamp = when; ScheduleFlush(); @@ -664,7 +664,7 @@ namespace NActors { void TInterconnectSessionTCP::ScheduleFlush() { if (FlushSchedule.empty() || ForcePacketTimestamp < FlushSchedule.top()) { - Schedule(ForcePacketTimestamp - TActivationContext::Now(), new TEvFlush); + Schedule(ForcePacketTimestamp, new TEvFlush); FlushSchedule.push(ForcePacketTimestamp); MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size()); ++FlushEventsScheduled; @@ -672,7 
+672,7 @@ namespace NActors { } void TInterconnectSessionTCP::HandleFlush() { - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); while (FlushSchedule && now >= FlushSchedule.top()) { FlushSchedule.pop(); } @@ -682,14 +682,14 @@ namespace NActors { ++ConfirmPacketsForcedByTimeout; ++FlushEventsProcessed; MakePacket(false); // just generate confirmation packet if we have preconditions for this - } else if (ForcePacketTimestamp != TInstant::Max()) { + } else if (ForcePacketTimestamp != TMonotonic::Max()) { ScheduleFlush(); } } } void TInterconnectSessionTCP::ResetFlushLogic() { - ForcePacketTimestamp = TInstant::Max(); + ForcePacketTimestamp = TMonotonic::Max(); UnconfirmedBytes = 0; const TDuration ping = Proxy->Common->Settings.PingPeriod; if (ping != TDuration::Zero() && !NumEventsInReadyChannels) { @@ -761,7 +761,7 @@ namespace NActors { } // update payload activity timer - LastPayloadActivityTimestamp = TActivationContext::Now(); + LastPayloadActivityTimestamp = TActivationContext::Monotonic(); } else if (pingMask) { serial = *pingMask; @@ -923,7 +923,7 @@ namespace NActors { flagState = EFlag::GREEN; do { - auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp; + auto lastInputDelay = TActivationContext::Monotonic() - LastInputActivityTimestamp; if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) { flagState = EFlag::ORANGE; break; @@ -1006,7 +1006,7 @@ namespace NActors { } void TInterconnectSessionTCP::IssuePingRequest() { - const TInstant now = TActivationContext::Now(); + const TMonotonic now = TActivationContext::Monotonic(); if (now >= LastPingTimestamp + PingPeriodicity) { LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request"); if (Socket) { @@ -1175,6 +1175,8 @@ namespace NActors { ui32 unsentQueueSize = Socket ? 
Socket->GetUnsentQueueSize() : 0; + const TMonotonic now = TActivationContext::Monotonic(); + MON_VAR(OutputStuckFlag) MON_VAR(SendQueue.size()) MON_VAR(SendQueueCache.size()) @@ -1184,8 +1186,8 @@ namespace NActors { MON_VAR(InflightDataAmount) MON_VAR(unsentQueueSize) MON_VAR(SendBufferSize) - MON_VAR(LastInputActivityTimestamp) - MON_VAR(LastPayloadActivityTimestamp) + MON_VAR(now - LastInputActivityTimestamp) + MON_VAR(now - LastPayloadActivityTimestamp) MON_VAR(LastHandshakeDone) MON_VAR(OutputCounter) MON_VAR(LastSentSerial) @@ -1204,7 +1206,7 @@ namespace NActors { clockSkew = Sprintf("+%s", TDuration::MicroSeconds(x).ToString().data()); } - MON_VAR(LastPingTimestamp) + MON_VAR(now - LastPingTimestamp) MON_VAR(GetPingRTT()) MON_VAR(clockSkew) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 51c5bfa453..598a5c9220 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -266,7 +266,7 @@ namespace NActors { } const TDuration DeadPeerTimeout; - TInstant LastReceiveTimestamp; + TMonotonic LastReceiveTimestamp; void HandleCheckDeadPeer(); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -413,15 +413,15 @@ namespace NActors { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // pinger - TInstant LastPingTimestamp; + TMonotonic LastPingTimestamp; static constexpr TDuration PingPeriodicity = TDuration::Seconds(1); void IssuePingRequest(); void Handle(TEvProcessPingRequest::TPtr ev); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - TInstant LastInputActivityTimestamp; - TInstant LastPayloadActivityTimestamp; + TMonotonic LastInputActivityTimestamp; + TMonotonic LastPayloadActivityTimestamp; 
TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog; TWatchdogTimer<TEvCheckLostConnection> LostConnectionWatchdog; @@ -481,8 +481,8 @@ namespace NActors { // time at which we want to send confirmation packet even if there was no outgoing data ui64 UnconfirmedBytes = 0; - TInstant ForcePacketTimestamp = TInstant::Max(); - TPriorityQueue<TInstant, TVector<TInstant>, std::greater<TInstant>> FlushSchedule; + TMonotonic ForcePacketTimestamp = TMonotonic::Max(); + TPriorityQueue<TMonotonic, TVector<TMonotonic>, std::greater<TMonotonic>> FlushSchedule; size_t MaxFlushSchedule = 0; ui64 FlushEventsScheduled = 0; ui64 FlushEventsProcessed = 0; diff --git a/library/cpp/actors/interconnect/watchdog_timer.h b/library/cpp/actors/interconnect/watchdog_timer.h index c190105a59..fe62006e3b 100644 --- a/library/cpp/actors/interconnect/watchdog_timer.h +++ b/library/cpp/actors/interconnect/watchdog_timer.h @@ -8,7 +8,7 @@ namespace NActors { const TDuration Timeout; const TCallback Callback; - TInstant LastResetTimestamp; + TMonotonic LastResetTimestamp; TEvent* ExpectedEvent = nullptr; ui32 Iteration = 0; @@ -29,7 +29,7 @@ namespace NActors { } void Reset() { - LastResetTimestamp = TActivationContext::Now(); + LastResetTimestamp = TActivationContext::Monotonic(); } void Disarm() { @@ -38,11 +38,11 @@ namespace NActors { void operator()(typename TEvent::TPtr& ev) { if (ev->Get() == ExpectedEvent) { - const TInstant now = TActivationContext::Now(); - const TInstant barrier = LastResetTimestamp + Timeout; + const TMonotonic now = TActivationContext::Monotonic(); + const TMonotonic barrier = LastResetTimestamp + Timeout; if (now < barrier) { // the time hasn't come yet - Schedule(barrier - now, TActorIdentity(ev->Recipient)); + Schedule(barrier, TActorIdentity(ev->Recipient)); } else if (Iteration < NumIterationsBeforeFiring) { // time has come, but we will still give actor a chance to process some messages and rearm timer ++Iteration; @@ -57,7 +57,8 @@ namespace NActors { } 
private: - void Schedule(TDuration timeout, const TActorIdentity& actor) { + template<typename T> + void Schedule(T&& timeout, const TActorIdentity& actor) { auto ev = MakeHolder<TEvent>(); ExpectedEvent = ev.Get(); Iteration = 0; diff --git a/library/cpp/actors/util/rc_buf.h b/library/cpp/actors/util/rc_buf.h index 1a492064ee..a2bce33fba 100644 --- a/library/cpp/actors/util/rc_buf.h +++ b/library/cpp/actors/util/rc_buf.h @@ -113,10 +113,11 @@ public: private: static int Compare(const TContiguousSpan& x, const TContiguousSpan& y) { - if (int res = std::memcmp(x.data(), y.data(), std::min(x.size(), y.size())); res) { - return res; + int res = 0; + if (const size_t common = std::min(x.size(), y.size())) { + res = std::memcmp(x.data(), y.data(), common); } - return x.size() - y.size(); + return res ? res : x.size() - y.size(); } }; diff --git a/library/cpp/charset/CMakeLists.darwin.txt b/library/cpp/charset/CMakeLists.darwin.txt index cb5c16891d..ed68dc36f8 100644 --- a/library/cpp/charset/CMakeLists.darwin.txt +++ b/library/cpp/charset/CMakeLists.darwin.txt @@ -6,13 +6,11 @@ # original buildsystem will not be accepted. -find_package(Iconv REQUIRED) add_library(library-cpp-charset) target_link_libraries(library-cpp-charset PUBLIC contrib-libs-cxxsupp yutil - Iconv::Iconv ) target_sources(library-cpp-charset PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/charset/generated/cp_data.cpp diff --git a/library/cpp/charset/CMakeLists.linux-aarch64.txt b/library/cpp/charset/CMakeLists.linux-aarch64.txt index 03b01e7c69..1d9903b843 100644 --- a/library/cpp/charset/CMakeLists.linux-aarch64.txt +++ b/library/cpp/charset/CMakeLists.linux-aarch64.txt @@ -6,14 +6,12 @@ # original buildsystem will not be accepted. 
-find_package(Iconv REQUIRED) add_library(library-cpp-charset) target_link_libraries(library-cpp-charset PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil - Iconv::Iconv ) target_sources(library-cpp-charset PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/charset/generated/cp_data.cpp diff --git a/library/cpp/charset/CMakeLists.linux.txt b/library/cpp/charset/CMakeLists.linux.txt index 03b01e7c69..1d9903b843 100644 --- a/library/cpp/charset/CMakeLists.linux.txt +++ b/library/cpp/charset/CMakeLists.linux.txt @@ -6,14 +6,12 @@ # original buildsystem will not be accepted. -find_package(Iconv REQUIRED) add_library(library-cpp-charset) target_link_libraries(library-cpp-charset PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp yutil - Iconv::Iconv ) target_sources(library-cpp-charset PRIVATE ${CMAKE_SOURCE_DIR}/library/cpp/charset/generated/cp_data.cpp diff --git a/library/cpp/charset/iconv.cpp b/library/cpp/charset/iconv.cpp index dc604c8492..0b6dbace9e 100644 --- a/library/cpp/charset/iconv.cpp +++ b/library/cpp/charset/iconv.cpp @@ -5,20 +5,22 @@ using namespace NICONVPrivate; TDescriptor::TDescriptor(const char* from, const char* to) - : Descriptor_(libiconv_open(to, from)) + : Descriptor_(iconv_open(to, from)) , From_(from) , To_(to) { +#if defined(USE_ICONV_EXTENSIONS) if (!Invalid()) { int temp = 1; libiconvctl(Descriptor_, ICONV_SET_DISCARD_ILSEQ, &temp); } +#endif } TDescriptor::~TDescriptor() { if (!Invalid()) { - libiconv_close(Descriptor_); + iconv_close(Descriptor_); } } @@ -31,7 +33,7 @@ size_t NICONVPrivate::RecodeImpl(const TDescriptor& descriptor, const char* in, char* outPtr = out; size_t inSizeMod = inSize; size_t outSizeMod = outSize; - size_t res = libiconv(descriptor.Get(), &inPtr, &inSizeMod, &outPtr, &outSizeMod); + size_t res = iconv(descriptor.Get(), &inPtr, &inSizeMod, &outPtr, &outSizeMod); read = inSize - inSizeMod; written = outSize - outSizeMod; diff --git a/library/cpp/grpc/server/grpc_request.h 
b/library/cpp/grpc/server/grpc_request.h index c4b7e9c040..4e869ef5f6 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -113,6 +113,10 @@ public: return FinishPromise_.GetFuture(); } + bool IsClientLost() const override { + return ClientLost_.load(); + } + TString GetPeer() const override { return TString(this->Context.peer()); } @@ -496,6 +500,7 @@ private: void OnFinish(EQueueEventStatus evStatus) { if (this->Context.IsCancelled()) { + ClientLost_.store(true); FinishPromise_.SetValue(EFinishStatus::CANCEL); } else { FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR); @@ -556,6 +561,7 @@ private: NThreading::TPromise<EFinishStatus> FinishPromise_; bool SkipUpdateCountersOnError = false; IStreamAdaptor::TPtr StreamAdaptor_; + std::atomic<bool> ClientLost_ = false; }; template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer> diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h index 42b78ed7df..60b38805ed 100644 --- a/library/cpp/grpc/server/grpc_request_base.h +++ b/library/cpp/grpc/server/grpc_request_base.h @@ -116,6 +116,9 @@ public: //! Returns true if server is using ssl virtual bool SslServer() const = 0; + + //! 
Returns true if client was not interested in result (but we still must send response to make grpc happy) + virtual bool IsClientLost() const = 0; }; } // namespace NGrpc diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp index 97472206e2..0c05c7404e 100644 --- a/library/cpp/grpc/server/grpc_server.cpp +++ b/library/cpp/grpc/server/grpc_server.cpp @@ -130,11 +130,16 @@ void TGRpcServer::Start() { builder.SetOption(std::make_unique<TKeepAliveOption>()); } - if (Options_.UseCompletionQueuePerThread) { - for (size_t i = 0; i < Options_.WorkerThreads; ++i) { - CQS_.push_back(builder.AddCompletionQueue()); - } - } else { + size_t completionQueueCount = 1; + if (Options_.WorkersPerCompletionQueue) { + size_t threadsPerQueue = Max(std::size_t{1}, Options_.WorkersPerCompletionQueue); + completionQueueCount = (Options_.WorkerThreads + threadsPerQueue - 1) / threadsPerQueue; // ceiling + } else if (Options_.UseCompletionQueuePerThread) { + completionQueueCount = Options_.WorkerThreads; + } + + CQS_.reserve(completionQueueCount); + for (size_t i = 0; i < completionQueueCount; ++i) { CQS_.push_back(builder.AddCompletionQueue()); } @@ -159,23 +164,15 @@ void TGRpcServer::Start() { size_t index = 0; for (IGRpcServicePtr service : Services_) { // TODO: provide something else for services instead of ServerCompletionQueue - service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger); + service->InitService(CQS_, Options_.Logger, index++); } - if (Options_.UseCompletionQueuePerThread) { - for (size_t i = 0; i < Options_.WorkerThreads; ++i) { - auto* cq = &CQS_[i]; - Ts.push_back(SystemThreadFactory()->Run([cq] { - PullEvents(cq->get()); - })); - } - } else { - for (size_t i = 0; i < Options_.WorkerThreads; ++i) { - auto* cq = &CQS_[0]; - Ts.push_back(SystemThreadFactory()->Run([cq] { - PullEvents(cq->get()); - })); - } + Ts.reserve(Options_.WorkerThreads); + for (size_t i = 0; i < Options_.WorkerThreads; ++i) { + auto* cq = 
&CQS_[i % CQS_.size()]; + Ts.push_back(SystemThreadFactory()->Run([cq] { + PullEvents(cq->get()); + })); } if (Options_.ExternalListener) { diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h index c9b48a6676..6da5076046 100644 --- a/library/cpp/grpc/server/grpc_server.h +++ b/library/cpp/grpc/server/grpc_server.h @@ -54,7 +54,14 @@ struct TServerOptions { //! Number of worker threads. DECLARE_FIELD(WorkerThreads, size_t, 2); - //! Create one completion queue per thread + //! Number of workers per completion queue, i.e. when + // WorkerThreads=8 and PriorityWorkersPerCompletionQueue=2 + // there will be 4 completion queues. When set to 0 then + // only UseCompletionQueuePerThread affects number of CQ. + DECLARE_FIELD(WorkersPerCompletionQueue, size_t, 0); + + //! Obsolete. Create one completion queue per thread. + // Setting true equals to the WorkersPerCompletionQueue=1 DECLARE_FIELD(UseCompletionQueuePerThread, bool, false); //! Memory quota size for grpc server in bytes. Zero means unlimited. @@ -122,6 +129,15 @@ class ICancelableContext { public: virtual void Shutdown() = 0; virtual ~ICancelableContext() = default; + +private: + template<class T> + friend class TGrpcServiceBase; + + // Shard assigned by RegisterRequestCtx. This field is not thread-safe + // because RegisterRequestCtx may only be called once for a single service, + // so it's only assigned once. 
+ size_t ShardIndex = size_t(-1); }; template <class TLimit> @@ -166,7 +182,17 @@ class IGRpcService: public TThrRefBase { public: virtual grpc::Service* GetService() = 0; virtual void StopService() noexcept = 0; + virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0; + + virtual void InitService( + const std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>& cqs, + TLoggerPtr logger, + size_t index) + { + InitService(cqs[index % cqs.size()].get(), logger); + } + virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0; virtual bool IsUnsafeToShutdown() const = 0; virtual size_t RequestsInProgress() const = 0; @@ -236,13 +262,15 @@ public: using TCurrentGRpcService = T; void StopService() noexcept override { - with_lock(Lock_) { - AtomicSet(ShuttingDown_, 1); - - // Send TryCansel to event (can be send after finishing). - // Actual dtors will be called from grpc thread, so deadlock impossible - for (auto* request : Requests_) { - request->Shutdown(); + AtomicSet(ShuttingDown_, 1); + + for (auto& shard : Shards_) { + with_lock(shard.Lock_) { + // Send TryCansel to event (can be send after finishing). 
+ // Actual dtors will be called from grpc thread, so deadlock impossible + for (auto* request : shard.Requests_) { + request->Shutdown(); + } } } } @@ -263,8 +291,10 @@ public: size_t RequestsInProgress() const override { size_t c = 0; - with_lock(Lock_) { - c = Requests_.size(); + for (auto& shard : Shards_) { + with_lock(shard.Lock_) { + c += shard.Requests_.size(); + } } return c; } @@ -290,23 +320,29 @@ public: } bool RegisterRequestCtx(ICancelableContext* req) { - with_lock(Lock_) { - auto r = Requests_.emplace(req); - Y_VERIFY(r.second, "Ctx already registered"); + if (Y_LIKELY(req->ShardIndex == size_t(-1))) { + req->ShardIndex = NextShard_.fetch_add(1, std::memory_order_relaxed) % Shards_.size(); + } + auto& shard = Shards_[req->ShardIndex]; + with_lock(shard.Lock_) { if (IsShuttingDown()) { - // Server is already shutting down - Requests_.erase(r.first); return false; } + + auto r = shard.Requests_.emplace(req); + Y_VERIFY(r.second, "Ctx already registered"); } return true; } void DeregisterRequestCtx(ICancelableContext* req) { - with_lock(Lock_) { - Y_VERIFY(Requests_.erase(req), "Ctx is not registered"); + Y_VERIFY(req->ShardIndex != size_t(-1), "Ctx does not have an assigned shard index"); + + auto& shard = Shards_[req->ShardIndex]; + with_lock(shard.Lock_) { + Y_VERIFY(shard.Requests_.erase(req), "Ctx is not registered"); } } @@ -325,8 +361,14 @@ private: bool SslServer_ = false; bool NeedAuth_ = false; - THashSet<ICancelableContext*> Requests_; - TAdaptiveLock Lock_; + struct TShard { + TAdaptiveLock Lock_; + THashSet<ICancelableContext*> Requests_; + }; + + // Note: benchmarks showed 4 shards is enough to scale to ~30 threads + TVector<TShard> Shards_{ size_t(4) }; + std::atomic<size_t> NextShard_{ 0 }; }; class TGRpcServer { diff --git a/library/cpp/unified_agent_client/CMakeLists.darwin.txt b/library/cpp/unified_agent_client/CMakeLists.darwin.txt new file mode 100644 index 0000000000..bad1e374fd --- /dev/null +++ 
b/library/cpp/unified_agent_client/CMakeLists.darwin.txt @@ -0,0 +1,63 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(proto) + +add_library(library-cpp-unified_agent_client) +target_include_directories(library-cpp-unified_agent_client PRIVATE + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include +) +target_link_libraries(library-cpp-unified_agent_client PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + cpp-logger-global + cpp-threading-future + cpp-monlib-dynamic_counters + cpp-unified_agent_client-proto + tools-enum_parser-enum_serialization_runtime +) +target_sources(library-cpp-unified_agent_client PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend_creator.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/client_impl.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/counters.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/helpers.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_status_code.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/clock.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/duration_counter.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/logger.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/throttling.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto_weighing.cpp +) +generate_enum_serilization(library-cpp-unified_agent_client + 
${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.h + INCLUDE_HEADERS + library/cpp/unified_agent_client/grpc_io.h +) + +add_global_library_for(library-cpp-unified_agent_client.global library-cpp-unified_agent_client) +target_include_directories(library-cpp-unified_agent_client.global PRIVATE + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include +) +target_link_libraries(library-cpp-unified_agent_client.global PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + cpp-logger-global + cpp-threading-future + cpp-monlib-dynamic_counters + cpp-unified_agent_client-proto + tools-enum_parser-enum_serialization_runtime +) +target_sources(library-cpp-unified_agent_client.global PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/registrar.cpp +) diff --git a/library/cpp/unified_agent_client/CMakeLists.linux-aarch64.txt b/library/cpp/unified_agent_client/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..ea3391bf18 --- /dev/null +++ b/library/cpp/unified_agent_client/CMakeLists.linux-aarch64.txt @@ -0,0 +1,65 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +add_subdirectory(proto) + +add_library(library-cpp-unified_agent_client) +target_include_directories(library-cpp-unified_agent_client PRIVATE + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include +) +target_link_libraries(library-cpp-unified_agent_client PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + cpp-logger-global + cpp-threading-future + cpp-monlib-dynamic_counters + cpp-unified_agent_client-proto + tools-enum_parser-enum_serialization_runtime +) +target_sources(library-cpp-unified_agent_client PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend_creator.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/client_impl.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/counters.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/helpers.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_status_code.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/clock.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/duration_counter.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/logger.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/throttling.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto_weighing.cpp +) +generate_enum_serilization(library-cpp-unified_agent_client + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.h + INCLUDE_HEADERS + library/cpp/unified_agent_client/grpc_io.h +) + +add_global_library_for(library-cpp-unified_agent_client.global library-cpp-unified_agent_client) +target_include_directories(library-cpp-unified_agent_client.global PRIVATE + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include +) +target_link_libraries(library-cpp-unified_agent_client.global PUBLIC + contrib-libs-linux-headers + 
contrib-libs-cxxsupp + yutil + contrib-libs-grpc + cpp-logger-global + cpp-threading-future + cpp-monlib-dynamic_counters + cpp-unified_agent_client-proto + tools-enum_parser-enum_serialization_runtime +) +target_sources(library-cpp-unified_agent_client.global PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/registrar.cpp +) diff --git a/library/cpp/unified_agent_client/CMakeLists.linux.txt b/library/cpp/unified_agent_client/CMakeLists.linux.txt new file mode 100644 index 0000000000..ea3391bf18 --- /dev/null +++ b/library/cpp/unified_agent_client/CMakeLists.linux.txt @@ -0,0 +1,65 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +add_subdirectory(proto) + +add_library(library-cpp-unified_agent_client) +target_include_directories(library-cpp-unified_agent_client PRIVATE + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include +) +target_link_libraries(library-cpp-unified_agent_client PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + cpp-logger-global + cpp-threading-future + cpp-monlib-dynamic_counters + cpp-unified_agent_client-proto + tools-enum_parser-enum_serialization_runtime +) +target_sources(library-cpp-unified_agent_client PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend_creator.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/client_impl.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/counters.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/helpers.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_status_code.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/clock.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/duration_counter.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/logger.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/throttling.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto_weighing.cpp +) +generate_enum_serilization(library-cpp-unified_agent_client + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.h + INCLUDE_HEADERS + library/cpp/unified_agent_client/grpc_io.h +) + +add_global_library_for(library-cpp-unified_agent_client.global library-cpp-unified_agent_client) +target_include_directories(library-cpp-unified_agent_client.global PRIVATE + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc + ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include +) +target_link_libraries(library-cpp-unified_agent_client.global PUBLIC + contrib-libs-linux-headers + 
contrib-libs-cxxsupp + yutil + contrib-libs-grpc + cpp-logger-global + cpp-threading-future + cpp-monlib-dynamic_counters + cpp-unified_agent_client-proto + tools-enum_parser-enum_serialization_runtime +) +target_sources(library-cpp-unified_agent_client.global PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/registrar.cpp +) diff --git a/library/cpp/unified_agent_client/CMakeLists.txt b/library/cpp/unified_agent_client/CMakeLists.txt new file mode 100644 index 0000000000..3e0811fb22 --- /dev/null +++ b/library/cpp/unified_agent_client/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/library/cpp/unified_agent_client/async_joiner.h b/library/cpp/unified_agent_client/async_joiner.h new file mode 100644 index 0000000000..ce392ef7bc --- /dev/null +++ b/library/cpp/unified_agent_client/async_joiner.h @@ -0,0 +1,42 @@ +#pragma once + +#include <library/cpp/threading/future/future.h> + +namespace NUnifiedAgent { + class TAsyncJoiner { + public: + inline TAsyncJoiner() + : Promise(NThreading::NewPromise()) + , Refs(1) + { + } + + inline i64 Ref(i64 count = 1) noexcept { + const auto result = Refs.fetch_add(count); + Y_VERIFY(result >= 1, "already joined"); + return result; + } + + inline i64 UnRef() noexcept { + const auto prev = Refs.fetch_sub(1); + 
Y_VERIFY(prev >= 1); + if (prev == 1) { + auto p = Promise; + p.SetValue(); + } + return prev; + } + + inline NThreading::TFuture<void> Join() noexcept { + auto result = Promise; + UnRef(); + return result; + } + + private: + NThreading::TPromise<void> Promise; + std::atomic<i64> Refs; + }; + + using TAsyncJoinerToken = TIntrusivePtr<TAsyncJoiner>; +} diff --git a/library/cpp/unified_agent_client/backend.cpp b/library/cpp/unified_agent_client/backend.cpp new file mode 100644 index 0000000000..b3c4b4ebcf --- /dev/null +++ b/library/cpp/unified_agent_client/backend.cpp @@ -0,0 +1,112 @@ +#include "backend.h" + +#include <library/cpp/unified_agent_client/enum.h> + +#include <library/cpp/logger/record.h> + +#include <util/datetime/base.h> +#include <util/generic/guid.h> +#include <util/generic/serialized_enum.h> + +namespace NUnifiedAgent { + namespace { + class TDefaultRecordConverter : public IRecordConverter { + public: + TDefaultRecordConverter(bool stripTrailingNewLine) + : StripTrailingNewLine(stripTrailingNewLine) + , PriorityKey("_priority") + { + } + + TClientMessage Convert(const TLogRecord& rec) const override { + const auto stripTrailingNewLine = StripTrailingNewLine && + rec.Len > 0 && rec.Data[rec.Len - 1] == '\n'; + + THashMap<TString, TString> metaFlags{{PriorityKey, NameOf(rec.Priority)}}; + metaFlags.insert(rec.MetaFlags.begin(), rec.MetaFlags.end()); + + return { + TString(rec.Data, stripTrailingNewLine ? 
rec.Len - 1 : rec.Len), + std::move(metaFlags) + }; + } + + private: + const bool StripTrailingNewLine; + const TString PriorityKey; + }; + + class TClientSessionAdapter: public TLogBackend { + public: + explicit TClientSessionAdapter(const TClientSessionPtr& session, THolder<IRecordConverter> recordConverter) + : Session(session) + , RecordConverter(std::move(recordConverter)) + { + } + + void WriteData(const TLogRecord& rec) override { + Session->Send(RecordConverter->Convert(rec)); + } + + void ReopenLog() override { + } + + private: + TClientSessionPtr Session; + THolder<IRecordConverter> RecordConverter; + }; + + class TSessionHolder { + protected: + TSessionHolder(const TClientParameters& parameters, const TSessionParameters& sessionParameters) + : Client(MakeClient(parameters)) + , Session(Client->CreateSession(sessionParameters)) + { + } + + protected: + TClientPtr Client; + TClientSessionPtr Session; + }; + + class TAgentLogBackend: private TSessionHolder, public TClientSessionAdapter { + public: + TAgentLogBackend(const TClientParameters& parameters, + const TSessionParameters& sessionParameters, + THolder<IRecordConverter> recordConverter) + : TSessionHolder(parameters, sessionParameters) + , TClientSessionAdapter(TSessionHolder::Session, std::move(recordConverter)) + { + } + + ~TAgentLogBackend() override { + TSessionHolder::Session->Close(); + } + }; + } + + THolder<IRecordConverter> MakeDefaultRecordConverter(bool stripTrailingNewLine) { + return MakeHolder<TDefaultRecordConverter>(stripTrailingNewLine); + } + + THolder<TLogBackend> AsLogBackend(const TClientSessionPtr& session, bool stripTrailingNewLine) { + return MakeHolder<TClientSessionAdapter>(session, MakeDefaultRecordConverter(stripTrailingNewLine)); + } + + THolder<TLogBackend> MakeLogBackend(const TClientParameters& parameters, + const TSessionParameters& sessionParameters, + THolder<IRecordConverter> recordConverter) + { + if (!recordConverter) { + recordConverter = 
MakeDefaultRecordConverter(); + } + return MakeHolder<TAgentLogBackend>(parameters, sessionParameters, std::move(recordConverter)); + } + + THolder<::TLog> MakeLog(const TClientParameters& parameters, + const TSessionParameters& sessionParameters, + THolder<IRecordConverter> recordConverter) + { + return MakeHolder<::TLog>(MakeLogBackend(parameters, sessionParameters, std::move(recordConverter))); + } +} diff --git a/library/cpp/unified_agent_client/backend.h b/library/cpp/unified_agent_client/backend.h new file mode 100644 index 0000000000..41e8d146b3 --- /dev/null +++ b/library/cpp/unified_agent_client/backend.h @@ -0,0 +1,27 @@ +#pragma once + +#include <library/cpp/unified_agent_client/client.h> + +#include <library/cpp/logger/backend.h> +#include <library/cpp/logger/log.h> + +namespace NUnifiedAgent { + class IRecordConverter { + public: + virtual ~IRecordConverter() = default; + + virtual TClientMessage Convert(const TLogRecord&) const = 0; + }; + + THolder<IRecordConverter> MakeDefaultRecordConverter(bool stripTrailingNewLine = true); + + THolder<TLogBackend> AsLogBackend(const TClientSessionPtr& session, bool stripTrailingNewLine = true); + + THolder<TLogBackend> MakeLogBackend(const TClientParameters& parameters, + const TSessionParameters& sessionParameters = {}, + THolder<IRecordConverter> recordConverter = {}); + + THolder<::TLog> MakeLog(const TClientParameters& parameters, + const TSessionParameters& sessionParameters = {}, + THolder<IRecordConverter> recordConverter = {}); +} diff --git a/library/cpp/unified_agent_client/backend_creator.cpp b/library/cpp/unified_agent_client/backend_creator.cpp new file mode 100644 index 0000000000..825e3ebd2b --- /dev/null +++ b/library/cpp/unified_agent_client/backend_creator.cpp @@ -0,0 +1,63 @@ +#include "backend_creator.h" +#include <library/cpp/logger/global/global.h> + +namespace NUnifiedAgent { + + + TLogBackendCreator::TLogBackendCreator() + : TLogBackendCreatorBase("unified_agent") + {} + + bool 
TLogBackendCreator::Init(const IInitContext& ctx) { + if(TString socket = ctx.GetOrElse("Uri", TString())) { + ClientParams = MakeHolder<TClientParameters>(socket); + } else { + Cdbg << "Uri not set for unified_agent log backend" << Endl; + return false; + } + TString secretKey; + ctx.GetValue("SharedSecretKey", secretKey); + if (secretKey) { + ClientParams->SharedSecretKey = secretKey; + } + ctx.GetValue("MaxInflightBytes", ClientParams->MaxInflightBytes); + ctx.GetValue("GrpcSendDelay", ClientParams->GrpcSendDelay); + size_t rateLimit; + if (ctx.GetValue("LogRateLimit", rateLimit)) { + ClientParams->LogRateLimitBytes = rateLimit; + } + ctx.GetValue("GrpcReconnectDelay", ClientParams->GrpcReconnectDelay); + ctx.GetValue("GrpcMaxMessageSize", ClientParams->GrpcMaxMessageSize); + const auto ownLogger = ctx.GetChildren("OwnLogger"); + if (!ownLogger.empty() && ownLogger.front()->GetOrElse("LoggerType", TString()) != "global") { + OwnLogger = ILogBackendCreator::Create(*ownLogger.front()); + TLog log; + log.ResetBackend(OwnLogger->CreateLogBackend()); + ClientParams->SetLog(log); + } + return true; + } + + + void TLogBackendCreator::DoToJson(NJson::TJsonValue& value) const { + value["Uri"] = ClientParams->Uri; + if (ClientParams->SharedSecretKey) { + value["SharedSecretKey"] = *ClientParams->SharedSecretKey; + } + value["MaxInflightBytes"] = ClientParams->MaxInflightBytes; + value["GrpcSendDelay"] = ClientParams->GrpcSendDelay.ToString(); + if (ClientParams->LogRateLimitBytes) { + value["LogRateLimit"] = *ClientParams->LogRateLimitBytes; + } + value["GrpcReconnectDelay"] = ClientParams->GrpcReconnectDelay.ToString(); + value["GrpcMaxMessageSize"] = ClientParams->GrpcMaxMessageSize; + if (OwnLogger) { + OwnLogger->ToJson(value["OwnLogger"].AppendValue(NJson::JSON_MAP)); + } + } + + THolder<TLogBackend> TLogBackendCreator::DoCreateLogBackend() const { + return MakeLogBackend(*ClientParams); + } + +} diff --git a/library/cpp/unified_agent_client/backend_creator.h 
b/library/cpp/unified_agent_client/backend_creator.h new file mode 100644 index 0000000000..04928f616c --- /dev/null +++ b/library/cpp/unified_agent_client/backend_creator.h @@ -0,0 +1,25 @@ +#pragma once + +#include "backend.h" +#include <library/cpp/logger/backend_creator.h> + +namespace NUnifiedAgent { + + class TLogBackendCreator: public TLogBackendCreatorBase { + public: + TLogBackendCreator(); + bool Init(const IInitContext& ctx) override; + static TFactory::TRegistrator<TLogBackendCreator> Registrar; + + protected: + void DoToJson(NJson::TJsonValue& value) const override; + + private: + THolder<TLogBackend> DoCreateLogBackend() const override; + + private: + THolder<TClientParameters> ClientParams; + THolder<ILogBackendCreator> OwnLogger; + }; + +} diff --git a/library/cpp/unified_agent_client/client.h b/library/cpp/unified_agent_client/client.h new file mode 100644 index 0000000000..62e1210803 --- /dev/null +++ b/library/cpp/unified_agent_client/client.h @@ -0,0 +1,256 @@ +#pragma once + +#include <library/cpp/unified_agent_client/counters.h> + +#include <library/cpp/logger/log.h> +#include <library/cpp/threading/future/future.h> + +#include <util/datetime/base.h> +#include <util/generic/hash.h> +#include <util/generic/maybe.h> +#include <util/generic/string.h> + +namespace NUnifiedAgent { + struct TClientParameters { + // uri format https://github.com/grpc/grpc/blob/master/doc/naming.md + // for example: unix:///unified_agent for unix domain sockets or localhost:12345 for tcp + explicit TClientParameters(const TString& uri); + + // Simple way to protect against writing to unintended/invalid Unified Agent endpoint. + // Must correspond to 'shared_secret_key' grpc input parameter + // (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6333542#L219), + // session would end with error otherwise. 
+ // + // Default: not set + TClientParameters& SetSharedSecretKey(const TString& sharedSecretKey) { + SharedSecretKey = sharedSecretKey; + return *this; + } + + // Max bytes count that have been received by client session but not acknowledged yet. + // When exceeded, new messages will be discarded, an error message + // will be written to the TLog instance and drop counter will be incremented. + // + // Default: 10 mb + TClientParameters& SetMaxInflightBytes(size_t maxInflightBytes) { + MaxInflightBytes = maxInflightBytes; + return *this; + } + + // TLog instance for client library's own logs. + // + // Default: TLoggerOperator<TGlobalLog>::Log() + TClientParameters& SetLog(TLog& log) { + Log = log; + return *this; + } + + // Throttle client library log by rate limit in bytes, excess will be discarded. + // + // Default: not set + TClientParameters& SetLogRateLimit(size_t bytesPerSec) { + LogRateLimitBytes = bytesPerSec; + return *this; + } + + // Try to establish new grpc session if the current one become broken. + // Session may break either due to agent unavailability, or the agent itself may + // reject new session creation if it does not satisfy certain + // conditions - shared_secret_key does not match, the session creation rate has been + // exceeded, invalid session metadata has been used and so on. + // Attempts to establish a grpc session will continue indefinitely. + // + // Default: 50 millis + TClientParameters& SetGrpcReconnectDelay(TDuration delay) { + GrpcReconnectDelay = delay; + return *this; + } + + // Grpc usually writes data to the socket faster than it comes from the client. + // This means that it's possible that each TClientMessage would be sent in it's own grpc message. + // This is expensive in terms of cpu, since grpc makes at least one syscall + // for each message on the sender and receiver sides. 
+ // To avoid a large number of syscalls, the client holds incoming messages + // in internal buffer in hope of being able to assemble bigger grpc batch. + // This parameter sets the timeout for this delay - from IClientSession::Send + // call to the actual sending of the corresponding grpc message. + // + // Default: 10 millis. + TClientParameters& SetGrpcSendDelay(TDuration delay) { + GrpcSendDelay = delay; + return *this; + } + + // Client library sends messages to grpc in batches, this parameter + // establishes upper limit on the size of single batch in bytes. + // If you increase this value, don't forget to adjust max_receive_message_size (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6661788#L185) + // in grpc input config, it must be grater than GrpcMaxMessageSize. + // + // Default: 1 mb + TClientParameters& SetGrpcMaxMessageSize(size_t size) { + GrpcMaxMessageSize = size; + return *this; + } + + // Enable forks handling in client library. + // Multiple threads and concurrent forks are all supported is this regime. + // + // Default: false + TClientParameters& SetEnableForkSupport(bool value) { + EnableForkSupport = value; + return *this; + } + + // Client library counters. + // App can set this to some leaf of it's TDynamicCounters tree. + // Actual provided counters are listed in TClientCounters. 
+ // + // Default: not set + TClientParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) { + return SetCounters(MakeIntrusive<TClientCounters>(counters)); + } + + TClientParameters& SetCounters(const TIntrusivePtr<TClientCounters>& counters) { + Counters = counters; + return *this; + } + + public: + static const size_t DefaultMaxInflightBytes; + static const size_t DefaultGrpcMaxMessageSize; + static const TDuration DefaultGrpcSendDelay; + + public: + TString Uri; + TMaybe<TString> SharedSecretKey; + size_t MaxInflightBytes; + TLog Log; + TMaybe<size_t> LogRateLimitBytes; + TDuration GrpcReconnectDelay; + TDuration GrpcSendDelay; + bool EnableForkSupport; + size_t GrpcMaxMessageSize; + TIntrusivePtr<TClientCounters> Counters; + }; + + struct TSessionParameters { + TSessionParameters(); + + // Session unique identifier. + // It's guaranteed that for messages with the same sessionId relative + // ordering of the messages will be preserved at all processing stages + // in library, in Unified Agent and in other systems that respect ordering (e.g., Logbroker) + // + // Default: generated automatically by Unified Agent. + TSessionParameters& SetSessionId(const TString& sessionId) { + SessionId = sessionId; + return *this; + } + + // Session metadata as key-value set. + // Can be used by agent filters and outputs for validation/routing/enrichment/etc. + // + // Default: not set + TSessionParameters& SetMeta(const THashMap<TString, TString>& meta) { + Meta = meta; + return *this; + } + + // Session counters. + // Actual provided counters are listed in TClientSessionCounters. + // + // Default: A single common for all sessions subgroup of client TDynamicCounters instance + // with label ('session': 'default'). 
+ TSessionParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) { + return SetCounters(MakeIntrusive<TClientSessionCounters>(counters)); + } + + TSessionParameters& SetCounters(const TIntrusivePtr<TClientSessionCounters>& counters) { + Counters = counters; + return *this; + } + + // Max bytes count that have been received by client session but not acknowledged yet. + // When exceeded, new messages will be discarded, an error message + // will be written to the TLog instance and drop counter will be incremented. + // + // Default: value from client settings + TSessionParameters& SetMaxInflightBytes(size_t maxInflightBytes) { + MaxInflightBytes = maxInflightBytes; + return *this; + } + + public: + TMaybe<TString> SessionId; + TMaybe<THashMap<TString, TString>> Meta; + TIntrusivePtr<TClientSessionCounters> Counters; + TMaybe<size_t> MaxInflightBytes; + }; + + // Message data to be sent to unified agent. + struct TClientMessage { + // Opaque message payload. + TString Payload; + + // Message metadata as key-value set. + // Can be used by agent filters and outputs for validation/routing/enrichment/etc. + // + // Default: not set + TMaybe<THashMap<TString, TString>> Meta{}; + + // Message timestamp. + // + // Default: time the client library has received this instance of TClientMessage. + TMaybe<TInstant> Timestamp{}; + }; + + // Message size as it is accounted in byte-related metrics (ReceivedBytes, InflightBytes, etc). + size_t SizeOf(const TClientMessage& message); + + class IClientSession: public TAtomicRefCount<IClientSession> { + public: + virtual ~IClientSession() = default; + + // Places the message into send queue. Actual grpc call may occur later asynchronously, + // based on settings GrpcSendDelay and GrpcMaxMessageSize. + // A message can be discarded if the limits defined by the GrpcMaxMessageSize and MaxInflightBytes + // settings are exceeded, or if the Close method has already been called. 
+ // In this case an error message will be written to the TLog instance + // and drop counter will be incremented. + virtual void Send(TClientMessage&& message) = 0; + + void Send(const TClientMessage& message) { + Send(TClientMessage(message)); + } + + // Waits until either all current inflight messages are + // acknowledged or the specified deadline is reached. + // Upon the deadline grpc connection would be forcefully dropped (via grpc::ClientContext::TryCancel). + virtual NThreading::TFuture<void> CloseAsync(TInstant deadline) = 0; + + void Close(TInstant deadline) { + CloseAsync(deadline).Wait(); + } + + void Close(TDuration timeout = TDuration::Seconds(3)) { + Close(Now() + timeout); + } + }; + using TClientSessionPtr = TIntrusivePtr<IClientSession>; + + class IClient: public TAtomicRefCount<IClient> { + public: + virtual ~IClient() = default; + + virtual TClientSessionPtr CreateSession(const TSessionParameters& parameters = {}) = 0; + + virtual void StartTracing(ELogPriority) { + } + + virtual void FinishTracing() { + } + }; + using TClientPtr = TIntrusivePtr<IClient>; + + TClientPtr MakeClient(const TClientParameters& parameters); +} diff --git a/library/cpp/unified_agent_client/client_impl.cpp b/library/cpp/unified_agent_client/client_impl.cpp new file mode 100644 index 0000000000..4db98120fd --- /dev/null +++ b/library/cpp/unified_agent_client/client_impl.cpp @@ -0,0 +1,1274 @@ +#include "client_impl.h" +#include "helpers.h" + +#include <contrib/libs/grpc/include/grpc/grpc.h> +#include <contrib/libs/grpc/src/core/lib/gpr/string.h> +#include <contrib/libs/grpc/src/core/lib/gprpp/fork.h> +#include <contrib/libs/grpc/src/core/lib/iomgr/executor.h> + +#include <util/charset/utf8.h> +#include <util/generic/size_literals.h> +#include <util/system/env.h> + +using namespace NThreading; +using namespace NMonitoring; + +namespace NUnifiedAgent::NPrivate { + std::shared_ptr<grpc::Channel> CreateChannel(const grpc::string& target) { + grpc::ChannelArguments args; + 
args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE); + args.SetMaxReceiveMessageSize(Max<int>()); + args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 60000); + args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000); + args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100); + args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 200); + args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0); + args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1); + args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 5000); + args.SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000); + args.SetInt(GRPC_ARG_TCP_READ_CHUNK_SIZE, 1024*1024); + return grpc::CreateCustomChannel(target, grpc::InsecureChannelCredentials(), args); + } + + void AddMeta(NUnifiedAgentProto::Request_Initialize& init, const TString& name, const TString& value) { + auto* metaItem = init.MutableMeta()->Add(); + metaItem->SetName(name); + metaItem->SetValue(value); + } + + std::atomic<ui64> TClient::Id{0}; + + TClient::TClient(const TClientParameters& parameters, std::shared_ptr<TForkProtector> forkProtector) + : Parameters(parameters) + , ForkProtector(forkProtector) + , Counters(parameters.Counters ? 
parameters.Counters : MakeIntrusive<TClientCounters>()) + , Log(parameters.Log) + , MainLogger(Log, MakeFMaybe(Parameters.LogRateLimitBytes)) + , Logger(MainLogger.Child(Sprintf("ua_%lu", Id.fetch_add(1)))) + , Channel(nullptr) + , Stub(nullptr) + , ActiveCompletionQueue(nullptr) + , SessionLogLabel(0) + , ActiveSessions() + , Started(false) + , Destroyed(false) + , Lock() + { + MainLogger.SetDroppedBytesCounter(&Counters->ClientLogDroppedBytes); + + if (ForkProtector != nullptr) { + ForkProtector->Register(*this); + } + + EnsureStarted(); + + YLOG_INFO(Sprintf("created, uri [%s]", Parameters.Uri.c_str())); + } + + TClient::~TClient() { + with_lock(Lock) { + Y_VERIFY(ActiveSessions.empty(), "active sessions found"); + + EnsureStoppedNoLock(); + + Destroyed = true; + } + + if (ForkProtector != nullptr) { + ForkProtector->Unregister(*this); + } + + YLOG_INFO(Sprintf("destroyed, uri [%s]", Parameters.Uri.c_str())); + } + + TClientSessionPtr TClient::CreateSession(const TSessionParameters& parameters) { + return MakeIntrusive<TClientSession>(this, parameters); + } + + void TClient::StartTracing(ELogPriority logPriority) { + MainLogger.StartTracing(logPriority); + StartGrpcTracing(); + YLOG_INFO("tracing started"); + } + + void TClient::FinishTracing() { + FinishGrpcTracing(); + MainLogger.FinishTracing(); + YLOG_INFO("tracing finished"); + } + + void TClient::RegisterSession(TClientSession* session) { + with_lock(Lock) { + ActiveSessions.push_back(session); + } + } + + void TClient::UnregisterSession(TClientSession* session) { + with_lock(Lock) { + const auto it = Find(ActiveSessions, session); + Y_VERIFY(it != ActiveSessions.end()); + ActiveSessions.erase(it); + } + } + + void TClient::PreFork() { + YLOG_INFO("pre fork started"); + + Lock.Acquire(); + + auto futures = TVector<TFuture<void>>(Reserve(ActiveSessions.size())); + for (auto* s: ActiveSessions) { + futures.push_back(s->PreFork()); + } + YLOG_INFO("waiting for sessions"); + WaitAll(futures).Wait(); + + 
EnsureStoppedNoLock(); + + YLOG_INFO("shutdown grpc executor"); + grpc_core::Executor::SetThreadingAll(false); + + YLOG_INFO("pre fork finished"); + } + + void TClient::PostForkParent() { + YLOG_INFO("post fork parent started"); + + if (!Destroyed) { + EnsureStartedNoLock(); + } + Lock.Release(); + + for (auto* s: ActiveSessions) { + s->PostForkParent(); + } + + YLOG_INFO("post fork parent finished"); + } + + void TClient::PostForkChild() { + YLOG_INFO("post fork child started"); + + Lock.Release(); + + for (auto* s: ActiveSessions) { + s->PostForkChild(); + } + + YLOG_INFO("post fork child finished"); + } + + void TClient::EnsureStarted() { + with_lock(Lock) { + EnsureStartedNoLock(); + } + } + + void TClient::EnsureStartedNoLock() { + // Lock must be held + + if (Started) { + return; + } + + Channel = CreateChannel(Parameters.Uri); + Stub = NUnifiedAgentProto::UnifiedAgentService::NewStub(Channel); + ActiveCompletionQueue = MakeHolder<TGrpcCompletionQueueHost>(); + ActiveCompletionQueue->Start(); + + Started = true; + } + + void TClient::EnsureStoppedNoLock() { + // Lock must be held + + if (!Started) { + return; + } + + YLOG_INFO("stopping"); + ActiveCompletionQueue->Stop(); + ActiveCompletionQueue = nullptr; + Stub = nullptr; + Channel = nullptr; + YLOG_INFO("stopped"); + + Started = false; + } + + TScopeLogger TClient::CreateSessionLogger() { + return Logger.Child(ToString(SessionLogLabel.fetch_add(1))); + } + + TForkProtector::TForkProtector() + : Clients() + , GrpcInitializer() + , Enabled(grpc_core::Fork::Enabled()) + , Lock() + { + } + + void TForkProtector::Register(TClient& client) { + if (!Enabled) { + return; + } + + Y_VERIFY(grpc_is_initialized()); + Y_VERIFY(grpc_core::Fork::Enabled()); + + with_lock(Lock) { + Clients.push_back(&client); + } + } + + void TForkProtector::Unregister(TClient& client) { + if (!Enabled) { + return; + } + + with_lock(Lock) { + const auto it = Find(Clients, &client); + Y_VERIFY(it != Clients.end()); + Clients.erase(it); + } 
+ } + + std::shared_ptr<TForkProtector> TForkProtector::Get(bool createIfNotExists) { + with_lock(InstanceLock) { + auto result = Instance.lock(); + if (!result && createIfNotExists) { + SetEnv("GRPC_ENABLE_FORK_SUPPORT", "true"); + result = std::make_shared<TForkProtector>(); + if (!result->Enabled) { + TLog log("cerr"); + TLogger logger(log, Nothing()); + auto scopeLogger = logger.Child("ua client"); + YLOG(TLOG_WARNING, + "Grpc is already initialized, can't enable fork support. " + "If forks are possible, please set environment variable GRPC_ENABLE_FORK_SUPPORT to 'true'. " + "If not, you can suppress this warning by setting EnableForkSupport " + "to false when creating the ua client.", + scopeLogger); + } else if (!SubscribedToForks) { + SubscribedToForks = true; + #ifdef _unix_ + pthread_atfork( + &TForkProtector::PreFork, + &TForkProtector::PostForkParent, + &TForkProtector::PostForkChild); + #endif + } + + Instance = result; + } + return result; + } + } + + void TForkProtector::PreFork() { + auto self = Get(false); + if (!self) { + return; + } + self->Lock.Acquire(); + for (auto* c : self->Clients) { + c->PreFork(); + } + } + + void TForkProtector::PostForkParent() { + auto self = Get(false); + if (!self) { + return; + } + for (auto* c : self->Clients) { + c->PostForkParent(); + } + self->Lock.Release(); + } + + void TForkProtector::PostForkChild() { + auto self = Get(false); + if (!self) { + return; + } + for (auto* c : self->Clients) { + c->PostForkChild(); + } + self->Lock.Release(); + } + + std::weak_ptr<TForkProtector> TForkProtector::Instance{}; + TMutex TForkProtector::InstanceLock{}; + bool TForkProtector::SubscribedToForks{false}; + + TClientSession::TClientSession(const TIntrusivePtr<TClient>& client, const TSessionParameters& parameters) + : AsyncJoiner() + , Client(client) + , OriginalSessionId(MakeFMaybe(parameters.SessionId)) + , SessionId(OriginalSessionId) + , Meta(MakeFMaybe(parameters.Meta)) + , Logger(Client->CreateSessionLogger()) + , 
CloseStarted(false) + , ForcedCloseStarted(false) + , Closed(false) + , ForkInProgressLocal(false) + , Started(false) + , ClosePromise() + , ActiveGrpcCall(nullptr) + , WriteQueue() + , TrimmedCount(0) + , NextIndex(0) + , AckSeqNo(Nothing()) + , PollerLastEventTimestamp() + , Counters(parameters.Counters ? parameters.Counters : Client->GetCounters()->GetDefaultSessionCounters()) + , MakeGrpcCallTimer(nullptr) + , ForceCloseTimer(nullptr) + , PollTimer(nullptr) + , GrpcInflightMessages(0) + , GrpcInflightBytes(0) + , InflightBytes(0) + , CloseRequested(false) + , EventsBatchSize(0) + , PollingStatus(EPollingStatus::Inactive) + , EventNotification(nullptr) + , EventNotificationTriggered(false) + , EventsBatch() + , SecondaryEventsBatch() + , ForkInProgress(false) + , Lock() + , MaxInflightBytes( + parameters.MaxInflightBytes.GetOrElse(Client->GetParameters().MaxInflightBytes)) + , AgentMaxReceiveMessage(Nothing()) { + if (Meta.Defined() && !IsUtf8(*Meta)) { + throw std::runtime_error("session meta contains non UTF-8 characters"); + } + Y_ENSURE(!(Client->GetParameters().EnableForkSupport && SessionId.Defined()), + "explicit session id is not supported with forks"); + Client->RegisterSession(this); + + with_lock(Lock) { + DoStart(); + } + } + + TFuture<void> TClientSession::PreFork() { + YLOG_INFO("pre fork started"); + + Lock.Acquire(); + + YLOG_INFO("triggering event notification"); + if (!EventNotificationTriggered) { + EventNotificationTriggered = true; + EventNotification->Trigger(); + } + + YLOG_INFO("setting 'fork in progress' flag"); + ForkInProgress.store(true); + + if (!Started) { + ClosePromise.TrySetValue(); + } + YLOG_INFO("pre fork finished"); + return ClosePromise.GetFuture(); + } + + void TClientSession::PostForkParent() { + YLOG_INFO("post fork parent started"); + ForkInProgress.store(false); + ForkInProgressLocal = false; + Started = false; + + if (!CloseRequested) { + DoStart(); + + YLOG_INFO("triggering event notification"); + 
EventNotificationTriggered = true; + EventNotification->Trigger(); + } + + Lock.Release(); + + YLOG_INFO("post fork parent finished"); + } + + void TClientSession::PostForkChild() { + YLOG_INFO("post fork child started"); + ForkInProgress.store(false); + ForkInProgressLocal = false; + Started = false; + + SessionId.Clear(); + TrimmedCount = 0; + NextIndex = 0; + AckSeqNo.Clear(); + PurgeWriteQueue(); + EventsBatch.clear(); + SecondaryEventsBatch.clear(); + EventsBatchSize = 0; + + Lock.Release(); + + YLOG_INFO("post fork child finished"); + } + + void TClientSession::SetAgentMaxReceiveMessage(size_t newValue) { + AgentMaxReceiveMessage = newValue; + } + + void TClientSession::DoStart() { + // Lock must be held + + Y_VERIFY(!Started); + YLOG_INFO("starting"); + + Client->EnsureStarted(); + + MakeGrpcCallTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(), + MakeIOCallback([this](EIOStatus status) { + if (status == EIOStatus::Error) { + return; + } + MakeGrpcCall(); + }, &AsyncJoiner)); + ForceCloseTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(), + MakeIOCallback([this](EIOStatus status) { + if (status == EIOStatus::Error) { + return; + } + YLOG_INFO("ForceCloseTimer"); + BeginClose(TInstant::Zero()); + }, &AsyncJoiner)); + PollTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(), + MakeIOCallback([this](EIOStatus status) { + if (status == EIOStatus::Error) { + return; + } + Poll(); + }, &AsyncJoiner)); + EventNotification = MakeHolder<TGrpcNotification>(Client->GetCompletionQueue(), + MakeIOCallback([this](EIOStatus status) { + Y_VERIFY(status == EIOStatus::Ok); + Poll(); + }, &AsyncJoiner)); + + CloseStarted = false; + ForcedCloseStarted = false; + Closed = false; + ClosePromise = NewPromise(); + EventNotificationTriggered = false; + PollerLastEventTimestamp = Now(); + PollingStatus = EPollingStatus::Inactive; + + ++Client->GetCounters()->ActiveSessionsCount; + MakeGrpcCallTimer->Set(Now()); + YLOG_INFO(Sprintf("started, sessionId [%s]", 
OriginalSessionId.GetOrElse("").c_str())); + + Started = true; + } + + void TClientSession::MakeGrpcCall() { + if (Closed) { + YLOG_INFO("MakeGrpcCall, session already closed"); + return; + } + Y_VERIFY(!ForcedCloseStarted); + Y_VERIFY(!ActiveGrpcCall); + ActiveGrpcCall = MakeIntrusive<TGrpcCall>(*this); + ActiveGrpcCall->Start(); + ++Counters->GrpcCalls; + if (CloseStarted) { + ActiveGrpcCall->BeginClose(false); + } + } + + TClientSession::~TClientSession() { + Close(TInstant::Zero()); + AsyncJoiner.Join().Wait(); + Client->UnregisterSession(this); + YLOG_INFO("destroyed"); + } + + void TClientSession::Send(TClientMessage&& message) { + const auto messageSize = SizeOf(message); + ++Counters->ReceivedMessages; + Counters->ReceivedBytes += messageSize; + if (messageSize > Client->GetParameters().GrpcMaxMessageSize) { + YLOG_ERR(Sprintf("message size [%lu] is greater than max grpc message size [%lu], message dropped", + messageSize, Client->GetParameters().GrpcMaxMessageSize)); + ++Counters->DroppedMessages; + Counters->DroppedBytes += messageSize; + ++Counters->ErrorsCount; + return; + } + if (message.Meta.Defined() && !IsUtf8(*message.Meta)) { + YLOG_ERR("message meta contains non UTF-8 characters, message dropped"); + ++Counters->DroppedMessages; + Counters->DroppedBytes += messageSize; + ++Counters->ErrorsCount; + return; + } + if (!message.Timestamp.Defined()) { + message.Timestamp = TInstant::Now(); + } + ++Counters->InflightMessages; + Counters->InflightBytes += messageSize; + { + auto g = Guard(Lock); + + if (!Started) { + DoStart(); + } + + if (CloseRequested) { + g.Release(); + YLOG_ERR(Sprintf("session is closing, message dropped, [%lu] bytes", messageSize)); + --Counters->InflightMessages; + Counters->InflightBytes -= messageSize; + ++Counters->DroppedMessages; + Counters->DroppedBytes += messageSize; + ++Counters->ErrorsCount; + return; + } + if (InflightBytes.load() + messageSize > MaxInflightBytes) { + g.Release(); + YLOG_ERR(Sprintf("max inflight of 
[%lu] bytes reached, [%lu] bytes dropped", + MaxInflightBytes, messageSize)); + --Counters->InflightMessages; + Counters->InflightBytes -= messageSize; + ++Counters->DroppedMessages; + Counters->DroppedBytes += messageSize; + ++Counters->ErrorsCount; + return; + } + InflightBytes.fetch_add(messageSize); + EventsBatch.push_back(TMessageReceivedEvent{std::move(message), messageSize}); + EventsBatchSize += messageSize; + if ((PollingStatus == EPollingStatus::Inactive || + EventsBatchSize >= Client->GetParameters().GrpcMaxMessageSize) && + !EventNotificationTriggered) + { + EventNotificationTriggered = true; + EventNotification->Trigger(); + } + } + } + + TFuture<void> TClientSession::CloseAsync(TInstant deadline) { + YLOG_INFO(Sprintf("close, deadline [%s]", ToString(deadline).c_str())); + if (!ClosePromise.GetFuture().HasValue()) { + with_lock(Lock) { + if (!Started) { + return MakeFuture(); + } + + CloseRequested = true; + + EventsBatch.push_back(TCloseRequestedEvent{deadline}); + if (!EventNotificationTriggered) { + EventNotificationTriggered = true; + EventNotification->Trigger(); + } + } + } + return ClosePromise.GetFuture(); + } + + void TClientSession::BeginClose(TInstant deadline) { + if (Closed) { + return; + } + if (!CloseStarted) { + CloseStarted = true; + YLOG_INFO("close started"); + } + const auto force = deadline == TInstant::Zero(); + if (force && !ForcedCloseStarted) { + ForcedCloseStarted = true; + YLOG_INFO("forced close started"); + } + if (!ActiveGrpcCall && (ForcedCloseStarted || WriteQueue.empty())) { + DoClose(); + } else { + if (!force) { + ForceCloseTimer->Set(deadline); + } + if (ActiveGrpcCall) { + ActiveGrpcCall->BeginClose(ForcedCloseStarted); + } + } + } + + void TClientSession::Poll() { + if (ForkInProgressLocal) { + return; + } + + const auto now = Now(); + const auto sendDelay = Client->GetParameters().GrpcSendDelay; + const auto oldPollingStatus = PollingStatus; + + { + if (!Lock.TryAcquire()) { + TSpinWait sw; + + while 
(Lock.IsLocked() || !Lock.TryAcquire()) { + if (ForkInProgress.load()) { + YLOG_INFO("poller 'fork in progress' signal received, stopping session"); + ForkInProgressLocal = true; + if (!ActiveGrpcCall || !ActiveGrpcCall->Initialized()) { + BeginClose(TInstant::Max()); + } else if (ActiveGrpcCall->ReuseSessions()) { + ActiveGrpcCall->Poison(); + BeginClose(TInstant::Max()); + } else { + BeginClose(TInstant::Zero()); + } + return; + } + sw.Sleep(); + } + } + + if (!EventsBatch.empty()) { + DoSwap(EventsBatch, SecondaryEventsBatch); + EventsBatchSize = 0; + PollerLastEventTimestamp = now; + } + const auto needNextPollStep = sendDelay != TDuration::Zero() && + !CloseRequested && + (now - PollerLastEventTimestamp) < 10 * sendDelay; + PollingStatus = needNextPollStep ? EPollingStatus::Active : EPollingStatus::Inactive; + EventNotificationTriggered = false; + + Lock.Release(); + } + + if (PollingStatus == EPollingStatus::Active) { + PollTimer->Set(now + sendDelay); + } + if (PollingStatus != oldPollingStatus) { + YLOG_DEBUG(Sprintf("poller %s", PollingStatus == EPollingStatus::Active ? 
"started" : "stopped")); + } + if (auto& batch = SecondaryEventsBatch; !batch.empty()) { + auto closeIt = FindIf(batch, [](const auto& e) { + return std::holds_alternative<TCloseRequestedEvent>(e); + }); + + if (auto it = begin(batch); it != closeIt) { + Y_VERIFY(!CloseStarted); + do { + auto& e = std::get<TMessageReceivedEvent>(*it++); + WriteQueue.push_back({std::move(e.Message), e.Size, false}); + } while (it != closeIt); + if (ActiveGrpcCall) { + ActiveGrpcCall->NotifyMessageAdded(); + } + } + + for (auto endIt = end(batch); closeIt != endIt; ++closeIt) { + const auto& e = std::get<TCloseRequestedEvent>(*closeIt); + BeginClose(e.Deadline); + } + + batch.clear(); + } + }; + + void TClientSession::PrepareInitializeRequest(NUnifiedAgentProto::Request& target) { + auto& initializeMessage = *target.MutableInitialize(); + if (SessionId.Defined()) { + initializeMessage.SetSessionId(*SessionId); + } + if (Client->GetParameters().SharedSecretKey.Defined()) { + initializeMessage.SetSharedSecretKey(*Client->GetParameters().SharedSecretKey); + } + if (Meta.Defined()) { + for (const auto& p: *Meta) { + AddMeta(initializeMessage, p.first, p.second); + } + } + if (!Meta.Defined() || Meta->find("_reusable") == Meta->end()) { + AddMeta(initializeMessage, "_reusable", "true"); + } + } + + TClientSession::TRequestBuilder::TRequestBuilder(NUnifiedAgentProto::Request& target, size_t RequestPayloadLimitBytes, + TFMaybe<size_t> serializedRequestLimitBytes) + : Target(target) + , PwTarget(MakeFMaybe<NPW::TRequest>()) + , MetaItems() + , RequestPayloadSize(0) + , RequestPayloadLimitBytes(RequestPayloadLimitBytes) + , SerializedRequestSize(0) + , SerializedRequestLimitBytes(serializedRequestLimitBytes) + , CountersInvalid(false) + { + } + + void TClientSession::TRequestBuilder::ResetCounters() { + RequestPayloadSize = 0; + SerializedRequestSize = 0; + PwTarget.Clear(); + PwTarget.ConstructInPlace(); + CountersInvalid = false; + } + + TClientSession::TRequestBuilder::TAddResult 
TClientSession::TRequestBuilder::TryAddMessage( + const TPendingMessage& message, size_t seqNo) { + Y_VERIFY(!CountersInvalid); + { + // add item to pwRequest to increase calculated size + PwTarget->DataBatch.SeqNo.Add(seqNo); + PwTarget->DataBatch.Timestamp.Add(message.Message.Timestamp->MicroSeconds()); + PwTarget->DataBatch.Payload.Add().SetValue(message.Message.Payload); + if (message.Message.Meta.Defined()) { + for (const auto &m: *message.Message.Meta) { + TMetaItemBuilder *metaItemBuilder = nullptr; + { + auto it = MetaItems.find(m.first); + if (it == MetaItems.end()) { + PwTarget->DataBatch.Meta.Add().Key.SetValue(m.first); + } else { + metaItemBuilder = &it->second; + } + } + size_t metaItemIdx = (metaItemBuilder != nullptr) ? metaItemBuilder->ItemIndex : + PwTarget->DataBatch.Meta.GetSize() - 1; + auto &pwMetaItem = PwTarget->DataBatch.Meta.Get(metaItemIdx); + pwMetaItem.Value.Add().SetValue(m.second); + const auto index = Target.GetDataBatch().SeqNoSize(); + if ((metaItemBuilder != nullptr && metaItemBuilder->ValueIndex != index) || + (metaItemBuilder == nullptr && index != 0)) { + const auto valueIdx = (metaItemBuilder) ? 
metaItemBuilder->ValueIndex : 0; + pwMetaItem.SkipStart.Add(valueIdx); + pwMetaItem.SkipLength.Add(index - valueIdx); + } + } + } + } + const auto newSerializedRequestSize = PwTarget->ByteSizeLong(); + const auto newPayloadSize = RequestPayloadSize + message.Size; + if ((SerializedRequestLimitBytes.Defined() && newSerializedRequestSize > *SerializedRequestLimitBytes) || + newPayloadSize > RequestPayloadLimitBytes) { + CountersInvalid = true; + return {true, newPayloadSize, newSerializedRequestSize}; + } + + { + // add item to the real request + auto& batch = *Target.MutableDataBatch(); + batch.AddSeqNo(seqNo); + batch.AddTimestamp(message.Message.Timestamp->MicroSeconds()); + batch.AddPayload(message.Message.Payload); + if (message.Message.Meta.Defined()) { + for (const auto &m: *message.Message.Meta) { + TMetaItemBuilder *metaItemBuilder; + { + auto it = MetaItems.find(m.first); + if (it == MetaItems.end()) { + batch.AddMeta()->SetKey(m.first); + auto insertResult = MetaItems.insert({m.first, {batch.MetaSize() - 1}}); + Y_VERIFY(insertResult.second); + metaItemBuilder = &insertResult.first->second; + } else { + metaItemBuilder = &it->second; + } + } + auto *metaItem = batch.MutableMeta(metaItemBuilder->ItemIndex); + metaItem->AddValue(m.second); + const auto index = batch.SeqNoSize() - 1; + if (metaItemBuilder->ValueIndex != index) { + metaItem->AddSkipStart(metaItemBuilder->ValueIndex); + metaItem->AddSkipLength(index - metaItemBuilder->ValueIndex); + } + metaItemBuilder->ValueIndex = index + 1; + } + } + SerializedRequestSize = newSerializedRequestSize; + RequestPayloadSize = newPayloadSize; + } + + return {false, newPayloadSize, newSerializedRequestSize}; + } + + void TClientSession::PrepareWriteBatchRequest(NUnifiedAgentProto::Request& target) { + Y_VERIFY(AckSeqNo.Defined()); + TRequestBuilder requestBuilder(target, Client->GetParameters().GrpcMaxMessageSize, AgentMaxReceiveMessage); + const auto startIndex = NextIndex - TrimmedCount; + for (size_t i = 
startIndex; i < WriteQueue.size(); ++i) { + auto& queueItem = WriteQueue[i]; + if (queueItem.Skipped) { + NextIndex++; + continue; + } + + const auto addResult = requestBuilder.TryAddMessage(queueItem, *AckSeqNo + i + 1); + const auto serializedLimitToLog = AgentMaxReceiveMessage.Defined() ? *AgentMaxReceiveMessage : 0; + if (addResult.LimitExceeded && target.GetDataBatch().SeqNoSize() == 0) { + YLOG_ERR(Sprintf("single serialized message is too large [%lu] > [%lu], dropping it", + addResult.NewSerializedRequestSize, serializedLimitToLog)); + queueItem.Skipped = true; + ++Counters->DroppedMessages; + Counters->DroppedBytes += queueItem.Size; + ++Counters->ErrorsCount; + NextIndex++; + requestBuilder.ResetCounters(); + continue; + } + if (addResult.LimitExceeded) { + YLOG_DEBUG(Sprintf( + "batch limit exceeded: [%lu] > [%lu] (limit for serialized batch)" + "OR [%lu] > [%lu] (limit for raw batch)", + addResult.NewSerializedRequestSize, serializedLimitToLog, + addResult.NewRequestPayloadSize, Client->GetParameters().GrpcMaxMessageSize)); + break; + } + + NextIndex++; + } + const auto messagesCount = target.GetDataBatch().SeqNoSize(); + if (messagesCount == 0) { + return; + } + Y_VERIFY(requestBuilder.GetSerializedRequestSize() == target.ByteSizeLong(), + "failed to calculate size for message [%s]", target.ShortDebugString().c_str()); + GrpcInflightMessages += messagesCount; + GrpcInflightBytes += requestBuilder.GetRequestPayloadSize(); + YLOG_DEBUG(Sprintf("new write batch, [%lu] messages, [%lu] bytes, first seq_no [%lu], serialized size [%lu]", + messagesCount, requestBuilder.GetRequestPayloadSize(), + *target.GetDataBatch().GetSeqNo().begin(), requestBuilder.GetSerializedRequestSize())); + ++Counters->GrpcWriteBatchRequests; + Counters->GrpcInflightMessages += messagesCount; + Counters->GrpcInflightBytes += requestBuilder.GetRequestPayloadSize(); + } + + void TClientSession::Acknowledge(ui64 seqNo) { + size_t messagesCount = 0; + size_t bytesCount = 0; + size_t 
skippedMessagesCount = 0; + size_t skippedBytesCount = 0; + + if (AckSeqNo.Defined()) { + while (!WriteQueue.empty() && ((*AckSeqNo < seqNo) || WriteQueue.front().Skipped)) { + if (WriteQueue.front().Skipped) { + skippedMessagesCount++; + skippedBytesCount += WriteQueue.front().Size; + } else { + ++messagesCount; + bytesCount += WriteQueue.front().Size; + } + ++(*AckSeqNo); + WriteQueue.pop_front(); + ++TrimmedCount; + } + } + if (!AckSeqNo.Defined() || seqNo > *AckSeqNo) { + AckSeqNo = seqNo; + } + + Counters->AcknowledgedMessages += messagesCount; + Counters->AcknowledgedBytes += bytesCount; + Counters->InflightMessages -= (messagesCount + skippedMessagesCount); + Counters->InflightBytes -= (bytesCount + skippedBytesCount); + InflightBytes.fetch_sub(bytesCount); + Counters->GrpcInflightMessages -= messagesCount; + Counters->GrpcInflightBytes -= bytesCount; + GrpcInflightMessages -= messagesCount; + GrpcInflightBytes -= bytesCount; + + YLOG_DEBUG(Sprintf("ack [%lu], [%lu] messages, [%lu] bytes", seqNo, messagesCount, bytesCount)); + } + + void TClientSession::OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo) { + SessionId = sessionId; + Acknowledge(lastSeqNo); + NextIndex = TrimmedCount; + ++Counters->GrpcCallsInitialized; + Counters->GrpcInflightMessages -= GrpcInflightMessages; + Counters->GrpcInflightBytes -= GrpcInflightBytes; + GrpcInflightMessages = 0; + GrpcInflightBytes = 0; + YLOG_INFO(Sprintf("grpc call initialized, session_id [%s], last_seq_no [%lu]", + sessionId.c_str(), lastSeqNo)); + } + + void TClientSession::OnGrpcCallFinished() { + Y_VERIFY(!Closed); + Y_VERIFY(ActiveGrpcCall); + ActiveGrpcCall = nullptr; + if (CloseStarted && (ForcedCloseStarted || WriteQueue.empty())) { + DoClose(); + } else { + const auto reconnectTime = TInstant::Now() + Client->GetParameters().GrpcReconnectDelay; + MakeGrpcCallTimer->Set(reconnectTime); + YLOG_INFO(Sprintf("grpc call delayed until [%s]", reconnectTime.ToString().c_str())); + } + } + + auto 
TClientSession::PurgeWriteQueue() -> TPurgeWriteQueueStats { + size_t bytesCount = 0; + for (const auto& m: WriteQueue) { + bytesCount += m.Size; + } + auto result = TPurgeWriteQueueStats{WriteQueue.size(), bytesCount}; + + Counters->DroppedMessages += WriteQueue.size(); + Counters->DroppedBytes += bytesCount; + Counters->InflightMessages -= WriteQueue.size(); + Counters->InflightBytes -= bytesCount; + Counters->GrpcInflightMessages -= GrpcInflightMessages; + Counters->GrpcInflightBytes -= GrpcInflightBytes; + + InflightBytes.fetch_sub(bytesCount); + GrpcInflightMessages = 0; + GrpcInflightBytes = 0; + WriteQueue.clear(); + + return result; + } + + void TClientSession::DoClose() { + Y_VERIFY(CloseStarted); + Y_VERIFY(!Closed); + Y_VERIFY(!ClosePromise.HasValue()); + MakeGrpcCallTimer->Cancel(); + ForceCloseTimer->Cancel(); + PollTimer->Cancel(); + if (!ForkInProgressLocal && WriteQueue.size() > 0) { + const auto stats = PurgeWriteQueue(); + ++Counters->ErrorsCount; + YLOG_ERR(Sprintf("DoClose, dropped [%lu] messages, [%lu] bytes", + stats.PurgedMessages, stats.PurgedBytes)); + } + --Client->GetCounters()->ActiveSessionsCount; + Closed = true; + ClosePromise.SetValue(); + YLOG_INFO("session closed"); + } + + TGrpcCall::TGrpcCall(TClientSession& session) + : Session(session) + , AsyncJoinerToken(&Session.GetAsyncJoiner()) + , AcceptTag(MakeIOCallback(this, &TGrpcCall::EndAccept)) + , ReadTag(MakeIOCallback(this, &TGrpcCall::EndRead)) + , WriteTag(MakeIOCallback(this, &TGrpcCall::EndWrite)) + , WritesDoneTag(MakeIOCallback(this, &TGrpcCall::EndWritesDone)) + , FinishTag(MakeIOCallback(this, &TGrpcCall::EndFinish)) + , Logger(session.GetLogger().Child("grpc")) + , AcceptPending(false) + , Initialized_(false) + , ReadPending(false) + , ReadsDone(false) + , WritePending(false) + , WritesBlocked(false) + , WritesDonePending(false) + , WritesDone(false) + , ErrorOccured(false) + , FinishRequested(false) + , FinishStarted(false) + , FinishDone(false) + , Cancelled(false) + 
, Poisoned(false) + , PoisonPillSent(false) + , ReuseSessions_(false) + , FinishStatus() + , ClientContext() + , ReaderWriter(nullptr) + , Request() + , Response() + { + } + + void TGrpcCall::Start() { + AcceptPending = true; + auto& client = Session.GetClient(); + ReaderWriter = client.GetStub().AsyncSession(&ClientContext, + &client.GetCompletionQueue(), + AcceptTag->Ref()); + YLOG_INFO("AsyncSession started"); + } + + TGrpcCall::~TGrpcCall() { + YLOG_INFO("destroyed"); + } + + void TGrpcCall::EnsureFinishStarted() { + if (!FinishStarted) { + FinishStarted = true; + ReaderWriter->Finish(&FinishStatus, FinishTag->Ref()); + YLOG_INFO("Finish started"); + } + } + + bool TGrpcCall::CheckHasError(EIOStatus status, const char* method) { + if (status == EIOStatus::Error) { + SetError(Sprintf("%s %s", method, ToString(status).c_str())); + return true; + } + if (ErrorOccured) { + ScheduleFinishOnError(); + return true; + } + return false; + } + + void TGrpcCall::SetError(const TString& error) { + if (!Cancelled) { + YLOG_ERR(error); + ++Session.GetCounters().ErrorsCount; + } + ErrorOccured = true; + ScheduleFinishOnError(); + } + + void TGrpcCall::ScheduleFinishOnError() { + if (!AcceptPending && !WritePending && !WritesDonePending) { + EnsureFinishStarted(); + } + } + + void TGrpcCall::BeginClose(bool force) { + if (force) { + if (!Cancelled) { + Cancelled = true; + ClientContext.TryCancel(); + SetError("forced close"); + } + return; + } + YLOG_INFO(Sprintf("Close Initialized [%d], AcceptPending [%d], " + "WritePending [%d], FinishRequested [%d], " + "ErrorOccured [%d]", + static_cast<int>(Initialized_), + static_cast<int>(AcceptPending), + static_cast<int>(WritePending), + static_cast<int>(FinishRequested), + static_cast<int>(ErrorOccured))); + if (ErrorOccured || FinishRequested) { + return; + } + FinishRequested = true; + if (!Initialized_ || WritePending) { + return; + } + WritesBlocked = true; + BeginWritesDone(); + } + + void TGrpcCall::Poison() { + Poisoned = 
true; + NotifyMessageAdded(); + } + + void TGrpcCall::NotifyMessageAdded() { + if (WritePending || !Initialized_ || ErrorOccured || FinishRequested) { + return; + } + ScheduleWrite(); + } + + void TGrpcCall::ScheduleWrite() { + Request.Clear(); + if (!Poisoned) { + Session.PrepareWriteBatchRequest(Request); + } else if (!PoisonPillSent) { + PoisonPillSent = true; + auto& batch = *Request.mutable_data_batch(); + batch.AddSeqNo(std::numeric_limits<::google::protobuf::uint64>::max()); + batch.AddTimestamp(Now().MicroSeconds()); + batch.AddPayload(""); + YLOG_INFO("poison pill sent"); + } + if (Request.GetDataBatch().GetSeqNo().empty()) { + if (FinishRequested) { + WritesBlocked = true; + BeginWritesDone(); + } + return; + } + + BeginWrite(); + } + + void TGrpcCall::EndAccept(EIOStatus status) { + Y_VERIFY(AcceptPending); + AcceptPending = false; + if (CheckHasError(status, "EndAccept")) { + return; + } + BeginRead(); + Request.Clear(); + Session.PrepareInitializeRequest(Request); + BeginWrite(); + } + + void TGrpcCall::EndRead(EIOStatus status) { + ReadPending = false; + if (FinishDone) { + Session.OnGrpcCallFinished(); + return; + } + if (!ErrorOccured && status == EIOStatus::Error && WritesBlocked) { + Y_VERIFY(!WritePending); + YLOG_INFO("EndRead ReadsDone"); + ReadsDone = true; + if (WritesDone) { + EnsureFinishStarted(); + return; + } + return; + } + if (CheckHasError(status, "EndRead")) { + return; + } + if (!Initialized_) { + const auto metadata = ClientContext.GetServerInitialMetadata(); + { + const auto it = metadata.find("ua-reuse-sessions"); + if (it != metadata.end() && it->second == "true") { + ReuseSessions_ = true; + } + } + { + const auto it = metadata.find("ua-max-receive-message-size"); + if (it != metadata.end()) { + Session.SetAgentMaxReceiveMessage(FromString<size_t>(TString{it->second.begin(), it->second.end()})); + } + } + + if (Response.response_case() != NUnifiedAgentProto::Response::kInitialized) { + SetError(Sprintf("EndRead while 
initializing, unexpected response_case [%d]", + static_cast<int>(Response.response_case()))); + return; + } + Session.OnGrpcCallInitialized(Response.GetInitialized().GetSessionId(), + Response.GetInitialized().GetLastSeqNo()); + Initialized_ = true; + if (!WritePending) { + ScheduleWrite(); + } + } else { + if (Response.response_case() != NUnifiedAgentProto::Response::kAck) { + SetError(Sprintf("EndRead unexpected response_case [%d]", + static_cast<int>(Response.response_case()))); + return; + } + Session.Acknowledge(Response.GetAck().GetSeqNo()); + } + BeginRead(); + } + + void TGrpcCall::EndWrite(EIOStatus status) { + WritePending = false; + if (CheckHasError(status, "EndWrite")) { + return; + } + if (!Initialized_) { + return; + } + ScheduleWrite(); + } + + void TGrpcCall::EndFinish(EIOStatus status) { + FinishDone = true; + const auto finishStatus = status == EIOStatus::Error + ? grpc::Status(grpc::UNKNOWN, "finish error") + : FinishStatus; + YLOG(finishStatus.ok() || Cancelled || Poisoned ? 
TLOG_INFO : TLOG_ERR, + Sprintf("EndFinish, code [%s], message [%s]", + ToString(finishStatus.error_code()).c_str(), + finishStatus.error_message().c_str()), + Logger); + if (!finishStatus.ok() && !Cancelled) { + ++Session.GetCounters().ErrorsCount; + } + if (!ReadPending) { + Session.OnGrpcCallFinished(); + } + } + + void TGrpcCall::EndWritesDone(EIOStatus status) { + YLOG_INFO(Sprintf("EndWritesDone [%s]", ToString(status).c_str())); + Y_VERIFY(!WritePending && !WritesDone && WritesDonePending); + WritesDonePending = false; + WritesDone = true; + if (CheckHasError(status, "EndWriteDone")) { + return; + } + if (ReadsDone) { + EnsureFinishStarted(); + } + } + + void TGrpcCall::BeginWritesDone() { + WritesDonePending = true; + ReaderWriter->WritesDone(WritesDoneTag->Ref()); + YLOG_INFO("WritesDone started"); + } + + void TGrpcCall::BeginRead() { + ReadPending = true; + Response.Clear(); + ReaderWriter->Read(&Response, ReadTag->Ref()); + YLOG_DEBUG("Read started"); + } + + void TGrpcCall::BeginWrite() { + WritePending = true; + ReaderWriter->Write(Request, WriteTag->Ref()); + YLOG_DEBUG("Write started"); + } +} + +namespace NUnifiedAgent { + size_t SizeOf(const TClientMessage& message) { + auto result = message.Payload.Size() + sizeof(TInstant); + if (message.Meta.Defined()) { + for (const auto& m: *message.Meta) { + result += m.first.Size() + m.second.Size(); + } + } + return result; + } + + TClientParameters::TClientParameters(const TString& uri) + : Uri(uri) + , SharedSecretKey(Nothing()) + , MaxInflightBytes(DefaultMaxInflightBytes) + , Log(TLoggerOperator<TGlobalLog>::Log()) + , LogRateLimitBytes(Nothing()) + , GrpcReconnectDelay(TDuration::MilliSeconds(50)) + , GrpcSendDelay(DefaultGrpcSendDelay) + , EnableForkSupport(false) + , GrpcMaxMessageSize(DefaultGrpcMaxMessageSize) + , Counters(nullptr) + { + } + + TSessionParameters::TSessionParameters() + : SessionId(Nothing()) + , Meta(Nothing()) + , Counters(nullptr) + , MaxInflightBytes() + { + } + + const size_t 
TClientParameters::DefaultMaxInflightBytes = 10_MB; + const size_t TClientParameters::DefaultGrpcMaxMessageSize = 1_MB; + const TDuration TClientParameters::DefaultGrpcSendDelay = TDuration::MilliSeconds(10); + + TClientPtr MakeClient(const TClientParameters& parameters) { + if (!grpc_is_initialized()) { + EnsureGrpcConfigured(); + } + + std::shared_ptr<NPrivate::TForkProtector> forkProtector{}; +#ifdef _unix_ + if (parameters.EnableForkSupport) { + forkProtector = NPrivate::TForkProtector::Get(true); + } +#endif + return MakeIntrusive<NPrivate::TClient>(parameters, forkProtector); + } +} diff --git a/library/cpp/unified_agent_client/client_impl.h b/library/cpp/unified_agent_client/client_impl.h new file mode 100644 index 0000000000..6adadf92e3 --- /dev/null +++ b/library/cpp/unified_agent_client/client_impl.h @@ -0,0 +1,364 @@ +#pragma once + +#include <library/cpp/unified_agent_client/client.h> +#include <library/cpp/unified_agent_client/client_proto_weighing.h> +#include <library/cpp/unified_agent_client/counters.h> +#include <library/cpp/unified_agent_client/logger.h> +#include <library/cpp/unified_agent_client/variant.h> +#include <library/cpp/unified_agent_client/proto/unified_agent.grpc.pb.h> +#include <library/cpp/unified_agent_client/grpc_io.h> + +#include <library/cpp/logger/global/global.h> + +#include <util/generic/deque.h> +#include <util/system/mutex.h> + +namespace NUnifiedAgent::NPrivate { + class TClientSession; + class TGrpcCall; + class TForkProtector; + + class TClient: public IClient { + public: + explicit TClient(const TClientParameters& parameters, std::shared_ptr<TForkProtector> forkProtector); + + ~TClient() override; + + TClientSessionPtr CreateSession(const TSessionParameters& parameters) override; + + void StartTracing(ELogPriority logPriority) override; + + void FinishTracing() override; + + inline const TIntrusivePtr<TClientCounters>& GetCounters() const noexcept { + return Counters; + } + + inline 
NUnifiedAgentProto::UnifiedAgentService::Stub& GetStub() noexcept { + return *Stub; + } + + TScopeLogger CreateSessionLogger(); + + inline const TClientParameters& GetParameters() const noexcept { + return Parameters; + } + + inline grpc::CompletionQueue& GetCompletionQueue() noexcept { + return ActiveCompletionQueue->GetCompletionQueue(); + } + + void RegisterSession(TClientSession* session); + + void UnregisterSession(TClientSession* session); + + void PreFork(); + + void PostForkParent(); + + void PostForkChild(); + + void EnsureStarted(); + + private: + void EnsureStartedNoLock(); + + void EnsureStoppedNoLock(); + + private: + const TClientParameters Parameters; + std::shared_ptr<TForkProtector> ForkProtector; + TIntrusivePtr<TClientCounters> Counters; + TLog Log; + TLogger MainLogger; + TScopeLogger Logger; + std::shared_ptr<grpc::Channel> Channel; + std::unique_ptr<NUnifiedAgentProto::UnifiedAgentService::Stub> Stub; + THolder<TGrpcCompletionQueueHost> ActiveCompletionQueue; + std::atomic<size_t> SessionLogLabel; + TVector<TClientSession*> ActiveSessions; + bool Started; + bool Destroyed; + TAdaptiveLock Lock; + static std::atomic<ui64> Id; + }; + + class TForkProtector { + public: + TForkProtector(); + + void Register(TClient& client); + + void Unregister(TClient& client); + + static std::shared_ptr<TForkProtector> Get(bool createIfNotExists); + + private: + static void PreFork(); + + static void PostForkParent(); + + static void PostForkChild(); + + private: + TVector<TClient*> Clients; + grpc::GrpcLibraryCodegen GrpcInitializer; + bool Enabled; + TAdaptiveLock Lock; + + static std::weak_ptr<TForkProtector> Instance; + static TMutex InstanceLock; + static bool SubscribedToForks; + }; + + class TClientSession: public IClientSession { + public: + TClientSession(const TIntrusivePtr<TClient>& client, const TSessionParameters& parameters); + + ~TClientSession(); + + void Send(TClientMessage&& message) override; + + NThreading::TFuture<void> CloseAsync(TInstant 
deadline) override; + + inline TClient& GetClient() noexcept { + return *Client; + } + + inline TScopeLogger& GetLogger() noexcept { + return Logger; + } + + inline TClientSessionCounters& GetCounters() noexcept { + return *Counters; + } + + inline TAsyncJoiner& GetAsyncJoiner() noexcept { + return AsyncJoiner; + } + + void PrepareInitializeRequest(NUnifiedAgentProto::Request& target); + + void PrepareWriteBatchRequest(NUnifiedAgentProto::Request& target); + + void Acknowledge(ui64 seqNo); + + void OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo); + + void OnGrpcCallFinished(); + + NThreading::TFuture<void> PreFork(); + + void PostForkParent(); + + void PostForkChild(); + + void SetAgentMaxReceiveMessage(size_t); + + private: + enum class EPollingStatus { + Active, + Inactive + }; + + struct TCloseRequestedEvent { + TInstant Deadline; + }; + + struct TMessageReceivedEvent { + TClientMessage Message; + size_t Size; + }; + + struct TPurgeWriteQueueStats { + size_t PurgedMessages{}; + size_t PurgedBytes{}; + }; + + using TEvent = std::variant<TCloseRequestedEvent, TMessageReceivedEvent>; + + public: + struct TPendingMessage { + TClientMessage Message; + size_t Size; + bool Skipped; + }; + + class TRequestBuilder { + public: + struct TAddResult; + + public: + TRequestBuilder(NUnifiedAgentProto::Request &target, size_t RequestPayloadLimitBytes, + TFMaybe<size_t> serializedRequestLimitBytes); + + TAddResult TryAddMessage(const TPendingMessage& message, size_t seqNo); + + void ResetCounters(); + + inline size_t GetSerializedRequestSize() const { + return SerializedRequestSize; + } + + inline size_t GetRequestPayloadSize() const { + return RequestPayloadSize; + } + + public: + struct TAddResult { + bool LimitExceeded; + size_t NewRequestPayloadSize; // == actual value, if !LimitExceeded + size_t NewSerializedRequestSize; // == actual value, if !LimitExceeded + }; + + private: + struct TMetaItemBuilder { + size_t ItemIndex; + size_t ValueIndex{0}; + }; + + 
private: + NUnifiedAgentProto::Request& Target; + TFMaybe<NPW::TRequest> PwTarget; + THashMap<TString, TMetaItemBuilder> MetaItems; + size_t RequestPayloadSize; + size_t RequestPayloadLimitBytes; + size_t SerializedRequestSize; + TFMaybe<size_t> SerializedRequestLimitBytes; + bool CountersInvalid; + }; + + private: + void MakeGrpcCall(); + + void DoClose(); + + void BeginClose(TInstant deadline); + + void Poll(); + + TPurgeWriteQueueStats PurgeWriteQueue(); + + void DoStart(); + + private: + TAsyncJoiner AsyncJoiner; + TIntrusivePtr<TClient> Client; + TFMaybe<TString> OriginalSessionId; + TFMaybe<TString> SessionId; + TFMaybe<THashMap<TString, TString>> Meta; + TScopeLogger Logger; + bool CloseStarted; + bool ForcedCloseStarted; + bool Closed; + bool ForkInProgressLocal; + bool Started; + NThreading::TPromise<void> ClosePromise; + TIntrusivePtr<TGrpcCall> ActiveGrpcCall; + TDeque<TPendingMessage> WriteQueue; + size_t TrimmedCount; + size_t NextIndex; + TFMaybe<ui64> AckSeqNo; + TInstant PollerLastEventTimestamp; + TIntrusivePtr<TClientSessionCounters> Counters; + THolder<TGrpcTimer> MakeGrpcCallTimer; + THolder<TGrpcTimer> ForceCloseTimer; + THolder<TGrpcTimer> PollTimer; + ui64 GrpcInflightMessages; + ui64 GrpcInflightBytes; + + std::atomic<size_t> InflightBytes; + bool CloseRequested; + size_t EventsBatchSize; + EPollingStatus PollingStatus; + THolder<TGrpcNotification> EventNotification; + bool EventNotificationTriggered; + TVector<TEvent> EventsBatch; + TVector<TEvent> SecondaryEventsBatch; + std::atomic<bool> ForkInProgress; + TAdaptiveLock Lock; + size_t MaxInflightBytes; + TFMaybe<size_t> AgentMaxReceiveMessage; + }; + + class TGrpcCall final: public TAtomicRefCount<TGrpcCall> { + public: + explicit TGrpcCall(TClientSession& session); + + void Start(); + + ~TGrpcCall(); + + void BeginClose(bool force); + + void Poison(); + + void NotifyMessageAdded(); + + inline bool Initialized() const { + return Initialized_; + } + + inline bool ReuseSessions() const { + 
return ReuseSessions_; + } + + private: + void EndAccept(EIOStatus status); + + void EndRead(EIOStatus status); + + void EndWrite(EIOStatus status); + + void EndFinish(EIOStatus status); + + void EndWritesDone(EIOStatus); + + void ScheduleWrite(); + + void BeginWritesDone(); + + bool CheckHasError(EIOStatus status, const char* method); + + void SetError(const TString& error); + + void EnsureFinishStarted(); + + void BeginRead(); + + void BeginWrite(); + + void ScheduleFinishOnError(); + + private: + TClientSession& Session; + TAsyncJoinerToken AsyncJoinerToken; + THolder<IIOCallback> AcceptTag; + THolder<IIOCallback> ReadTag; + THolder<IIOCallback> WriteTag; + THolder<IIOCallback> WritesDoneTag; + THolder<IIOCallback> FinishTag; + TScopeLogger Logger; + bool AcceptPending; + bool Initialized_; + bool ReadPending; + bool ReadsDone; + bool WritePending; + bool WritesBlocked; + bool WritesDonePending; + bool WritesDone; + bool ErrorOccured; + bool FinishRequested; + bool FinishStarted; + bool FinishDone; + bool Cancelled; + bool Poisoned; + bool PoisonPillSent; + bool ReuseSessions_; + grpc::Status FinishStatus; + grpc::ClientContext ClientContext; + std::unique_ptr<grpc::ClientAsyncReaderWriter<NUnifiedAgentProto::Request, NUnifiedAgentProto::Response>> ReaderWriter; + NUnifiedAgentProto::Request Request; + NUnifiedAgentProto::Response Response; + }; +} diff --git a/library/cpp/unified_agent_client/client_proto_weighing.h b/library/cpp/unified_agent_client/client_proto_weighing.h new file mode 100644 index 0000000000..728792e49f --- /dev/null +++ b/library/cpp/unified_agent_client/client_proto_weighing.h @@ -0,0 +1,75 @@ +#pragma once + +#include <library/cpp/unified_agent_client/f_maybe.h> +#include <library/cpp/unified_agent_client/proto_weighing.h> + +namespace NUnifiedAgent::NPW { + struct TMessageMetaItem: public TMessage { + TMessageMetaItem() + : TMessage() + , Key(this) + , Value(this) + , SkipStart(this) + , SkipLength(this) + { + } + + explicit 
TMessageMetaItem(TMessage* parent) + : TMessage(parent) + , Key(this) + , Value(this) + , SkipStart(this) + , SkipLength(this) + { + } + + explicit TMessageMetaItem(const NUnifiedAgent::TFMaybe<TFieldLink>& link) + : TMessage(link) + , Key(this) + , Value(this) + , SkipStart(this) + , SkipLength(this) + { + } + + TStringField Key; + TRepeatedPtrField<TStringField> Value; + TRepeatedField<ui32> SkipStart; + TRepeatedField<ui32> SkipLength; + }; + + struct TDataBatch: public TMessage { + TDataBatch() + : TMessage() + , SeqNo(this) + , Timestamp(this) + , Payload(this, 2) + , Meta(this, 2) + { + } + + TDataBatch(TMessage* parent) + : TMessage(parent) + , SeqNo(this) + , Timestamp(this) + , Payload(this, 2) + , Meta(this, 2) + { + } + + TRepeatedField<ui64> SeqNo; // 1 + TRepeatedField<ui64> Timestamp; // 2 + TRepeatedPtrField<TStringField> Payload; // 100 + TRepeatedPtrField<TMessageMetaItem> Meta; // 101 + }; + + struct TRequest: public TMessage { + TRequest() + : TMessage() + , DataBatch(this) + { + } + + TDataBatch DataBatch; + }; +} diff --git a/library/cpp/unified_agent_client/clock.cpp b/library/cpp/unified_agent_client/clock.cpp new file mode 100644 index 0000000000..192c998a02 --- /dev/null +++ b/library/cpp/unified_agent_client/clock.cpp @@ -0,0 +1,48 @@ +#include "clock.h" + +namespace NUnifiedAgent { + void TClock::Configure() { + Y_VERIFY(!Configured_); + + Configured_ = true; + } + + void TClock::SetBase(TInstant value) { + Y_VERIFY(Configured_); + + Base_.store(value.GetValue()); + } + + void TClock::ResetBase() { + Base_.store(0); + } + + void TClock::ResetBaseWithShift() { + Y_VERIFY(Configured_); + + Shift_.store(static_cast<i64>(Base_.exchange(0)) - static_cast<i64>(::Now().GetValue())); + } + + void TClock::SetShift(TDuration value) { + Y_VERIFY(Configured_); + + Shift_.fetch_add(value.GetValue()); + } + + void TClock::ResetShift() { + Shift_.store(0); + } + + TInstant TClock::Get() { + auto base = Base_.load(); + if (base == 0) { + base = 
::Now().GetValue(); + } + base += Shift_.load(); + return TInstant::FromValue(base); + } + + bool TClock::Configured_{false}; + std::atomic<ui64> TClock::Base_{0}; + std::atomic<i64> TClock::Shift_{0}; +} diff --git a/library/cpp/unified_agent_client/clock.h b/library/cpp/unified_agent_client/clock.h new file mode 100644 index 0000000000..77ff44583e --- /dev/null +++ b/library/cpp/unified_agent_client/clock.h @@ -0,0 +1,37 @@ +#pragma once + +#include <util/datetime/base.h> + +#include <atomic> + +namespace NUnifiedAgent { + class TClock { + public: + static void Configure(); + + static inline bool Configured() { + return Configured_; + } + + static inline TInstant Now() { + return Configured_ ? Get() : TInstant::Now(); + } + + static void SetBase(TInstant value); + + static void ResetBase(); + + static void ResetBaseWithShift(); + + static void SetShift(TDuration value); + + static void ResetShift(); + + static TInstant Get(); + + private: + static bool Configured_; + static std::atomic<ui64> Base_; + static std::atomic<i64> Shift_; + }; +} diff --git a/library/cpp/unified_agent_client/counters.cpp b/library/cpp/unified_agent_client/counters.cpp new file mode 100644 index 0000000000..776a86ec4e --- /dev/null +++ b/library/cpp/unified_agent_client/counters.cpp @@ -0,0 +1,36 @@ +#include "counters.h" + +using namespace NMonitoring; + +namespace NUnifiedAgent { + TClientCounters::TClientCounters(const NMonitoring::TDynamicCounterPtr& counters) + : TDynamicCountersWrapper(counters) + , ActiveSessionsCount(GetCounter("ActiveSessionsCount", false)) + , ClientLogDroppedBytes(GetCounter("ClientLogDroppedBytes", true)) + { + } + + TIntrusivePtr<TClientSessionCounters> TClientCounters::GetDefaultSessionCounters() { + auto group = Unwrap()->GetSubgroup("session", "default"); + return MakeIntrusive<TClientSessionCounters>(group); + } + + TClientSessionCounters::TClientSessionCounters(const NMonitoring::TDynamicCounterPtr& counters) + : TDynamicCountersWrapper(counters) + , 
ReceivedMessages(GetCounter("ReceivedMessages", true)) + , ReceivedBytes(GetCounter("ReceivedBytes", true)) + , AcknowledgedMessages(GetCounter("AcknowledgedMessages", true)) + , AcknowledgedBytes(GetCounter("AcknowledgedBytes", true)) + , InflightMessages(GetCounter("InflightMessages", false)) + , InflightBytes(GetCounter("InflightBytes", false)) + , GrpcWriteBatchRequests(GetCounter("GrpcWriteBatchRequests", true)) + , GrpcInflightMessages(GetCounter("GrpcInflightMessages", false)) + , GrpcInflightBytes(GetCounter("GrpcInflightBytes", false)) + , GrpcCalls(GetCounter("GrpcCalls", true)) + , GrpcCallsInitialized(GetCounter("GrpcCallsInitialized", true)) + , DroppedMessages(GetCounter("DroppedMessages", true)) + , DroppedBytes(GetCounter("DroppedBytes", true)) + , ErrorsCount(GetCounter("ErrorsCount", true)) + { + } +} diff --git a/library/cpp/unified_agent_client/counters.h b/library/cpp/unified_agent_client/counters.h new file mode 100644 index 0000000000..3c2192c3c5 --- /dev/null +++ b/library/cpp/unified_agent_client/counters.h @@ -0,0 +1,38 @@ +#pragma once + +#include <library/cpp/unified_agent_client/dynamic_counters_wrapper.h> + +namespace NUnifiedAgent { + struct TClientSessionCounters; + + struct TClientCounters: public TDynamicCountersWrapper { + explicit TClientCounters(const NMonitoring::TDynamicCounterPtr& counters = + MakeIntrusive<NMonitoring::TDynamicCounters>()); + + NMonitoring::TDeprecatedCounter& ActiveSessionsCount; + NMonitoring::TDeprecatedCounter& ClientLogDroppedBytes; + + public: + TIntrusivePtr<TClientSessionCounters> GetDefaultSessionCounters(); + }; + + struct TClientSessionCounters: public TDynamicCountersWrapper { + explicit TClientSessionCounters(const NMonitoring::TDynamicCounterPtr& counters = + MakeIntrusive<NMonitoring::TDynamicCounters>()); + + NMonitoring::TDeprecatedCounter& ReceivedMessages; + NMonitoring::TDeprecatedCounter& ReceivedBytes; + NMonitoring::TDeprecatedCounter& AcknowledgedMessages; + 
NMonitoring::TDeprecatedCounter& AcknowledgedBytes; + NMonitoring::TDeprecatedCounter& InflightMessages; + NMonitoring::TDeprecatedCounter& InflightBytes; + NMonitoring::TDeprecatedCounter& GrpcWriteBatchRequests; + NMonitoring::TDeprecatedCounter& GrpcInflightMessages; + NMonitoring::TDeprecatedCounter& GrpcInflightBytes; + NMonitoring::TDeprecatedCounter& GrpcCalls; + NMonitoring::TDeprecatedCounter& GrpcCallsInitialized; + NMonitoring::TDeprecatedCounter& DroppedMessages; + NMonitoring::TDeprecatedCounter& DroppedBytes; + NMonitoring::TDeprecatedCounter& ErrorsCount; + }; +} diff --git a/library/cpp/unified_agent_client/duration_counter.cpp b/library/cpp/unified_agent_client/duration_counter.cpp new file mode 100644 index 0000000000..118778a226 --- /dev/null +++ b/library/cpp/unified_agent_client/duration_counter.cpp @@ -0,0 +1,41 @@ +#include "duration_counter.h" + +namespace NUnifiedAgent { + using namespace NMonitoring; + + TDurationUsCounter::TDurationUsCounter(const TString& name, TDynamicCounters& owner) + : Counter(*owner.GetCounter(name, true)) + , ActiveTimers() + , Lock() + { + } + + NHPTimer::STime* TDurationUsCounter::Begin() { + with_lock (Lock) { + ActiveTimers.push_back(0); + auto& result = ActiveTimers.back(); + NHPTimer::GetTime(&result); + return &result; + } + } + + void TDurationUsCounter::End(NHPTimer::STime* startTime) { + with_lock (Lock) { + Counter += static_cast<ui64>(NHPTimer::GetTimePassed(startTime) * 1000000); + *startTime = 0; + while (!ActiveTimers.empty() && ActiveTimers.front() == 0) { + ActiveTimers.pop_front(); + } + } + } + + void TDurationUsCounter::Update() { + with_lock (Lock) { + for (auto& startTime : ActiveTimers) { + if (startTime != 0) { + Counter += static_cast<ui64>(NHPTimer::GetTimePassed(&startTime) * 1000000); + } + } + } + } +} diff --git a/library/cpp/unified_agent_client/duration_counter.h b/library/cpp/unified_agent_client/duration_counter.h new file mode 100644 index 0000000000..dbdfc22ed4 --- /dev/null +++ 
b/library/cpp/unified_agent_client/duration_counter.h @@ -0,0 +1,43 @@ +#pragma once + +#include <library/cpp/monlib/dynamic_counters/counters.h> + +#include <util/generic/deque.h> +#include <util/system/hp_timer.h> +#include <util/system/mutex.h> + +namespace NUnifiedAgent { + class TDurationUsCounter { + public: + class TScope { + public: + TScope(TDurationUsCounter& counter) + : Counter(counter) + , StartTime(Counter.Begin()) + { + } + + ~TScope() { + Counter.End(StartTime); + } + + private: + TDurationUsCounter& Counter; + NHPTimer::STime* StartTime; + }; + + public: + TDurationUsCounter(const TString& name, NMonitoring::TDynamicCounters& owner); + + NHPTimer::STime* Begin(); + + void End(NHPTimer::STime* startTime); + + void Update(); + + private: + NMonitoring::TDeprecatedCounter& Counter; + TDeque<NHPTimer::STime> ActiveTimers; + TAdaptiveLock Lock; + }; +} diff --git a/library/cpp/unified_agent_client/dynamic_counters_wrapper.h b/library/cpp/unified_agent_client/dynamic_counters_wrapper.h new file mode 100644 index 0000000000..cac4c6813d --- /dev/null +++ b/library/cpp/unified_agent_client/dynamic_counters_wrapper.h @@ -0,0 +1,34 @@ +#pragma once + +#include <library/cpp/monlib/dynamic_counters/counters.h> + +namespace NUnifiedAgent { + class TDynamicCountersWrapper: public TAtomicRefCount<TDynamicCountersWrapper> { + public: + explicit TDynamicCountersWrapper(const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters) + : Counters(counters) + { + } + + virtual ~TDynamicCountersWrapper() = default; + + const TIntrusivePtr<NMonitoring::TDynamicCounters>& Unwrap() const { + return Counters; + } + + protected: + NMonitoring::TDeprecatedCounter& GetCounter(const TString& value, bool derivative) { + return *Counters->GetCounter(value, derivative); + } + + private: + TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; + }; + + class TUpdatableCounters: public TDynamicCountersWrapper { + public: + using TDynamicCountersWrapper::TDynamicCountersWrapper; + + 
virtual void Update() = 0; + }; +} diff --git a/library/cpp/unified_agent_client/enum.h b/library/cpp/unified_agent_client/enum.h new file mode 100644 index 0000000000..b21e21acb0 --- /dev/null +++ b/library/cpp/unified_agent_client/enum.h @@ -0,0 +1,30 @@ +#pragma once + +#include <util/generic/serialized_enum.h> +#include <util/generic/vector.h> + +namespace NUnifiedAgent { + namespace NPrivate { + using TEnumNames = TVector<const TString*>; + + template <typename TEnum> + TEnumNames BuildEnumNames() { + const auto names = GetEnumNames<TEnum>(); + auto result = TEnumNames(names.size()); + size_t index = 0; + for (const auto& p: names) { + Y_VERIFY(static_cast<size_t>(p.first) == index); + result[index++] = &p.second; + } + return result; + } + + template <typename TEnum> + inline const auto EnumNames = BuildEnumNames<TEnum>(); + } + + template <typename TEnum, typename = std::enable_if_t<std::is_enum_v<TEnum>>> + inline const TString& NameOf(TEnum val) noexcept { + return *NPrivate::EnumNames<TEnum>[static_cast<size_t>(val)]; + } +} diff --git a/library/cpp/unified_agent_client/examples/ua_grpc_client/main.cpp b/library/cpp/unified_agent_client/examples/ua_grpc_client/main.cpp new file mode 100644 index 0000000000..a9eb423d13 --- /dev/null +++ b/library/cpp/unified_agent_client/examples/ua_grpc_client/main.cpp @@ -0,0 +1,122 @@ +#include <library/cpp/unified_agent_client/client.h> + +#include <library/cpp/getopt/opt.h> + +#include <util/string/split.h> + +using namespace NUnifiedAgent; + +class TOptions { +public: + TString Uri; + TString SharedSecretKey; + TString SessionId; + TString SessionMeta; + + TOptions(int argc, const char* argv[]) { + NLastGetopt::TOpts opts; + TString logPriorityStr; + + opts + .AddLongOption("uri") + .RequiredArgument() + .Required() + .StoreResult(&Uri); + opts + .AddLongOption("shared-secret-key") + .RequiredArgument() + .Optional() + .StoreResult(&SharedSecretKey); + opts + .AddLongOption("session-id") + .RequiredArgument() + 
.Optional() + .StoreResult(&SessionId); + opts + .AddLongOption("session-meta", "key-value pairs separated by comma, e.g. 'k1=v1,k2=v2'") + .RequiredArgument() + .Optional() + .StoreResult(&SessionMeta); + + opts.AddHelpOption(); + opts.AddVersionOption(); + NLastGetopt::TOptsParseResult res(&opts, argc, argv); + } +}; + +bool TryParseMeta(const TString& s, THashMap<TString, TString>& meta) { + for (auto& t: StringSplitter(s).Split(',')) { + TString key; + TString value; + if (!StringSplitter(t.Token()).Split('=').TryCollectInto(&key, &value)) { + Cout << "invalid meta, can't extract key-value pair from [" << t.Token() << "]" << Endl; + return false; + } + meta[key] = value; + } + return true; +} + +bool TryParseLine(const TString& line, TVector<TString>& lineItems) { + lineItems = StringSplitter(line).Split('|').ToList<TString>(); + Y_VERIFY(lineItems.size() >= 1); + if (lineItems.size() > 2) { + Cout << "invalid line format, expected 'k1=v1,k2=v2|payload' or just 'payload'" << Endl; + return false; + } + return true; +} + +int main(int argc, const char* argv[]) { + TOptions options(argc, argv); + + TClientSessionPtr sessionPtr; + { + TLog emptyLog; + auto clientParameters = TClientParameters(options.Uri).SetLog(emptyLog); + if (!options.SharedSecretKey.Empty()) { + clientParameters.SetSharedSecretKey(options.SharedSecretKey); + } + auto clientPtr = MakeClient(clientParameters); + auto sessionParameters = TSessionParameters(); + if (!options.SessionId.Empty()) { + sessionParameters.SetSessionId(options.SessionId); + } + if (!options.SessionMeta.empty()) { + THashMap<TString, TString> sessionMeta; + if (!TryParseMeta(options.SessionMeta, sessionMeta)) { + return -1; + } + sessionParameters.SetMeta(sessionMeta); + } + sessionPtr = clientPtr->CreateSession(sessionParameters); + } + + TString line; + while (true) { + Cin.ReadLine(line); + if (line.Empty()) { + break; + } + + TVector<TString> lineItems; + if (!TryParseLine(line, lineItems)) { + continue; + } + + 
TClientMessage clientMessage; + clientMessage.Payload = lineItems.back(); + if (lineItems.size() == 2) { + THashMap<TString, TString> messageMeta; + if (!TryParseMeta(lineItems[0], messageMeta)) { + continue; + } + clientMessage.Meta = std::move(messageMeta); + } + sessionPtr->Send(std::move(clientMessage)); + } + + sessionPtr->Close(); + + return 0; +} diff --git a/library/cpp/unified_agent_client/f_maybe.h b/library/cpp/unified_agent_client/f_maybe.h new file mode 100644 index 0000000000..7abd4c0aac --- /dev/null +++ b/library/cpp/unified_agent_client/f_maybe.h @@ -0,0 +1,23 @@ +#pragma once + +#include <util/generic/maybe.h> + +namespace NUnifiedAgent { + template <typename T> + using TFMaybe = TMaybe<T, ::NMaybe::TPolicyUndefinedFail>; + + template <class T> + inline constexpr TFMaybe<std::decay_t<T>> MakeFMaybe(T&& value) { + return TMaybe<std::decay_t<T>, ::NMaybe::TPolicyUndefinedFail>(std::forward<T>(value)); + } + + template <class T, class... TArgs> + inline constexpr TFMaybe<T> MakeFMaybe(TArgs&&... args) { + return TFMaybe<T>(typename TFMaybe<T>::TInPlace{}, std::forward<TArgs>(args)...); + } + + template <class T> + inline constexpr TFMaybe<std::decay_t<T>> MakeFMaybe(const TMaybe<T>& source) { + return source.Defined() ? 
MakeFMaybe(*source) : Nothing(); + } +} diff --git a/library/cpp/unified_agent_client/grpc_io.cpp b/library/cpp/unified_agent_client/grpc_io.cpp new file mode 100644 index 0000000000..6d237d75ec --- /dev/null +++ b/library/cpp/unified_agent_client/grpc_io.cpp @@ -0,0 +1,161 @@ +#include "grpc_io.h" + +#include <contrib/libs/grpc/src/core/lib/iomgr/executor.h> +#include <contrib/libs/grpc/src/core/lib/surface/completion_queue.h> +#include <contrib/libs/grpc/include/grpc/impl/codegen/log.h> + +#include <util/generic/yexception.h> +#include <util/string/cast.h> +#include <util/system/env.h> +#include <util/system/mutex.h> +#include <util/system/thread.h> + +namespace NUnifiedAgent { + namespace { + std::once_flag GrpcConfigured{}; + } + + TGrpcNotification::TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback) + : CompletionQueue(completionQueue) + , IOCallback(std::move(ioCallback)) + , Completion(MakeHolder<grpc_cq_completion>()) + , InQueue(false) + { + } + + TGrpcNotification::~TGrpcNotification() = default; + + void TGrpcNotification::Trigger() { + { + bool inQueue = false; + if (!InQueue.compare_exchange_strong(inQueue, true)) { + return; + } + } + grpc_core::ApplicationCallbackExecCtx callbackExecCtx; + grpc_core::ExecCtx execCtx; + IOCallback->Ref(); + Y_VERIFY(grpc_cq_begin_op(CompletionQueue.cq(), this)); + grpc_cq_end_op(CompletionQueue.cq(), this, nullptr, + [](void* self, grpc_cq_completion*) { + Y_VERIFY(static_cast<TGrpcNotification*>(self)->InQueue.exchange(false)); + }, + this, Completion.Get()); + } + + bool TGrpcNotification::FinalizeResult(void** tag, bool*) { + *tag = IOCallback.Get(); + return true; + } + + TGrpcTimer::TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback) + : CompletionQueue(completionQueue) + , IOCallback(std::move(ioCallback)) + , Alarm() + , AlarmIsSet(false) + , NextTriggerTime(Nothing()) + { + } + + void TGrpcTimer::Set(TInstant triggerTime) { + if 
(AlarmIsSet) { + NextTriggerTime = triggerTime; + Alarm.Cancel(); + } else { + AlarmIsSet = true; + Alarm.Set(&CompletionQueue, InstantToTimespec(triggerTime), Ref()); + } + } + + void TGrpcTimer::Cancel() { + NextTriggerTime.Clear(); + if (AlarmIsSet) { + Alarm.Cancel(); + } + } + + IIOCallback* TGrpcTimer::Ref() { + IOCallback->Ref(); + return this; + } + + void TGrpcTimer::OnIOCompleted(EIOStatus status) { + Y_VERIFY(AlarmIsSet); + if (NextTriggerTime) { + Alarm.Set(&CompletionQueue, InstantToTimespec(*NextTriggerTime), this); + NextTriggerTime.Clear(); + } else { + AlarmIsSet = false; + IOCallback->OnIOCompleted(status); + } + } + + TGrpcCompletionQueuePoller::TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue) + : Queue(queue) + , Thread() + { + } + + void TGrpcCompletionQueuePoller::Start() { + Thread = std::thread([this]() { + TThread::SetCurrentThreadName("ua_grpc_cq"); + void* tag; + bool ok; + while (Queue.Next(&tag, &ok)) { + try { + static_cast<IIOCallback*>(tag)->OnIOCompleted(ok ? EIOStatus::Ok : EIOStatus::Error); + } catch (...) 
{ + Y_FAIL("unexpected exception [%s]", CurrentExceptionMessage().c_str()); + } + } + }); + } + + void TGrpcCompletionQueuePoller::Join() { + Thread.join(); + } + + TGrpcCompletionQueueHost::TGrpcCompletionQueueHost() + : CompletionQueue() + , Poller(CompletionQueue) + { + } + + void TGrpcCompletionQueueHost::Start() { + Poller.Start(); + } + + void TGrpcCompletionQueueHost::Stop() { + CompletionQueue.Shutdown(); + Poller.Join(); + } + + gpr_timespec InstantToTimespec(TInstant instant) { + gpr_timespec result; + result.clock_type = GPR_CLOCK_REALTIME; + result.tv_sec = static_cast<int64_t>(instant.Seconds()); + result.tv_nsec = instant.NanoSecondsOfSecond(); + return result; + } + + void EnsureGrpcConfigured() { + std::call_once(GrpcConfigured, []() { + const auto limitStr = GetEnv("UA_GRPC_EXECUTOR_THREADS_LIMIT"); + ui64 limit; + if (limitStr.Empty() || !TryFromString(limitStr, limit)) { + limit = 2; + } + grpc_core::Executor::SetThreadsLimit(limit); + }); + } + + void StartGrpcTracing() { + grpc_tracer_set_enabled("all", true); + gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); + } + + void FinishGrpcTracing() { + grpc_tracer_set_enabled("all", false); + gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR); + } +} diff --git a/library/cpp/unified_agent_client/grpc_io.h b/library/cpp/unified_agent_client/grpc_io.h new file mode 100644 index 0000000000..5f368a5943 --- /dev/null +++ b/library/cpp/unified_agent_client/grpc_io.h @@ -0,0 +1,141 @@ +#pragma once + +#include <library/cpp/unified_agent_client/async_joiner.h> +#include <library/cpp/unified_agent_client/f_maybe.h> + +#include <contrib/libs/grpc/include/grpcpp/alarm.h> +#include <contrib/libs/grpc/include/grpc++/grpc++.h> + +#include <thread> + +struct grpc_cq_completion; + +namespace NUnifiedAgent { + enum class EIOStatus { + Ok, + Error + }; + + class IIOCallback { + public: + virtual ~IIOCallback() = default; + + virtual IIOCallback* Ref() = 0; + + virtual void OnIOCompleted(EIOStatus status) = 0; + }; + + 
template<typename TCallback, typename TCounter> + class TIOCallback: public IIOCallback { + public: + explicit TIOCallback(TCallback&& callback, TCounter* counter) + : Callback(std::move(callback)) + , Counter(counter) + { + } + + IIOCallback* Ref() override { + Counter->Ref(); + return this; + } + + void OnIOCompleted(EIOStatus status) override { + Callback(status); + Counter->UnRef(); + } + + private: + TCallback Callback; + TCounter* Counter; + }; + + template<typename TCallback, typename TCounter> + THolder<IIOCallback> MakeIOCallback(TCallback&& callback, TCounter* counter) { + return MakeHolder<TIOCallback<TCallback, TCounter>>(std::move(callback), counter); + } + + template<typename TTarget, typename TCounter = TTarget> + THolder<IIOCallback> MakeIOCallback(TTarget* target, void (TTarget::*method)(EIOStatus), + TCounter* counter = nullptr) + { + return MakeIOCallback([target, method](EIOStatus status) { ((*target).*method)(status); }, + counter ? counter : target); + } + + class TGrpcNotification: private ::grpc::internal::CompletionQueueTag { + public: + TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback); + + ~TGrpcNotification(); + + void Trigger(); + + private: + bool FinalizeResult(void** tag, bool* status) override; + + private: + grpc::CompletionQueue& CompletionQueue; + THolder<IIOCallback> IOCallback; + THolder<grpc_cq_completion> Completion; + std::atomic<bool> InQueue; + }; + + class TGrpcTimer: private IIOCallback { + public: + TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback); + + void Set(TInstant triggerTime); + + void Cancel(); + + private: + IIOCallback* Ref() override; + + void OnIOCompleted(EIOStatus status) override; + + private: + grpc::CompletionQueue& CompletionQueue; + THolder<IIOCallback> IOCallback; + grpc::Alarm Alarm; + bool AlarmIsSet; + TFMaybe<TInstant> NextTriggerTime; + }; + + class TGrpcCompletionQueuePoller { + public: + explicit 
TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue); + + void Start(); + + void Join(); + + private: + grpc::CompletionQueue& Queue; + std::thread Thread; + }; + + class TGrpcCompletionQueueHost { + public: + TGrpcCompletionQueueHost(); + + void Start(); + + void Stop(); + + inline grpc::CompletionQueue& GetCompletionQueue() noexcept { + return CompletionQueue; + } + + private: + grpc::CompletionQueue CompletionQueue; + TGrpcCompletionQueuePoller Poller; + }; + + gpr_timespec InstantToTimespec(TInstant instant); + + void EnsureGrpcConfigured(); + + void StartGrpcTracing(); + + void FinishGrpcTracing(); +} diff --git a/library/cpp/unified_agent_client/grpc_status_code.cpp b/library/cpp/unified_agent_client/grpc_status_code.cpp new file mode 100644 index 0000000000..662bbbe7a5 --- /dev/null +++ b/library/cpp/unified_agent_client/grpc_status_code.cpp @@ -0,0 +1,56 @@ +#include <contrib/libs/grpc/include/grpcpp/impl/codegen/status_code_enum.h> + +#include <util/stream/output.h> + +namespace { + const char* GrpcStatusCodeToString(grpc::StatusCode statusCode) { + switch (statusCode) { + case grpc::OK: + return "OK"; + case grpc::CANCELLED: + return "CANCELLED"; + case grpc::UNKNOWN: + return "UNKNOWN"; + case grpc::INVALID_ARGUMENT: + return "INVALID_ARGUMENT"; + case grpc::DEADLINE_EXCEEDED: + return "DEADLINE_EXCEEDED"; + case grpc::NOT_FOUND: + return "NOT_FOUND"; + case grpc::ALREADY_EXISTS: + return "ALREADY_EXISTS"; + case grpc::PERMISSION_DENIED: + return "PERMISSION_DENIED"; + case grpc::UNAUTHENTICATED: + return "UNAUTHENTICATED"; + case grpc::RESOURCE_EXHAUSTED: + return "RESOURCE_EXHAUSTED"; + case grpc::FAILED_PRECONDITION: + return "FAILED_PRECONDITION"; + case grpc::ABORTED: + return "ABORTED"; + case grpc::OUT_OF_RANGE: + return "OUT_OF_RANGE"; + case grpc::UNIMPLEMENTED: + return "UNIMPLEMENTED"; + case grpc::INTERNAL: + return "INTERNAL"; + case grpc::UNAVAILABLE: + return "UNAVAILABLE"; + case grpc::DATA_LOSS: + return "DATA_LOSS"; + default: + 
return nullptr; + } + } +} + +template <> +void Out<grpc::StatusCode>(IOutputStream& o, grpc::StatusCode statusCode) { + const auto* s = GrpcStatusCodeToString(statusCode); + if (s == nullptr) { + o << "grpc::StatusCode [" << static_cast<int>(statusCode) << "]"; + } else { + o << s; + } +} diff --git a/library/cpp/unified_agent_client/helpers.cpp b/library/cpp/unified_agent_client/helpers.cpp new file mode 100644 index 0000000000..335cc4e323 --- /dev/null +++ b/library/cpp/unified_agent_client/helpers.cpp @@ -0,0 +1,12 @@ +#include "helpers.h" + +namespace NUnifiedAgent::NPrivate { + bool IsUtf8(const THashMap<TString, TString>& meta) { + for (const auto& p: meta) { + if (!IsUtf(p.first) || !IsUtf(p.second)) { + return false; + } + } + return true; + } +} diff --git a/library/cpp/unified_agent_client/helpers.h b/library/cpp/unified_agent_client/helpers.h new file mode 100644 index 0000000000..33defa2b49 --- /dev/null +++ b/library/cpp/unified_agent_client/helpers.h @@ -0,0 +1,9 @@ +#pragma once + +#include "client.h" + +#include <util/charset/utf8.h> + +namespace NUnifiedAgent::NPrivate { + bool IsUtf8(const THashMap<TString, TString>& meta); +} diff --git a/library/cpp/unified_agent_client/logger.cpp b/library/cpp/unified_agent_client/logger.cpp new file mode 100644 index 0000000000..e9c713f0d0 --- /dev/null +++ b/library/cpp/unified_agent_client/logger.cpp @@ -0,0 +1,130 @@ +#include "logger.h" + +#include <library/cpp/unified_agent_client/clock.h> + +#include <library/cpp/logger/log.h> + +#include <util/datetime/base.h> +#include <util/stream/str.h> +#include <util/system/getpid.h> +#include <util/system/thread.h> + +namespace NUnifiedAgent { + namespace { + TString FormatLogLine(ELogPriority logLevel, const TStringBuf message, const TString& scope) { + TString result; + { + TStringOutput output(result); + output << FormatIsoLocal(TClock::Now()) + << " " << GetPID() + << " " << TThread::CurrentThreadId() + << " " << logLevel; + if (!scope.Empty()) { + output << 
" " << scope; + } + output << " " << message << "\n"; + } + return result; + } + } + + TLogger::TThrottlerWithLock::TThrottlerWithLock(size_t rateLimitBytes) + : Throttler(rateLimitBytes, rateLimitBytes / 2) + , Lock() + { + } + + bool TLogger::TThrottlerWithLock::TryConsume(double tokens) { + with_lock(Lock) { + return Throttler.TryConsume(tokens); + } + } + + TLogger::TLogger(TLog& log, TFMaybe<size_t> rateLimitBytes) + : DefaultLogContext{log, log.IsNullLog() ? ELogPriority::TLOG_EMERG : log.FiltrationLevel()} + , TracingLogContexts() + , CurrentLogContext_() + , Errors(nullptr) + , DroppedBytes(nullptr) + , Throttler(rateLimitBytes.Defined() ? MakeHolder<TThrottlerWithLock>(*rateLimitBytes) : nullptr) + , Lock() + { + SetCurrentLogContext(DefaultLogContext); + } + + void TLogger::SetCurrentLogContext(TLogContext& logContext) { + CurrentLogContext_.store(logContext.Log.IsNullLog() ? nullptr : &logContext, std::memory_order_release); + } + + void TLogger::Log(TLog& log, ELogPriority logPriority, const TStringBuf message, const TString& scope) const { + try { + const auto logLine = FormatLogLine(logPriority, message, scope); + if (Throttler && &log == &DefaultLogContext.Log && !Throttler->TryConsume(logLine.size())) { + if (DroppedBytes) { + DroppedBytes->Add(logLine.size()); + } + return; + } + log.Write(logPriority, logLine); + } catch (...) 
{ + } + } + + void TLogger::StartTracing(ELogPriority logPriority) noexcept { + with_lock(Lock) { + auto& logContext = GetOrCreateTracingLogContext(logPriority); + SetTracing(logContext, "started"); + } + } + + void TLogger::FinishTracing() noexcept { + with_lock(Lock) { + SetTracing(DefaultLogContext, "finished"); + } + } + + void TLogger::SetTracing(TLogContext& logContext, const char* action) { + // Lock must be held + + SetCurrentLogContext(logContext); + + Log(logContext.Log, + TLOG_INFO, + Sprintf("tracing %s, log priority is set to [%s]", + action, ToString(logContext.Priority).c_str()), + ""); + } + + auto TLogger::GetOrCreateTracingLogContext(ELogPriority logPriority) -> TLogContext& { + // Lock must be held + + for (const auto& c: TracingLogContexts) { + if (c->Priority == logPriority) { + return *c; + } + } + + auto newLogContext = MakeHolder<TLogContext>(); + newLogContext->Log = TLog("cerr", logPriority); + newLogContext->Priority = logPriority; + auto* result = newLogContext.Get(); + TracingLogContexts.push_back(std::move(newLogContext)); + return *result; + } + + TScopeLogger::TScopeLogger() + : Logger(nullptr) + , Scope() + , Errors(nullptr) + { + } + + TScopeLogger::TScopeLogger(TLogger* logger, + const TString& scope, + NMonitoring::TDeprecatedCounter* errors) + : Logger(logger) + , Scope(scope) + , Errors(errors) + { + } +} diff --git a/library/cpp/unified_agent_client/logger.h b/library/cpp/unified_agent_client/logger.h new file mode 100644 index 0000000000..d83cba92de --- /dev/null +++ b/library/cpp/unified_agent_client/logger.h @@ -0,0 +1,157 @@ +#pragma once + +#include <library/cpp/unified_agent_client/f_maybe.h> +#include <library/cpp/unified_agent_client/throttling.h> + +#include <library/cpp/logger/log.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> + +#include <util/generic/string.h> +#include <util/string/join.h> +#include <util/string/printf.h> +#include <util/system/file.h> + +#include <atomic> + +#define 
YLOG(logPriority, message, logger) \ + do { \ + const auto __logPriority = logPriority; \ + if (auto* log = logger.Accept(__logPriority, false); log != nullptr) { \ + logger.Log(*log, __logPriority, message); \ + } \ + } while (false) + +#define YLOG_EMERG(msg) YLOG(TLOG_EMERG, msg, Logger) +#define YLOG_ALERT(msg) YLOG(TLOG_ALERT, msg, Logger) +#define YLOG_CRIT(msg) YLOG(TLOG_CRIT, msg, Logger) +#define YLOG_ERR(msg) YLOG(TLOG_ERR, msg, Logger) +#define YLOG_WARNING(msg) YLOG(TLOG_WARNING, msg, Logger) +#define YLOG_NOTICE(msg) YLOG(TLOG_NOTICE, msg, Logger) +#define YLOG_INFO(msg) YLOG(TLOG_INFO, msg, Logger) +#define YLOG_DEBUG(msg) YLOG(TLOG_DEBUG, msg, Logger) +#define YLOG_RESOURCES(msg) YLOG(TLOG_RESOURCES , msg, Logger) + +// Logs msg at TLOG_CRIT through the in-scope Logger, then terminates the process with _Exit(1). +// The body is wrapped in do { } while (false) so the macro expands to exactly one statement: +// without it, `if (cond) YLOG_FATAL(msg);` would run _Exit(1) unconditionally. +// Callers keep writing `YLOG_FATAL(msg);` with a trailing semicolon. +#define YLOG_FATAL(msg) \ + do { \ + YLOG(TLOG_CRIT, msg, Logger); \ + _Exit(1); \ + } while (false) + +namespace NUnifiedAgent { + class TScopeLogger; + + class TLogger { + public: + TLogger(TLog& log, TFMaybe<size_t> rateLimitBytes); + + void StartTracing(ELogPriority logPriority) noexcept; + + void FinishTracing() noexcept; + + inline TScopeLogger Child(const TString& v, NMonitoring::TDeprecatedCounter* errors = nullptr); + + inline void SetErrorsCounter(NMonitoring::TDeprecatedCounter* counter) noexcept { + Errors = counter; + } + + inline void SetDroppedBytesCounter(NMonitoring::TDeprecatedCounter* counter) noexcept { + DroppedBytes = counter; + } + + inline bool HasRateLimit() const noexcept { + return Throttler != nullptr; + } + + friend class TScopeLogger; + + private: + void Log(TLog& log, ELogPriority logPriority, const TStringBuf message, const TString& scope) const; + + inline TLog* Accept(ELogPriority logPriority, NMonitoring::TDeprecatedCounter* errors) const noexcept { + if ((logPriority <= TLOG_ERR) && (errors != nullptr)) { + ++(*errors); + } + auto* result = CurrentLogContext_.load(std::memory_order_acquire); + return result != nullptr && static_cast<int>(logPriority) <= static_cast<int>(result->Priority) + ? 
&result->Log + : nullptr; + } + + private: + struct TLogContext { + TLog Log; + ELogPriority Priority; + }; + + class TThrottlerWithLock { + public: + explicit TThrottlerWithLock(size_t rateLimitBytes); + + bool TryConsume(double tokens); + + private: + TThrottler Throttler; + TAdaptiveLock Lock; + }; + + private: + void SetCurrentLogContext(TLogContext& logContext); + + TLogContext& GetOrCreateTracingLogContext(ELogPriority logPriority); + + void SetTracing(TLogContext& logContext, const char* action); + + private: + TLogContext DefaultLogContext; + TVector<THolder<TLogContext>> TracingLogContexts; + std::atomic<TLogContext*> CurrentLogContext_; + NMonitoring::TDeprecatedCounter* Errors; + NMonitoring::TDeprecatedCounter* DroppedBytes; + const THolder<TThrottlerWithLock> Throttler; + TAdaptiveLock Lock; + }; + + class TScopeLogger { + public: + TScopeLogger(); + + inline void Log(TLog& log, ELogPriority logPriority, const TStringBuf message) const { + if (Logger) { + Logger->Log(log, logPriority, message, Scope); + } + } + + inline TLog* Accept(ELogPriority logPriority, bool silent) const noexcept { + return Logger ? Logger->Accept(logPriority, silent ? nullptr : Errors) : nullptr; + } + + inline TScopeLogger Child(const TString& v, NMonitoring::TDeprecatedCounter* errors = nullptr) { + return Logger + ? Logger->Child(Join('/', Scope, v), errors == nullptr ? Errors : errors) + : TScopeLogger(); + } + + inline TLogger* Unwrap() noexcept { + return Logger; + } + + friend class TLogger; + + private: + TScopeLogger(TLogger* logger, + const TString& scope, + NMonitoring::TDeprecatedCounter* errors); + + private: + TLogger* Logger; + TString Scope; + NMonitoring::TDeprecatedCounter* Errors; + }; + + inline TScopeLogger TLogger::Child(const TString& v, NMonitoring::TDeprecatedCounter* errors) { + return TScopeLogger(this, v, errors == nullptr ? 
Errors : errors); + } + + inline ELogPriority ToLogPriority(int level) noexcept { + const auto result = ClampVal(level, 0, static_cast<int>(TLOG_RESOURCES)); + return static_cast<ELogPriority>(result); + } +} diff --git a/library/cpp/unified_agent_client/proto/CMakeLists.darwin.txt b/library/cpp/unified_agent_client/proto/CMakeLists.darwin.txt new file mode 100644 index 0000000000..7cd90f489d --- /dev/null +++ b/library/cpp/unified_agent_client/proto/CMakeLists.darwin.txt @@ -0,0 +1,45 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-unified_agent_client-proto) +set_property(TARGET cpp-unified_agent_client-proto PROPERTY + PROTOC_EXTRA_OUTS .grpc.pb.cc .grpc.pb.h +) +target_link_libraries(cpp-unified_agent_client-proto PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + tools-enum_parser-enum_serialization_runtime + contrib-libs-protobuf +) +target_proto_messages(cpp-unified_agent_client-proto PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto/unified_agent.proto +) +generate_enum_serilization(cpp-unified_agent_client-proto + ${CMAKE_BINARY_DIR}/library/cpp/unified_agent_client/proto/unified_agent.pb.h + INCLUDE_HEADERS + library/cpp/unified_agent_client/proto/unified_agent.pb.h +) +target_proto_addincls(cpp-unified_agent_client-proto + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(cpp-unified_agent_client-proto + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) 
+target_proto_plugin(cpp-unified_agent_client-proto + grpc_cpp + grpc_cpp +) diff --git a/library/cpp/unified_agent_client/proto/CMakeLists.linux-aarch64.txt b/library/cpp/unified_agent_client/proto/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..db3af9681d --- /dev/null +++ b/library/cpp/unified_agent_client/proto/CMakeLists.linux-aarch64.txt @@ -0,0 +1,46 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-unified_agent_client-proto) +set_property(TARGET cpp-unified_agent_client-proto PROPERTY + PROTOC_EXTRA_OUTS .grpc.pb.cc .grpc.pb.h +) +target_link_libraries(cpp-unified_agent_client-proto PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + tools-enum_parser-enum_serialization_runtime + contrib-libs-protobuf +) +target_proto_messages(cpp-unified_agent_client-proto PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto/unified_agent.proto +) +generate_enum_serilization(cpp-unified_agent_client-proto + ${CMAKE_BINARY_DIR}/library/cpp/unified_agent_client/proto/unified_agent.pb.h + INCLUDE_HEADERS + library/cpp/unified_agent_client/proto/unified_agent.pb.h +) +target_proto_addincls(cpp-unified_agent_client-proto + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(cpp-unified_agent_client-proto + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +target_proto_plugin(cpp-unified_agent_client-proto + grpc_cpp + grpc_cpp +) diff --git 
a/library/cpp/unified_agent_client/proto/CMakeLists.linux.txt b/library/cpp/unified_agent_client/proto/CMakeLists.linux.txt new file mode 100644 index 0000000000..db3af9681d --- /dev/null +++ b/library/cpp/unified_agent_client/proto/CMakeLists.linux.txt @@ -0,0 +1,46 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(cpp-unified_agent_client-proto) +set_property(TARGET cpp-unified_agent_client-proto PROPERTY + PROTOC_EXTRA_OUTS .grpc.pb.cc .grpc.pb.h +) +target_link_libraries(cpp-unified_agent_client-proto PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-grpc + tools-enum_parser-enum_serialization_runtime + contrib-libs-protobuf +) +target_proto_messages(cpp-unified_agent_client-proto PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto/unified_agent.proto +) +generate_enum_serilization(cpp-unified_agent_client-proto + ${CMAKE_BINARY_DIR}/library/cpp/unified_agent_client/proto/unified_agent.pb.h + INCLUDE_HEADERS + library/cpp/unified_agent_client/proto/unified_agent.pb.h +) +target_proto_addincls(cpp-unified_agent_client-proto + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(cpp-unified_agent_client-proto + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +target_proto_plugin(cpp-unified_agent_client-proto + grpc_cpp + grpc_cpp +) diff --git a/library/cpp/unified_agent_client/proto/CMakeLists.txt b/library/cpp/unified_agent_client/proto/CMakeLists.txt 
new file mode 100644 index 0000000000..3e0811fb22 --- /dev/null +++ b/library/cpp/unified_agent_client/proto/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/library/cpp/unified_agent_client/proto/unified_agent.proto b/library/cpp/unified_agent_client/proto/unified_agent.proto new file mode 100644 index 0000000000..68efe35747 --- /dev/null +++ b/library/cpp/unified_agent_client/proto/unified_agent.proto @@ -0,0 +1,101 @@ +syntax = "proto3"; +import "google/protobuf/descriptor.proto"; + +package NUnifiedAgentProto; + +option java_package = "com.yandex.unified_agent"; +option go_package = "a.yandex-team.ru/library/cpp/unified_agent_client/proto;unifiedagent"; + +extend google.protobuf.FileOptions { + bool GenerateYaStyle = 66777; +} + +message Request { + message SessionMetaItem { + string name = 1; + string value = 2; + } + + message Initialize { + // Session_id provided by server, use it in case of reconnects. + string session_id = 1; + + // Session metadata + repeated SessionMetaItem meta = 2; + + string shared_secret_key = 3; + } + + message MessageMetaItem { + // Arbitrary key-value pairs. Can be used by agent filters to modify/filter messages + // or to route them to target outputs. 
+ + // Meta items of all messages should be grouped by meta key, it's expected in the 'key' field. + // Meta values should be passed in the 'value' sequence, it corresponds to the payload + // sequence from DataBatch. If some messages don't have a meta with this key, the range of such messages + // can be passed via skip_start/skip_length sequences. + // For example, [{m:v1}, {}, {}, {m: v2}, {}, {m: v3}, {}, {}] can be represented as follows: + // key: 'm' + // value: ['v1', 'v2', 'v3'] + // skip_start: [1, 4] + // skip_length: [2, 1] + + string key = 1; + repeated string value = 2; + repeated uint32 skip_start = 3; + repeated uint32 skip_length = 4; + } + + message DataBatch { + repeated uint64 seq_no = 1; + repeated uint64 timestamp = 2; //microseconds + repeated bytes payload = 100; + repeated MessageMetaItem meta = 101; + } + + oneof request { + Initialize initialize = 1; + DataBatch data_batch = 2; + } +} + +message Response { + message Initialized { + // Session identifier for log and deduplication purposes. + string session_id = 1; + + // Application can skip all formed messages by seq_no up to last_seq_no - they are consumed by server. + uint64 last_seq_no = 2; + } + + message Ack { + uint64 seq_no = 1; + } + + oneof response { + Initialized initialized = 1; + Ack ack = 2; + } +} + +service UnifiedAgentService { + rpc Session(stream Request) returns (stream Response); +} + + +// dataflow: +// Request.initialize -> UnifiedAgent; +// specify session_id when this is a retry. Client can already have session_id from previous init response, +// or it can use some pregenerated sessionId for each session. +// Response.initialized -> client; +// Request.entry -> UnifiedAgent; +// ....
+// Response.ack -> client; +// when this record is consumed by UnifiedAgent with chosen guarantees UnifiedAgent will send ack to client; +// client can forget about this log record now +// +// grpc finish session -> client; +// something went wrong; client must reconnect and retry all not acknowledged records +// +// Exactly once retries - when reconnect, client must provide previous session_id and same seq_no`s +// for records - only in this case UnifiedAgent can dedup. diff --git a/library/cpp/unified_agent_client/proto_weighing.cpp b/library/cpp/unified_agent_client/proto_weighing.cpp new file mode 100644 index 0000000000..7a532213ea --- /dev/null +++ b/library/cpp/unified_agent_client/proto_weighing.cpp @@ -0,0 +1,99 @@ +#include "proto_weighing.h" + +#include <google/protobuf/io/coded_stream.h> + +namespace NUnifiedAgent::NPW { + template <typename T> + inline size_t SizeOf(T value); + + using CodedOutputStream = google::protobuf::io::CodedOutputStream; + + template <> + inline size_t SizeOf(ui64 value) { + return CodedOutputStream::VarintSize64(value); + } + + template <> + inline size_t SizeOf(ui32 value) { + return CodedOutputStream::VarintSize32(value); + } + + template <> + inline size_t SizeOf(i64 value) { + return CodedOutputStream::VarintSize64(static_cast<google::protobuf::uint64>(value)); + } + + TFieldLink::TFieldLink(TLengthDelimited* container, bool repeated, size_t keySize) + : Container(container) + , OuterSize(0) + , Repeated(repeated) + , KeySize(keySize) + { + } + + void TFieldLink::SetValueSize(bool empty, size_t size) { + const auto newOuterSize = empty && !Repeated ?
0 : KeySize + static_cast<int>(size); + Container->IncSize(newOuterSize - OuterSize); + OuterSize = newOuterSize; + } + + TLengthDelimited::TLengthDelimited(const TFMaybe<TFieldLink>& link) + : Link(link) + , ByteSize(0) + { + } + + void TLengthDelimited::IncSize(int sizeDelta) { + ByteSize += sizeDelta; + if (Link) { + const auto byteSize = static_cast<ui32>(ByteSize); + Link->SetValueSize(false, byteSize + SizeOf(byteSize)); + } + } + + template <typename T> + void TRepeatedField<T>::Add(T value) { + IncSize(static_cast<int>(SizeOf(value))); + } + + template <typename T> + TNumberField<T>::TNumberField(const TFieldLink& link) + : Link(link) + { + } + + template <typename T> + void TNumberField<T>::SetValue(T value) { + Link.SetValueSize(value == 0, SizeOf(value)); + } + + template <typename T> + TFixedNumberField<T>::TFixedNumberField(const TFieldLink& link) + : Link(link) + { + } + + template <typename T> + void TFixedNumberField<T>::SetValue() { + Link.SetValueSize(false, sizeof(T)); + } + + TStringField::TStringField(const TFieldLink& link) + : Link(link) + { + } + + void TStringField::SetValue(const TString& value) { + Link.SetValueSize(value.Empty(), value.Size() + SizeOf(static_cast<ui32>(value.Size()))); + } + + template class TNumberField<ui64>; + template class TNumberField<ui32>; + template class TNumberField<i64>; + template class TFixedNumberField<ui64>; + template class TFixedNumberField<ui32>; + template class TFixedNumberField<i64>; + template class TRepeatedField<ui64>; + template class TRepeatedField<ui32>; + template class TRepeatedField<i64>; +} diff --git a/library/cpp/unified_agent_client/proto_weighing.h b/library/cpp/unified_agent_client/proto_weighing.h new file mode 100644 index 0000000000..47cf577e14 --- /dev/null +++ b/library/cpp/unified_agent_client/proto_weighing.h @@ -0,0 +1,138 @@ +#pragma once + +#include <library/cpp/unified_agent_client/f_maybe.h> + +#include <util/generic/deque.h> +#include <util/generic/string.h> + +namespace 
NUnifiedAgent::NPW { + class TLengthDelimited; + + class TFieldLink { + public: + TFieldLink(TLengthDelimited* container, bool repeated = false, size_t keySize = 1); + + void SetValueSize(bool empty, size_t size); + + private: + TLengthDelimited* Container; + int OuterSize; + bool Repeated; + size_t KeySize; + }; + + class TLengthDelimited { + public: + explicit TLengthDelimited(const TFMaybe<TFieldLink>& link = Nothing()); + + void IncSize(int sizeDelta); + + size_t ByteSizeLong() const { + return static_cast<size_t>(ByteSize); + } + + private: + TFMaybe<TFieldLink> Link; + int ByteSize; + }; + + using TMessage = TLengthDelimited; + + template <typename T> + class TRepeatedField: public TLengthDelimited { + public: + static_assert(std::is_same_v<T, ui32> || + std::is_same_v<T, ui64> || + std::is_same_v<T, i64>, + "type is not supported"); + + using TLengthDelimited::TLengthDelimited; + + void Add(T value); + }; + + template <typename T> + class TRepeatedPtrField { + public: + explicit TRepeatedPtrField(TMessage* message, size_t keySize = 1) + : Message(message) + , Children() + , KeySize(keySize) + { + } + + size_t GetSize() const { + return Children.size(); + } + + T& Get(size_t index) { + return Children[index]; + } + + T& Add() { + if constexpr (std::is_constructible<T, TFieldLink>::value) { + Children.emplace_back(TFieldLink(Message, true, KeySize)); + } else { + Children.emplace_back(Message); + } + return Children.back(); + } + + private: + TMessage* Message; + TDeque<T> Children; + size_t KeySize; + }; + + template <typename T> + class TNumberField { + public: + static_assert(std::is_same_v<T, ui32> || + std::is_same_v<T, ui64> || + std::is_same_v<T, i64>, + "type is not supported"); + + explicit TNumberField(const TFieldLink& link); + + void SetValue(T value); + + private: + TFieldLink Link; + }; + + template <typename T> + class TFixedNumberField { + public: + static_assert(std::is_same_v<T, ui32> || + std::is_same_v<T, ui64> || + std::is_same_v<T, i64>, 
+ "type is not supported"); + + explicit TFixedNumberField(const TFieldLink& link); + + void SetValue(); + + private: + TFieldLink Link; + }; + + class TStringField { + public: + explicit TStringField(const TFieldLink& link); + + void SetValue(const TString& value); + + private: + TFieldLink Link; + }; + + extern template class TNumberField<ui64>; + extern template class TNumberField<ui32>; + extern template class TNumberField<i64>; + extern template class TFixedNumberField<ui64>; + extern template class TFixedNumberField<ui32>; + extern template class TFixedNumberField<i64>; + extern template class TRepeatedField<ui64>; + extern template class TRepeatedField<ui32>; + extern template class TRepeatedField<i64>; +} diff --git a/library/cpp/unified_agent_client/registrar.cpp b/library/cpp/unified_agent_client/registrar.cpp new file mode 100644 index 0000000000..41f6eb34ca --- /dev/null +++ b/library/cpp/unified_agent_client/registrar.cpp @@ -0,0 +1,8 @@ +#include "backend_creator.h" + +namespace NUnifiedAgent { + + ILogBackendCreator::TFactory::TRegistrator<NUnifiedAgent::TLogBackendCreator> TLogBackendCreator::Registrar("unified_agent"); + +} + diff --git a/library/cpp/unified_agent_client/throttling.cpp b/library/cpp/unified_agent_client/throttling.cpp new file mode 100644 index 0000000000..271f7b0e7e --- /dev/null +++ b/library/cpp/unified_agent_client/throttling.cpp @@ -0,0 +1,67 @@ +#include "throttling.h" + +#include <util/datetime/cputimer.h> + +namespace NUnifiedAgent { + TThrottler::TThrottler(double rate, TDuration updatePeriod) + : CyclesPerMillisecond(GetCyclesPerMillisecond()) + , UpdatePeriod(updatePeriod.MilliSeconds() * CyclesPerMillisecond) + , PeriodTokens(updatePeriod.SecondsFloat() * rate) + , AvailableTokens(0) + , ExpirationTime(0) + { + } + + TThrottler::TThrottler(double rate, double burst) + : TThrottler(rate, TDuration::Seconds(burst / rate)) + { + } + + void TThrottler::Consume(double& tokens, TFMaybe<TDuration>& nextCheckDelay) { + const 
auto updateTime = UpdateTokens(); + + if (tokens <= AvailableTokens) { + AvailableTokens -= tokens; + tokens = 0.0; + nextCheckDelay = Nothing(); + } else { + tokens -= AvailableTokens; + AvailableTokens = 0.0; + nextCheckDelay = TDuration::MicroSeconds((ExpirationTime - updateTime) * 1000 / CyclesPerMillisecond + 1); + } + } + + bool TThrottler::TryConsume(double tokens) { + UpdateTokens(); + + if (tokens > AvailableTokens) { + return false; + } + AvailableTokens -= tokens; + return true; + } + + void TThrottler::ConsumeAndWait(double tokens) { + TFMaybe<TDuration> nextCheckDelay; + while (true) { + Consume(tokens, nextCheckDelay); + if (!nextCheckDelay.Defined()) { + return; + } + Sleep(*nextCheckDelay); + } + } + + ui64 TThrottler::UpdateTokens() { + const auto updateTime = GetCycleCount(); + if (updateTime >= ExpirationTime) { + if (ExpirationTime == 0) { + ExpirationTime = updateTime + UpdatePeriod; + } else { + ExpirationTime += ((updateTime - ExpirationTime) / UpdatePeriod + 1) * UpdatePeriod; + } + AvailableTokens = PeriodTokens; + } + return updateTime; + } +} diff --git a/library/cpp/unified_agent_client/throttling.h b/library/cpp/unified_agent_client/throttling.h new file mode 100644 index 0000000000..1e5db1e8fa --- /dev/null +++ b/library/cpp/unified_agent_client/throttling.h @@ -0,0 +1,30 @@ +#pragma once + +#include <library/cpp/unified_agent_client/f_maybe.h> + +#include <util/datetime/base.h> + +namespace NUnifiedAgent { + class TThrottler { + public: + explicit TThrottler(double rate, TDuration updatePeriod = TDuration::MilliSeconds(100)); + + TThrottler(double rate, double burst); + + void Consume(double& tokens, TFMaybe<TDuration>& nextCheckDelay); + + bool TryConsume(double tokens); + + void ConsumeAndWait(double tokens); + + private: + ui64 UpdateTokens(); + + private: + ui64 CyclesPerMillisecond; + ui64 UpdatePeriod; + double PeriodTokens; + double AvailableTokens; + ui64 ExpirationTime; + }; +} diff --git 
// library/cpp/unified_agent_client/variant.h
#pragma once

#include <utility>
#include <variant>

namespace NUnifiedAgent {
    // Combines several callables into a single functor whose call operator is
    // the overload set of all of them; intended as the visitor for std::visit.
    template <class... Ts>
    struct TOverloaded : Ts... {
        using Ts::operator()...;
    };

    // Deduction guide so that `TOverloaded{f, g, ...}` deduces its bases from
    // the argument types.
    template <class... Ts>
    TOverloaded(Ts...) -> TOverloaded<Ts...>;

    // Applies the overload set built from visitorOverloads to the alternative
    // currently held by variant and returns whatever the selected overload
    // returns. Both the variant and the callables are perfectly forwarded.
    template <class T, class... U>
    auto Visit(T&& variant, U&&... visitorOverloads) {
        return std::visit(TOverloaded{std::forward<U>(visitorOverloads)...}, std::forward<T>(variant));
    }

    // Converts the alternative currently held by an rvalue variant to TTarget.
    // The held value is moved from, so every alternative type must be
    // convertible to TTarget from an rvalue.
    template <typename TTarget, typename... TSourceTypes>
    auto CastTo(std::variant<TSourceTypes...>&& variant) {
        // The named parameter is an lvalue here, so the visitor receives an
        // lvalue reference and moves out of it explicitly.
        return Visit(variant, [](auto& p) -> TTarget { return std::move(p); });
    }
}