aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 10:54:08 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2023-03-31 12:28:07 +0300
commitfc1cffcfa7f0497a1f97b384a24bcbf23362f3be (patch)
treec15f7ab5b9e9b20fd0ef8fc07d598d28e8b32004 /library
parent8a749596d40e91c896a1907afcd108d9221fbde1 (diff)
downloadydb-fc1cffcfa7f0497a1f97b384a24bcbf23362f3be.tar.gz
Ydb stable 23-1-1923.1.19
x-stable-origin-commit: c5d5a396e89d0a72e0267a55e93d8404d4fb54fe
Diffstat (limited to 'library')
-rw-r--r--library/cpp/CMakeLists.darwin.txt1
-rw-r--r--library/cpp/CMakeLists.linux-aarch64.txt1
-rw-r--r--library/cpp/CMakeLists.linux.txt1
-rw-r--r--library/cpp/actors/core/actorsystem.h16
-rw-r--r--library/cpp/actors/core/config.h12
-rw-r--r--library/cpp/actors/core/executor_pool.h14
-rw-r--r--library/cpp/actors/core/executor_pool_basic.cpp19
-rw-r--r--library/cpp/actors/core/executor_pool_basic.h3
-rw-r--r--library/cpp/actors/core/executor_pool_basic_ut.cpp2
-rw-r--r--library/cpp/actors/core/executor_pool_united_ut.cpp2
-rw-r--r--library/cpp/actors/core/harmonizer.cpp65
-rw-r--r--library/cpp/actors/core/harmonizer.h1
-rw-r--r--library/cpp/actors/core/mon_stats.h9
-rw-r--r--library/cpp/actors/core/worker_context.h3
-rw-r--r--library/cpp/actors/helpers/pool_stats_collector.h11
-rw-r--r--library/cpp/actors/interconnect/events_local.h15
-rw-r--r--library/cpp/actors/interconnect/handshake_broker.h78
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp88
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp8
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp7
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_proxy.h28
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp30
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h12
-rw-r--r--library/cpp/actors/interconnect/watchdog_timer.h13
-rw-r--r--library/cpp/actors/util/rc_buf.h7
-rw-r--r--library/cpp/charset/CMakeLists.darwin.txt2
-rw-r--r--library/cpp/charset/CMakeLists.linux-aarch64.txt2
-rw-r--r--library/cpp/charset/CMakeLists.linux.txt2
-rw-r--r--library/cpp/charset/iconv.cpp8
-rw-r--r--library/cpp/grpc/server/grpc_request.h6
-rw-r--r--library/cpp/grpc/server/grpc_request_base.h3
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp37
-rw-r--r--library/cpp/grpc/server/grpc_server.h80
-rw-r--r--library/cpp/unified_agent_client/CMakeLists.darwin.txt63
-rw-r--r--library/cpp/unified_agent_client/CMakeLists.linux-aarch64.txt65
-rw-r--r--library/cpp/unified_agent_client/CMakeLists.linux.txt65
-rw-r--r--library/cpp/unified_agent_client/CMakeLists.txt15
-rw-r--r--library/cpp/unified_agent_client/async_joiner.h42
-rw-r--r--library/cpp/unified_agent_client/backend.cpp112
-rw-r--r--library/cpp/unified_agent_client/backend.h27
-rw-r--r--library/cpp/unified_agent_client/backend_creator.cpp63
-rw-r--r--library/cpp/unified_agent_client/backend_creator.h25
-rw-r--r--library/cpp/unified_agent_client/client.h256
-rw-r--r--library/cpp/unified_agent_client/client_impl.cpp1274
-rw-r--r--library/cpp/unified_agent_client/client_impl.h364
-rw-r--r--library/cpp/unified_agent_client/client_proto_weighing.h75
-rw-r--r--library/cpp/unified_agent_client/clock.cpp48
-rw-r--r--library/cpp/unified_agent_client/clock.h37
-rw-r--r--library/cpp/unified_agent_client/counters.cpp36
-rw-r--r--library/cpp/unified_agent_client/counters.h38
-rw-r--r--library/cpp/unified_agent_client/duration_counter.cpp41
-rw-r--r--library/cpp/unified_agent_client/duration_counter.h43
-rw-r--r--library/cpp/unified_agent_client/dynamic_counters_wrapper.h34
-rw-r--r--library/cpp/unified_agent_client/enum.h30
-rw-r--r--library/cpp/unified_agent_client/examples/ua_grpc_client/main.cpp122
-rw-r--r--library/cpp/unified_agent_client/f_maybe.h23
-rw-r--r--library/cpp/unified_agent_client/grpc_io.cpp161
-rw-r--r--library/cpp/unified_agent_client/grpc_io.h141
-rw-r--r--library/cpp/unified_agent_client/grpc_status_code.cpp56
-rw-r--r--library/cpp/unified_agent_client/helpers.cpp12
-rw-r--r--library/cpp/unified_agent_client/helpers.h9
-rw-r--r--library/cpp/unified_agent_client/logger.cpp130
-rw-r--r--library/cpp/unified_agent_client/logger.h157
-rw-r--r--library/cpp/unified_agent_client/proto/CMakeLists.darwin.txt45
-rw-r--r--library/cpp/unified_agent_client/proto/CMakeLists.linux-aarch64.txt46
-rw-r--r--library/cpp/unified_agent_client/proto/CMakeLists.linux.txt46
-rw-r--r--library/cpp/unified_agent_client/proto/CMakeLists.txt15
-rw-r--r--library/cpp/unified_agent_client/proto/unified_agent.proto101
-rw-r--r--library/cpp/unified_agent_client/proto_weighing.cpp99
-rw-r--r--library/cpp/unified_agent_client/proto_weighing.h138
-rw-r--r--library/cpp/unified_agent_client/registrar.cpp8
-rw-r--r--library/cpp/unified_agent_client/throttling.cpp67
-rw-r--r--library/cpp/unified_agent_client/throttling.h30
-rw-r--r--library/cpp/unified_agent_client/variant.h18
74 files changed, 4459 insertions, 304 deletions
diff --git a/library/cpp/CMakeLists.darwin.txt b/library/cpp/CMakeLists.darwin.txt
index 74f3a1430a..50514bafce 100644
--- a/library/cpp/CMakeLists.darwin.txt
+++ b/library/cpp/CMakeLists.darwin.txt
@@ -82,6 +82,7 @@ add_subdirectory(time_provider)
add_subdirectory(timezone_conversion)
add_subdirectory(tld)
add_subdirectory(unicode)
+add_subdirectory(unified_agent_client)
add_subdirectory(uri)
add_subdirectory(xml)
add_subdirectory(yaml)
diff --git a/library/cpp/CMakeLists.linux-aarch64.txt b/library/cpp/CMakeLists.linux-aarch64.txt
index 7ccca0159a..2bc7249205 100644
--- a/library/cpp/CMakeLists.linux-aarch64.txt
+++ b/library/cpp/CMakeLists.linux-aarch64.txt
@@ -81,6 +81,7 @@ add_subdirectory(time_provider)
add_subdirectory(timezone_conversion)
add_subdirectory(tld)
add_subdirectory(unicode)
+add_subdirectory(unified_agent_client)
add_subdirectory(uri)
add_subdirectory(xml)
add_subdirectory(yaml)
diff --git a/library/cpp/CMakeLists.linux.txt b/library/cpp/CMakeLists.linux.txt
index 74f3a1430a..50514bafce 100644
--- a/library/cpp/CMakeLists.linux.txt
+++ b/library/cpp/CMakeLists.linux.txt
@@ -82,6 +82,7 @@ add_subdirectory(time_provider)
add_subdirectory(timezone_conversion)
add_subdirectory(tld)
add_subdirectory(unicode)
+add_subdirectory(unified_agent_client)
add_subdirectory(uri)
add_subdirectory(xml)
add_subdirectory(yaml)
diff --git a/library/cpp/actors/core/actorsystem.h b/library/cpp/actors/core/actorsystem.h
index 8051f5ee57..cd2cfda1bb 100644
--- a/library/cpp/actors/core/actorsystem.h
+++ b/library/cpp/actors/core/actorsystem.h
@@ -122,7 +122,21 @@ namespace NActors {
}
ui32 GetThreads(ui32 poolId) const {
- return Executors ? Executors[poolId]->GetThreads() : CpuManager.GetThreads(poolId);
+ auto result = GetThreadsOptional(poolId);
+ Y_VERIFY(result, "undefined pool id: %" PRIu32, (ui32)poolId);
+ return *result;
+ }
+
+ std::optional<ui32> GetThreadsOptional(const ui32 poolId) const {
+ if (Y_LIKELY(Executors)) {
+ if (Y_LIKELY(poolId < ExecutorsCount)) {
+ return Executors[poolId]->GetDefaultThreadCount();
+ } else {
+ return {};
+ }
+ } else {
+ return CpuManager.GetThreadsOptional(poolId);
+ }
}
};
diff --git a/library/cpp/actors/core/config.h b/library/cpp/actors/core/config.h
index 0bf4b871d7..650b1f39f5 100644
--- a/library/cpp/actors/core/config.h
+++ b/library/cpp/actors/core/config.h
@@ -128,10 +128,10 @@ namespace NActors {
Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId);
}
- ui32 GetThreads(ui32 poolId) const {
+ std::optional<ui32> GetThreadsOptional(ui32 poolId) const {
for (const auto& p : Basic) {
if (p.PoolId == poolId) {
- return p.Threads;
+ return p.DefaultThreadCount;
}
}
for (const auto& p : IO) {
@@ -144,7 +144,13 @@ namespace NActors {
return p.Concurrency ? p.Concurrency : UnitedWorkers.CpuCount;
}
}
- Y_FAIL("undefined pool id: %" PRIu32, (ui32)poolId);
+ return {};
+ }
+
+ ui32 GetThreads(ui32 poolId) const {
+ auto result = GetThreadsOptional(poolId);
+ Y_VERIFY(result, "undefined pool id: %" PRIu32, (ui32)poolId);
+ return *result;
}
};
diff --git a/library/cpp/actors/core/executor_pool.h b/library/cpp/actors/core/executor_pool.h
index f39415c7e2..c7c85e61fd 100644
--- a/library/cpp/actors/core/executor_pool.h
+++ b/library/cpp/actors/core/executor_pool.h
@@ -10,6 +10,11 @@ namespace NActors {
struct TWorkerContext;
class ISchedulerCookie;
+ struct TCpuConsumption {
+ double ConsumedUs = 0;
+ double BookedUs = 0;
+ };
+
class IExecutorPool : TNonCopyable {
public:
const ui32 PoolId;
@@ -131,14 +136,9 @@ namespace NActors {
return false;
}
- virtual double GetThreadConsumedUs(i16 threadIdx) {
- Y_UNUSED(threadIdx);
- return 0.0;
- }
-
- virtual double GetThreadBookedUs(i16 threadIdx) {
+ virtual TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) {
Y_UNUSED(threadIdx);
- return 0.0;
+ return TCpuConsumption{0.0, 0.0};
}
};
diff --git a/library/cpp/actors/core/executor_pool_basic.cpp b/library/cpp/actors/core/executor_pool_basic.cpp
index 0c984f8fb0..de04105991 100644
--- a/library/cpp/actors/core/executor_pool_basic.cpp
+++ b/library/cpp/actors/core/executor_pool_basic.cpp
@@ -334,6 +334,8 @@ namespace NActors {
poolStats.MaxUtilizationTime = RelaxedLoad(&MaxUtilizationAccumulator) / (i64)(NHPTimer::GetCyclesPerSecond() / 1000);
poolStats.WrongWakenedThreadCount = RelaxedLoad(&WrongWakenedThreadCount);
poolStats.CurrentThreadCount = RelaxedLoad(&ThreadCount);
+ poolStats.DefaultThreadCount = DefaultThreadCount;
+ poolStats.MaxThreadCount = MaxThreadCount;
if (Harmonizer) {
TPoolHarmonizedStats stats = Harmonizer->GetPoolStats(PoolId);
poolStats.IsNeedy = stats.IsNeedy;
@@ -342,6 +344,7 @@ namespace NActors {
poolStats.IncreasingThreadsByNeedyState = stats.IncreasingThreadsByNeedyState;
poolStats.DecreasingThreadsByStarvedState = stats.DecreasingThreadsByStarvedState;
poolStats.DecreasingThreadsByHoggishState = stats.DecreasingThreadsByHoggishState;
+ poolStats.PotentialMaxThreadCount = stats.PotentialMaxThreadCount;
}
statsCopy.resize(PoolThreads + 1);
@@ -490,24 +493,14 @@ namespace NActors {
return false;
}
- double TBasicExecutorPool::GetThreadConsumedUs(i16 threadIdx) {
+ TCpuConsumption TBasicExecutorPool::GetThreadCpuConsumption(i16 threadIdx) {
if ((ui32)threadIdx >= PoolThreads) {
- return 0;
+ return {0.0, 0.0};
}
TThreadCtx& threadCtx = Threads[threadIdx];
TExecutorThreadStats stats;
threadCtx.Thread->GetCurrentStats(stats);
- return Ts2Us(stats.ElapsedTicks);
- }
-
- double TBasicExecutorPool::GetThreadBookedUs(i16 threadIdx) {
- if ((ui32)threadIdx >= PoolThreads) {
- return 0;
- }
- TThreadCtx& threadCtx = Threads[threadIdx];
- TExecutorThreadStats stats;
- threadCtx.Thread->GetCurrentStats(stats);
- return stats.CpuNs / 1000.0;
+ return {Ts2Us(stats.SafeElapsedTicks), static_cast<double>(stats.CpuUs)};
}
i16 TBasicExecutorPool::GetBlockingThreadCount() const {
diff --git a/library/cpp/actors/core/executor_pool_basic.h b/library/cpp/actors/core/executor_pool_basic.h
index cd94a998f1..813f91dc9a 100644
--- a/library/cpp/actors/core/executor_pool_basic.h
+++ b/library/cpp/actors/core/executor_pool_basic.h
@@ -153,8 +153,7 @@ namespace NActors {
i16 GetMinThreadCount() const override;
i16 GetMaxThreadCount() const override;
bool IsThreadBeingStopped(i16 threadIdx) const override;
- double GetThreadConsumedUs(i16 threadIdx) override;
- double GetThreadBookedUs(i16 threadIdx) override;
+ TCpuConsumption GetThreadCpuConsumption(i16 threadIdx) override;
i16 GetBlockingThreadCount() const override;
i16 GetPriority() const override;
diff --git a/library/cpp/actors/core/executor_pool_basic_ut.cpp b/library/cpp/actors/core/executor_pool_basic_ut.cpp
index 6361bc6662..f96f65931a 100644
--- a/library/cpp/actors/core/executor_pool_basic_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_basic_ut.cpp
@@ -339,7 +339,7 @@ Y_UNIT_TEST_SUITE(BasicExecutorPool) {
UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0);
- //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuNs, 0); // depends on total duration of test, so undefined
+ //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuUs, 0); // depends on total duration of test, so undefined
UNIT_ASSERT(stats[0].ElapsedTicks > 0);
UNIT_ASSERT(stats[0].ParkedTicks > 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0);
diff --git a/library/cpp/actors/core/executor_pool_united_ut.cpp b/library/cpp/actors/core/executor_pool_united_ut.cpp
index 133e9c5f2a..a7c7399d73 100644
--- a/library/cpp/actors/core/executor_pool_united_ut.cpp
+++ b/library/cpp/actors/core/executor_pool_united_ut.cpp
@@ -171,7 +171,7 @@ Y_UNIT_TEST_SUITE(UnitedExecutorPool) {
//UNIT_ASSERT_VALUES_EQUAL(stats[0].PreemptedEvents, 0); // depends on execution time and system load, so may be non-zero
UNIT_ASSERT_VALUES_EQUAL(stats[0].NonDeliveredEvents, 0);
UNIT_ASSERT_VALUES_EQUAL(stats[0].EmptyMailboxActivation, 0);
- //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuNs, 0); // depends on total duration of test, so undefined
+ //UNIT_ASSERT_VALUES_EQUAL(stats[0].CpuUs, 0); // depends on total duration of test, so undefined
UNIT_ASSERT(stats[0].ElapsedTicks > 0);
//UNIT_ASSERT(stats[0].ParkedTicks == 0); // per-pool parked time does not make sense for united pools
UNIT_ASSERT_VALUES_EQUAL(stats[0].BlockedTicks, 0);
diff --git a/library/cpp/actors/core/harmonizer.cpp b/library/cpp/actors/core/harmonizer.cpp
index f318d8909c..e2fd0c5f24 100644
--- a/library/cpp/actors/core/harmonizer.cpp
+++ b/library/cpp/actors/core/harmonizer.cpp
@@ -121,6 +121,7 @@ struct TPoolInfo {
TAtomic IncreasingThreadsByNeedyState = 0;
TAtomic DecreasingThreadsByStarvedState = 0;
TAtomic DecreasingThreadsByHoggishState = 0;
+ TAtomic PotentialMaxThreadCount = 0;
bool IsBeingStopped(i16 threadIdx);
double GetBooked(i16 threadIdx);
@@ -169,9 +170,10 @@ double TPoolInfo::GetlastSecondPoolConsumed(i16 threadIdx) {
void TPoolInfo::PullStats(ui64 ts) {
for (i16 threadIdx = 0; threadIdx < MaxThreadCount; ++threadIdx) {
TThreadInfo &threadInfo = ThreadInfo[threadIdx];
- threadInfo.Consumed.Register(ts, Pool->GetThreadConsumedUs(threadIdx));
+ TCpuConsumption cpuConsumption = Pool->GetThreadCpuConsumption(threadIdx);
+ threadInfo.Consumed.Register(ts, cpuConsumption.ConsumedUs);
LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "consumed", UNROLL_HISTORY(threadInfo.Consumed.History));
- threadInfo.Booked.Register(ts, Pool->GetThreadBookedUs(threadIdx));
+ threadInfo.Booked.Register(ts, cpuConsumption.BookedUs);
LWPROBE(SavedValues, Pool->PoolId, Pool->GetName(), "booked", UNROLL_HISTORY(threadInfo.Booked.History));
}
}
@@ -236,7 +238,7 @@ void THarmonizer::PullStats(ui64 ts) {
}
Y_FORCE_INLINE bool IsStarved(double consumed, double booked) {
- return Max(consumed, booked) > 0.1 && consumed < booked * 0.7;
+ return consumed < booked * 0.7;
}
Y_FORCE_INLINE bool IsHoggish(double booked, ui16 currentThreadCount) {
@@ -293,35 +295,43 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
LWPROBE(HarmonizeCheckPool, poolIdx, pool.Pool->GetName(), poolBooked, poolConsumed, lastSecondPoolBooked, lastSecondPoolConsumed, pool.GetThreadCount(), pool.MaxThreadCount, isStarved, isNeedy, isHoggish);
}
double budget = total - Max(booked, lastSecondBooked);
+ i16 budgetInt = static_cast<i16>(Max(budget, 0.0));
if (budget < -0.1) {
isStarvedPresent = true;
}
+ for (size_t poolIdx = 0; poolIdx < Pools.size(); ++poolIdx) {
+ TPoolInfo& pool = Pools[poolIdx];
+ AtomicSet(pool.PotentialMaxThreadCount, Min(pool.MaxThreadCount, budgetInt));
+ }
double overbooked = consumed - booked;
if (isStarvedPresent) {
- // last_starved_at_consumed_value = сумма по всем пулам consumed;
- // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total,
- // использовать вместо total
- if (beingStopped && beingStopped >= overbooked) {
- // do nothing
- } else {
- TStackVec<size_t> reorder;
- for (size_t i = 0; i < Pools.size(); ++i) {
- reorder.push_back(i);
- }
- for (ui16 poolIdx : PriorityOrder) {
- TPoolInfo &pool = Pools[poolIdx];
- i64 threadCount = pool.GetThreadCount();
- if (threadCount > pool.DefaultThreadCount) {
- pool.SetThreadCount(threadCount - 1);
- AtomicIncrement(pool.DecreasingThreadsByStarvedState);
- overbooked--;
- LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
- if (overbooked < 1) {
- break;
- }
- }
- }
- }
+ // last_starved_at_consumed_value = сумма по всем пулам consumed;
+ // TODO(cthulhu): использовать как лимит планвно устремлять этот лимит к total,
+ // использовать вместо total
+ if (beingStopped && beingStopped >= overbooked) {
+ // do nothing
+ } else {
+ TStackVec<size_t> reorder;
+ for (size_t i = 0; i < Pools.size(); ++i) {
+ reorder.push_back(i);
+ }
+ for (ui16 poolIdx : PriorityOrder) {
+ TPoolInfo &pool = Pools[poolIdx];
+ i64 threadCount = pool.GetThreadCount();
+ while (threadCount > pool.DefaultThreadCount) {
+ pool.SetThreadCount(threadCount - 1);
+ AtomicIncrement(pool.DecreasingThreadsByStarvedState);
+ overbooked--;
+ LWPROBE(HarmonizeOperation, poolIdx, pool.Pool->GetName(), "decrease", threadCount - 1, pool.DefaultThreadCount, pool.MaxThreadCount);
+ if (overbooked < 1) {
+ break;
+ }
+ }
+ if (overbooked < 1) {
+ break;
+ }
+ }
+ }
} else {
for (size_t needyPoolIdx : needyPools) {
TPoolInfo &pool = Pools[needyPoolIdx];
@@ -422,6 +432,7 @@ TPoolHarmonizedStats THarmonizer::GetPoolStats(i16 poolId) const {
.IncreasingThreadsByNeedyState = static_cast<ui64>(RelaxedLoad(&pool.IncreasingThreadsByNeedyState)),
.DecreasingThreadsByStarvedState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByStarvedState)),
.DecreasingThreadsByHoggishState = static_cast<ui64>(RelaxedLoad(&pool.DecreasingThreadsByHoggishState)),
+ .PotentialMaxThreadCount = static_cast<i16>(RelaxedLoad(&pool.PotentialMaxThreadCount)),
.IsNeedy = static_cast<bool>(flags & 1),
.IsStarved = static_cast<bool>(flags & 2),
.IsHoggish = static_cast<bool>(flags & 4),
diff --git a/library/cpp/actors/core/harmonizer.h b/library/cpp/actors/core/harmonizer.h
index 61f13e43ac..bc6b938fe8 100644
--- a/library/cpp/actors/core/harmonizer.h
+++ b/library/cpp/actors/core/harmonizer.h
@@ -10,6 +10,7 @@ namespace NActors {
ui64 IncreasingThreadsByNeedyState = 0;
ui64 DecreasingThreadsByStarvedState = 0;
ui64 DecreasingThreadsByHoggishState = 0;
+ i16 PotentialMaxThreadCount = 0;
bool IsNeedy = false;
bool IsStarved = false;
bool IsHoggish = false;
diff --git a/library/cpp/actors/core/mon_stats.h b/library/cpp/actors/core/mon_stats.h
index 38629e2aa1..4c664a964a 100644
--- a/library/cpp/actors/core/mon_stats.h
+++ b/library/cpp/actors/core/mon_stats.h
@@ -65,6 +65,9 @@ namespace NActors {
ui64 DecreasingThreadsByHoggishState = 0;
i16 WrongWakenedThreadCount = 0;
i16 CurrentThreadCount = 0;
+ i16 PotentialMaxThreadCount = 0;
+ i16 DefaultThreadCount = 0;
+ i16 MaxThreadCount = 0;
bool IsNeedy = false;
bool IsStarved = false;
bool IsHoggish = false;
@@ -76,7 +79,8 @@ namespace NActors {
ui64 PreemptedEvents = 0; // Number of events experienced hard preemption
ui64 NonDeliveredEvents = 0;
ui64 EmptyMailboxActivation = 0;
- ui64 CpuNs = 0; // nanoseconds thread was executing on CPU (accounts for preemtion)
+ ui64 CpuUs = 0; // microseconds thread was executing on CPU (accounts for preemtion)
+ ui64 SafeElapsedTicks = 0;
ui64 WorstActivationTimeUs = 0;
NHPTimer::STime ElapsedTicks = 0;
NHPTimer::STime ParkedTicks = 0;
@@ -120,7 +124,8 @@ namespace NActors {
PreemptedEvents += RelaxedLoad(&other.PreemptedEvents);
NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents);
EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation);
- CpuNs += RelaxedLoad(&other.CpuNs);
+ CpuUs += RelaxedLoad(&other.CpuUs);
+ SafeElapsedTicks += RelaxedLoad(&other.SafeElapsedTicks);
RelaxedStore(
&WorstActivationTimeUs,
std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs)));
diff --git a/library/cpp/actors/core/worker_context.h b/library/cpp/actors/core/worker_context.h
index 2179771fb6..c3a2947df1 100644
--- a/library/cpp/actors/core/worker_context.h
+++ b/library/cpp/actors/core/worker_context.h
@@ -137,7 +137,8 @@ namespace NActors {
}
void UpdateThreadTime() {
- RelaxedStore(&WorkerStats.CpuNs, ThreadCPUTime() * 1000);
+ RelaxedStore(&WorkerStats.SafeElapsedTicks, (ui64)RelaxedLoad(&WorkerStats.ElapsedTicks));
+ RelaxedStore(&WorkerStats.CpuUs, ThreadCPUTime());
}
#else
void GetCurrentStats(TExecutorThreadStats&) const {}
diff --git a/library/cpp/actors/helpers/pool_stats_collector.h b/library/cpp/actors/helpers/pool_stats_collector.h
index b1217b1d63..d80951827d 100644
--- a/library/cpp/actors/helpers/pool_stats_collector.h
+++ b/library/cpp/actors/helpers/pool_stats_collector.h
@@ -126,6 +126,9 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr MailboxPushedOutByEventCount;
NMonitoring::TDynamicCounters::TCounterPtr WrongWakenedThreadCount;
NMonitoring::TDynamicCounters::TCounterPtr CurrentThreadCount;
+ NMonitoring::TDynamicCounters::TCounterPtr PotentialMaxThreadCount;
+ NMonitoring::TDynamicCounters::TCounterPtr DefaultThreadCount;
+ NMonitoring::TDynamicCounters::TCounterPtr MaxThreadCount;
NMonitoring::TDynamicCounters::TCounterPtr IsNeedy;
NMonitoring::TDynamicCounters::TCounterPtr IsStarved;
NMonitoring::TDynamicCounters::TCounterPtr IsHoggish;
@@ -178,6 +181,9 @@ private:
MailboxPushedOutByEventCount = PoolGroup->GetCounter("MailboxPushedOutByEventCount", true);
WrongWakenedThreadCount = PoolGroup->GetCounter("WrongWakenedThreadCount", true);
CurrentThreadCount = PoolGroup->GetCounter("CurrentThreadCount", false);
+ PotentialMaxThreadCount = PoolGroup->GetCounter("PotentialMaxThreadCount", false);
+ DefaultThreadCount = PoolGroup->GetCounter("DefaultThreadCount", false);
+ MaxThreadCount = PoolGroup->GetCounter("MaxThreadCount", false);
IsNeedy = PoolGroup->GetCounter("IsNeedy", false);
IsStarved = PoolGroup->GetCounter("IsStarved", false);
IsHoggish = PoolGroup->GetCounter("IsHoggish", false);
@@ -211,7 +217,7 @@ private:
*NonDeliveredEvents = stats.NonDeliveredEvents;
*DestroyedActors = stats.PoolDestroyedActors;
*EmptyMailboxActivation = stats.EmptyMailboxActivation;
- *CpuMicrosec = stats.CpuNs / 1000;
+ *CpuMicrosec = stats.CpuUs;
*ElapsedMicrosec = ::NHPTimer::GetSeconds(stats.ElapsedTicks)*1000000;
*ParkedMicrosec = ::NHPTimer::GetSeconds(stats.ParkedTicks)*1000000;
*ActorRegistrations = stats.PoolActorRegistrations;
@@ -222,6 +228,9 @@ private:
*MailboxPushedOutByEventCount = stats.MailboxPushedOutByEventCount;
*WrongWakenedThreadCount = poolStats.WrongWakenedThreadCount;
*CurrentThreadCount = poolStats.CurrentThreadCount;
+ *PotentialMaxThreadCount = poolStats.PotentialMaxThreadCount;
+ *DefaultThreadCount = poolStats.DefaultThreadCount;
+ *MaxThreadCount = poolStats.MaxThreadCount;
*IsNeedy = poolStats.IsNeedy;
*IsStarved = poolStats.IsStarved;
*IsHoggish = poolStats.IsHoggish;
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h
index 43f376038b..b1b8ae0c75 100644
--- a/library/cpp/actors/interconnect/events_local.h
+++ b/library/cpp/actors/interconnect/events_local.h
@@ -52,9 +52,6 @@ namespace NActors {
EvProcessPingRequest,
EvGetSecureSocket,
EvSecureSocket,
- HandshakeBrokerTake,
- HandshakeBrokerFree,
- HandshakeBrokerPermit,
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// nonlocal messages; their indices must be preserved in order to work properly while doing rolling update
@@ -101,18 +98,6 @@ namespace NActors {
}
};
- struct TEvHandshakeBrokerTake: public TEventLocal<TEvHandshakeBrokerTake, ui32(ENetwork::HandshakeBrokerTake)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerTake, "Network: TEvHandshakeBrokerTake")
- };
-
- struct TEvHandshakeBrokerFree: public TEventLocal<TEvHandshakeBrokerFree, ui32(ENetwork::HandshakeBrokerFree)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerFree, "Network: TEvHandshakeBrokerFree")
- };
-
- struct TEvHandshakeBrokerPermit: public TEventLocal<TEvHandshakeBrokerPermit, ui32(ENetwork::HandshakeBrokerPermit)> {
- DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeBrokerPermit, "Network: TEvHandshakeBrokerPermit")
- };
-
struct TEvHandshakeAsk: public TEventLocal<TEvHandshakeAsk, ui32(ENetwork::HandshakeAsk)> {
DEFINE_SIMPLE_LOCAL_EVENT(TEvHandshakeAsk, "Network: TEvHandshakeAsk")
TEvHandshakeAsk(const TActorId& self,
diff --git a/library/cpp/actors/interconnect/handshake_broker.h b/library/cpp/actors/interconnect/handshake_broker.h
deleted file mode 100644
index 70a7cb91dc..0000000000
--- a/library/cpp/actors/interconnect/handshake_broker.h
+++ /dev/null
@@ -1,78 +0,0 @@
-#pragma once
-
-#include <library/cpp/actors/core/actor.h>
-
-#include <deque>
-
-namespace NActors {
- static constexpr ui32 DEFAULT_INFLIGHT = 100;
-
- class THandshakeBroker : public TActor<THandshakeBroker> {
- private:
- std::deque<TActorId> Waiting;
- ui32 Capacity;
-
- void Handle(TEvHandshakeBrokerTake::TPtr &ev) {
- if (Capacity > 0) {
- Capacity -= 1;
- Send(ev->Sender, new TEvHandshakeBrokerPermit());
- } else {
- Waiting.push_back(ev->Sender);
- }
- }
-
- void Handle(TEvHandshakeBrokerFree::TPtr& ev) {
- Y_UNUSED(ev);
- if (Capacity == 0 && !Waiting.empty()) {
- Send(Waiting.front(), new TEvHandshakeBrokerPermit());
- Waiting.pop_front();
- } else {
- Capacity += 1;
- }
- }
-
- void PassAway() override {
- while (!Waiting.empty()) {
- Send(Waiting.front(), new TEvHandshakeBrokerPermit());
- Waiting.pop_front();
- }
- TActor::PassAway();
- }
-
- public:
- THandshakeBroker(ui32 inflightLimit = DEFAULT_INFLIGHT)
- : TActor(&TThis::StateFunc)
- , Capacity(inflightLimit)
- {
- }
-
- static constexpr char ActorName[] = "HANDSHAKE_BROKER_ACTOR";
-
- STFUNC(StateFunc) {
- Y_UNUSED(ctx);
- switch(ev->GetTypeRewrite()) {
- hFunc(TEvHandshakeBrokerTake, Handle);
- hFunc(TEvHandshakeBrokerFree, Handle);
- cFunc(TEvents::TSystem::Poison, PassAway);
- }
- }
-
- void Bootstrap() {
- Become(&TThis::StateFunc);
- };
- };
-
- inline IActor* CreateHandshakeBroker() {
- return new THandshakeBroker();
- }
-
- inline TActorId MakeHandshakeBrokerOutId() {
- char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'O', 'u', 't'};
- return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
- }
-
- inline TActorId MakeHandshakeBrokerInId() {
- char x[12] = {'I', 'C', 'H', 's', 'h', 'k', 'B', 'r', 'k', 'r', 'I', 'n'};
- return TActorId(0, TStringBuf(std::begin(x), std::end(x)));
- }
-};
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index a9c6b1dd11..dc651f3762 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -1,5 +1,4 @@
#include "interconnect_handshake.h"
-#include "handshake_broker.h"
#include "interconnect_tcp_proxy.h"
#include <library/cpp/actors/core/actor_coroutine.h>
@@ -97,13 +96,8 @@ namespace NActors {
THashMap<ui32, TInstant> LastLogNotice;
const TDuration MuteDuration = TDuration::Seconds(15);
TInstant Deadline;
- TActorId HandshakeBroker;
public:
- static constexpr IActor::EActivityType ActorActivityType() {
- return IActor::INTERCONNECT_HANDSHAKE;
- }
-
THandshakeActor(TInterconnectProxyCommon::TPtr common, const TActorId& self, const TActorId& peer,
ui32 nodeId, ui64 nextPacket, TString peerHostName, TSessionParams params)
: TActorCoroImpl(StackSize, true, true) // allow unhandled poison pills and dtors
@@ -119,7 +113,6 @@ namespace NActors {
Y_VERIFY(SelfVirtualId);
Y_VERIFY(SelfVirtualId.NodeId());
Y_VERIFY(PeerNodeId);
- HandshakeBroker = MakeHandshakeBrokerOutId();
}
THandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket)
@@ -135,7 +128,6 @@ namespace NActors {
} else {
PeerAddr.clear();
}
- HandshakeBroker = MakeHandshakeBrokerInId();
}
void UpdatePrefix() {
@@ -145,64 +137,45 @@ namespace NActors {
void Run() override {
UpdatePrefix();
- bool isBrokerActive = false;
-
- if (Send(HandshakeBroker, new TEvHandshakeBrokerTake())) {
- isBrokerActive = true;
- WaitForSpecificEvent<TEvHandshakeBrokerPermit>("HandshakeBrokerPermit");
+ // set up overall handshake process timer
+ TDuration timeout = Common->Settings.Handshake;
+ if (timeout == TDuration::Zero()) {
+ timeout = DEFAULT_HANDSHAKE_TIMEOUT;
}
+ timeout += ResolveTimeout * 2;
+ Deadline = Now() + timeout;
+ Schedule(Deadline, new TEvents::TEvWakeup);
try {
- // set up overall handshake process timer
- TDuration timeout = Common->Settings.Handshake;
- if (timeout == TDuration::Zero()) {
- timeout = DEFAULT_HANDSHAKE_TIMEOUT;
- }
- timeout += ResolveTimeout * 2;
- Deadline = Now() + timeout;
- Schedule(Deadline, new TEvents::TEvWakeup);
-
- try {
- if (Socket) {
- PerformIncomingHandshake();
- } else {
- PerformOutgoingHandshake();
- }
-
- // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings
- if (ProgramInfo) {
- if (Params.Encryption) {
- EstablishSecureConnection();
- } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) {
- Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required");
- }
- }
- } catch (const TExHandshakeFailed&) {
- ProgramInfo.Clear();
+ if (Socket) {
+ PerformIncomingHandshake();
+ } else {
+ PerformOutgoingHandshake();
}
+ // establish encrypted channel, or, in case when encryption is disabled, check if it matches settings
if (ProgramInfo) {
- LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded");
- Y_VERIFY(NextPacketFromPeer);
- if (PollerToken) {
- Y_VERIFY(PollerToken->RefCount() == 1);
- PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor
+ if (Params.Encryption) {
+ EstablishSecureConnection();
+ } else if (Common->Settings.EncryptionMode == EEncryptionMode::REQUIRED && !Params.AuthOnly) {
+ Fail(TEvHandshakeFail::HANDSHAKE_FAIL_PERMANENT, "Peer doesn't support encryption, which is required");
}
- SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId,
- *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
- }
- } catch (const TDtorException&) {
- throw; // we can't use actor system when handling this exception
- } catch (...) {
- if (isBrokerActive) {
- Send(HandshakeBroker, new TEvHandshakeBrokerFree());
}
- throw;
+ } catch (const TExHandshakeFailed&) {
+ ProgramInfo.Clear();
}
- if (isBrokerActive) {
- Send(HandshakeBroker, new TEvHandshakeBrokerFree());
+ if (ProgramInfo) {
+ LOG_LOG_IC_X(NActorsServices::INTERCONNECT, "ICH04", NLog::PRI_INFO, "handshake succeeded");
+ Y_VERIFY(NextPacketFromPeer);
+ if (PollerToken) {
+ Y_VERIFY(PollerToken->RefCount() == 1);
+ PollerToken.Reset(); // ensure we are going to destroy poller token here as we will re-register the socket within other actor
+ }
+ SendToProxy(MakeHolder<TEvHandshakeDone>(std::move(Socket), PeerVirtualId, SelfVirtualId,
+ *NextPacketFromPeer, ProgramInfo->Release(), std::move(Params)));
}
+
Socket.Reset();
}
@@ -1022,11 +995,12 @@ namespace NActors {
const TActorId& peer, ui32 nodeId, ui64 nextPacket, TString peerHostName,
TSessionParams params) {
return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), self, peer, nodeId, nextPacket,
- std::move(peerHostName), std::move(params)));
+ std::move(peerHostName), std::move(params)), IActor::INTERCONNECT_HANDSHAKE);
}
IActor* CreateIncomingHandshakeActor(TInterconnectProxyCommon::TPtr common, TSocketPtr socket) {
- return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), std::move(socket)));
+ return new TActorCoro(MakeHolder<THandshakeActor>(std::move(common), std::move(socket)),
+ IActor::INTERCONNECT_HANDSHAKE);
}
}
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index a8c505d94d..fdf035499f 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -39,7 +39,7 @@ namespace NActors {
SetPrefix(Sprintf("InputSession %s [node %" PRIu32 "]", SelfId().ToString().data(), NodeId));
Become(&TThis::WorkingState, DeadPeerTimeout, new TEvCheckDeadPeer);
LOG_DEBUG_IC_SESSION("ICIS01", "InputSession created");
- LastReceiveTimestamp = TActivationContext::Now();
+ LastReceiveTimestamp = TActivationContext::Monotonic();
ReceiveData();
}
@@ -437,7 +437,7 @@ namespace NActors {
}
}
- LastReceiveTimestamp = TActivationContext::Now();
+ LastReceiveTimestamp = TActivationContext::Monotonic();
return true;
}
@@ -473,7 +473,7 @@ namespace NActors {
}
void TInputSessionTCP::HandleCheckDeadPeer() {
- const TInstant now = TActivationContext::Now();
+ const TMonotonic now = TActivationContext::Monotonic();
if (now >= LastReceiveTimestamp + DeadPeerTimeout) {
ReceiveData();
if (Socket && now >= LastReceiveTimestamp + DeadPeerTimeout) {
@@ -481,7 +481,7 @@ namespace NActors {
DestroySession(TDisconnectReason::DeadPeer());
}
}
- Schedule(LastReceiveTimestamp + DeadPeerTimeout - now, new TEvCheckDeadPeer);
+ Schedule(LastReceiveTimestamp + DeadPeerTimeout, new TEvCheckDeadPeer);
}
void TInputSessionTCP::HandlePingResponse(TDuration passed) {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
index 7e2d8ccb94..b4cc263a4c 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.cpp
@@ -40,7 +40,6 @@ namespace NActors {
SetPrefix(Sprintf("Proxy %s [node %" PRIu32 "]", SelfId().ToString().data(), PeerNodeId));
SwitchToInitialState();
- PassAwayTimestamp = TActivationContext::Now() + TDuration::Seconds(15);
LOG_INFO_IC("ICP01", "ready to work");
}
@@ -563,7 +562,7 @@ namespace NActors {
ValidateEvent(ev, "EnqueueSessionEvent");
const ui32 size = ev->GetSize();
PendingSessionEventsSize += size;
- PendingSessionEvents.emplace_back(TActivationContext::Now() + Common->Settings.MessagePendingTimeout, size, ev);
+ PendingSessionEvents.emplace_back(TActivationContext::Monotonic() + Common->Settings.MessagePendingTimeout, size, ev);
ScheduleCleanupEventQueue();
CleanupEventQueue();
}
@@ -810,7 +809,7 @@ namespace NActors {
if (!CleanupEventQueueScheduled && PendingSessionEvents) {
// apply batching at 50 ms granularity
- Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Now()), new TEvCleanupEventQueue);
+ Schedule(Max(TDuration::MilliSeconds(50), PendingSessionEvents.front().Deadline - TActivationContext::Monotonic()), new TEvCleanupEventQueue);
CleanupEventQueueScheduled = true;
}
}
@@ -827,7 +826,7 @@ namespace NActors {
void TInterconnectProxyTCP::CleanupEventQueue() {
ICPROXY_PROFILED;
- const TInstant now = TActivationContext::Now();
+ const TMonotonic now = TActivationContext::Monotonic();
while (PendingSessionEvents) {
TPendingSessionEvent& ev = PendingSessionEvents.front();
if (now >= ev.Deadline || PendingSessionEventsSize > Common->Settings.MessagePendingSize) {
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
index 023e5bd1ee..b750e278e1 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_proxy.h
@@ -175,11 +175,18 @@ namespace NActors {
Become(std::forward<TArgs>(args)...);
Y_VERIFY(!Terminated || CurrentStateFunc() == &TThis::HoldByError); // ensure we never escape this state
if (CurrentStateFunc() != &TThis::PendingActivation) {
- PassAwayTimestamp = TInstant::Max();
+ PassAwayTimestamp = TMonotonic::Max();
+ } else if (DynamicPtr) {
+ PassAwayTimestamp = TActivationContext::Monotonic() + TDuration::Seconds(15);
+ if (!PassAwayScheduled) {
+ TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(),
+ {}, nullptr, 0));
+ PassAwayScheduled = true;
+ }
}
}
- TInstant PassAwayTimestamp;
+ TMonotonic PassAwayTimestamp;
bool PassAwayScheduled = false;
void SwitchToInitialState() {
@@ -189,17 +196,18 @@ namespace NActors {
" PendingIncomingHandshakeEvents# %zu State# %s", LogPrefix.data(), PendingSessionEvents.size(),
PendingIncomingHandshakeEvents.size(), State);
SwitchToState(__LINE__, "PendingActivation", &TThis::PendingActivation);
- if (DynamicPtr && !PassAwayScheduled && PassAwayTimestamp != TInstant::Max()) {
- TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(),
- {}, nullptr, 0));
- PassAwayScheduled = true;
- }
}
void HandlePassAwayIfNeeded() {
Y_VERIFY(PassAwayScheduled);
- if (PassAwayTimestamp != TInstant::Max()) {
+ const TMonotonic now = TActivationContext::Monotonic();
+ if (now >= PassAwayTimestamp) {
PassAway();
+ } else if (PassAwayTimestamp != TMonotonic::Max()) {
+ TActivationContext::Schedule(PassAwayTimestamp, new IEventHandle(EvPassAwayIfNeeded, 0, SelfId(),
+ {}, nullptr, 0));
+ } else {
+ PassAwayScheduled = false;
}
}
@@ -387,11 +395,11 @@ namespace NActors {
// hold all events before connection is established
struct TPendingSessionEvent {
- TInstant Deadline;
+ TMonotonic Deadline;
ui32 Size;
THolder<IEventHandle> Event;
- TPendingSessionEvent(TInstant deadline, ui32 size, TAutoPtr<IEventHandle> event)
+ TPendingSessionEvent(TMonotonic deadline, ui32 size, TAutoPtr<IEventHandle> event)
: Deadline(deadline)
, Size(size)
, Event(event)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index feb55a16ad..18df8e42ff 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -232,7 +232,7 @@ namespace NActors {
CloseOnIdleWatchdog.Arm(SelfId());
// reset activity timestamps
- LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Now();
+ LastInputActivityTimestamp = LastPayloadActivityTimestamp = TActivationContext::Monotonic();
LOG_INFO_IC_SESSION("ICS10", "traffic start");
@@ -315,7 +315,7 @@ namespace NActors {
bool needConfirm = false;
// update activity timer for dead peer checker
- LastInputActivityTimestamp = TActivationContext::Now();
+ LastInputActivityTimestamp = TActivationContext::Monotonic();
if (msg.NumDataBytes) {
UnconfirmedBytes += msg.NumDataBytes;
@@ -326,7 +326,7 @@ namespace NActors {
}
// reset payload watchdog that controls close-on-idle behaviour
- LastPayloadActivityTimestamp = TActivationContext::Now();
+ LastPayloadActivityTimestamp = TActivationContext::Monotonic();
CloseOnIdleWatchdog.Reset();
}
@@ -654,7 +654,7 @@ namespace NActors {
void TInterconnectSessionTCP::SetForcePacketTimestamp(TDuration period) {
if (period != TDuration::Max()) {
- const TInstant when = TActivationContext::Now() + period;
+ const TMonotonic when = TActivationContext::Monotonic() + period;
if (when < ForcePacketTimestamp) {
ForcePacketTimestamp = when;
ScheduleFlush();
@@ -664,7 +664,7 @@ namespace NActors {
void TInterconnectSessionTCP::ScheduleFlush() {
if (FlushSchedule.empty() || ForcePacketTimestamp < FlushSchedule.top()) {
- Schedule(ForcePacketTimestamp - TActivationContext::Now(), new TEvFlush);
+ Schedule(ForcePacketTimestamp, new TEvFlush);
FlushSchedule.push(ForcePacketTimestamp);
MaxFlushSchedule = Max(MaxFlushSchedule, FlushSchedule.size());
++FlushEventsScheduled;
@@ -672,7 +672,7 @@ namespace NActors {
}
void TInterconnectSessionTCP::HandleFlush() {
- const TInstant now = TActivationContext::Now();
+ const TMonotonic now = TActivationContext::Monotonic();
while (FlushSchedule && now >= FlushSchedule.top()) {
FlushSchedule.pop();
}
@@ -682,14 +682,14 @@ namespace NActors {
++ConfirmPacketsForcedByTimeout;
++FlushEventsProcessed;
MakePacket(false); // just generate confirmation packet if we have preconditions for this
- } else if (ForcePacketTimestamp != TInstant::Max()) {
+ } else if (ForcePacketTimestamp != TMonotonic::Max()) {
ScheduleFlush();
}
}
}
void TInterconnectSessionTCP::ResetFlushLogic() {
- ForcePacketTimestamp = TInstant::Max();
+ ForcePacketTimestamp = TMonotonic::Max();
UnconfirmedBytes = 0;
const TDuration ping = Proxy->Common->Settings.PingPeriod;
if (ping != TDuration::Zero() && !NumEventsInReadyChannels) {
@@ -761,7 +761,7 @@ namespace NActors {
}
// update payload activity timer
- LastPayloadActivityTimestamp = TActivationContext::Now();
+ LastPayloadActivityTimestamp = TActivationContext::Monotonic();
} else if (pingMask) {
serial = *pingMask;
@@ -923,7 +923,7 @@ namespace NActors {
flagState = EFlag::GREEN;
do {
- auto lastInputDelay = TActivationContext::Now() - LastInputActivityTimestamp;
+ auto lastInputDelay = TActivationContext::Monotonic() - LastInputActivityTimestamp;
if (lastInputDelay * 4 >= GetDeadPeerTimeout() * 3) {
flagState = EFlag::ORANGE;
break;
@@ -1006,7 +1006,7 @@ namespace NActors {
}
void TInterconnectSessionTCP::IssuePingRequest() {
- const TInstant now = TActivationContext::Now();
+ const TMonotonic now = TActivationContext::Monotonic();
if (now >= LastPingTimestamp + PingPeriodicity) {
LOG_DEBUG_IC_SESSION("ICS22", "Issuing ping request");
if (Socket) {
@@ -1175,6 +1175,8 @@ namespace NActors {
ui32 unsentQueueSize = Socket ? Socket->GetUnsentQueueSize() : 0;
+ const TMonotonic now = TActivationContext::Monotonic();
+
MON_VAR(OutputStuckFlag)
MON_VAR(SendQueue.size())
MON_VAR(SendQueueCache.size())
@@ -1184,8 +1186,8 @@ namespace NActors {
MON_VAR(InflightDataAmount)
MON_VAR(unsentQueueSize)
MON_VAR(SendBufferSize)
- MON_VAR(LastInputActivityTimestamp)
- MON_VAR(LastPayloadActivityTimestamp)
+ MON_VAR(now - LastInputActivityTimestamp)
+ MON_VAR(now - LastPayloadActivityTimestamp)
MON_VAR(LastHandshakeDone)
MON_VAR(OutputCounter)
MON_VAR(LastSentSerial)
@@ -1204,7 +1206,7 @@ namespace NActors {
clockSkew = Sprintf("+%s", TDuration::MicroSeconds(x).ToString().data());
}
- MON_VAR(LastPingTimestamp)
+ MON_VAR(now - LastPingTimestamp)
MON_VAR(GetPingRTT())
MON_VAR(clockSkew)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 51c5bfa453..598a5c9220 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -266,7 +266,7 @@ namespace NActors {
}
const TDuration DeadPeerTimeout;
- TInstant LastReceiveTimestamp;
+ TMonotonic LastReceiveTimestamp;
void HandleCheckDeadPeer();
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -413,15 +413,15 @@ namespace NActors {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// pinger
- TInstant LastPingTimestamp;
+ TMonotonic LastPingTimestamp;
static constexpr TDuration PingPeriodicity = TDuration::Seconds(1);
void IssuePingRequest();
void Handle(TEvProcessPingRequest::TPtr ev);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- TInstant LastInputActivityTimestamp;
- TInstant LastPayloadActivityTimestamp;
+ TMonotonic LastInputActivityTimestamp;
+ TMonotonic LastPayloadActivityTimestamp;
TWatchdogTimer<TEvCheckCloseOnIdle> CloseOnIdleWatchdog;
TWatchdogTimer<TEvCheckLostConnection> LostConnectionWatchdog;
@@ -481,8 +481,8 @@ namespace NActors {
// time at which we want to send confirmation packet even if there was no outgoing data
ui64 UnconfirmedBytes = 0;
- TInstant ForcePacketTimestamp = TInstant::Max();
- TPriorityQueue<TInstant, TVector<TInstant>, std::greater<TInstant>> FlushSchedule;
+ TMonotonic ForcePacketTimestamp = TMonotonic::Max();
+ TPriorityQueue<TMonotonic, TVector<TMonotonic>, std::greater<TMonotonic>> FlushSchedule;
size_t MaxFlushSchedule = 0;
ui64 FlushEventsScheduled = 0;
ui64 FlushEventsProcessed = 0;
diff --git a/library/cpp/actors/interconnect/watchdog_timer.h b/library/cpp/actors/interconnect/watchdog_timer.h
index c190105a59..fe62006e3b 100644
--- a/library/cpp/actors/interconnect/watchdog_timer.h
+++ b/library/cpp/actors/interconnect/watchdog_timer.h
@@ -8,7 +8,7 @@ namespace NActors {
const TDuration Timeout;
const TCallback Callback;
- TInstant LastResetTimestamp;
+ TMonotonic LastResetTimestamp;
TEvent* ExpectedEvent = nullptr;
ui32 Iteration = 0;
@@ -29,7 +29,7 @@ namespace NActors {
}
void Reset() {
- LastResetTimestamp = TActivationContext::Now();
+ LastResetTimestamp = TActivationContext::Monotonic();
}
void Disarm() {
@@ -38,11 +38,11 @@ namespace NActors {
void operator()(typename TEvent::TPtr& ev) {
if (ev->Get() == ExpectedEvent) {
- const TInstant now = TActivationContext::Now();
- const TInstant barrier = LastResetTimestamp + Timeout;
+ const TMonotonic now = TActivationContext::Monotonic();
+ const TMonotonic barrier = LastResetTimestamp + Timeout;
if (now < barrier) {
// the time hasn't come yet
- Schedule(barrier - now, TActorIdentity(ev->Recipient));
+ Schedule(barrier, TActorIdentity(ev->Recipient));
} else if (Iteration < NumIterationsBeforeFiring) {
// time has come, but we will still give actor a chance to process some messages and rearm timer
++Iteration;
@@ -57,7 +57,8 @@ namespace NActors {
}
private:
- void Schedule(TDuration timeout, const TActorIdentity& actor) {
+ template<typename T>
+ void Schedule(T&& timeout, const TActorIdentity& actor) {
auto ev = MakeHolder<TEvent>();
ExpectedEvent = ev.Get();
Iteration = 0;
diff --git a/library/cpp/actors/util/rc_buf.h b/library/cpp/actors/util/rc_buf.h
index 1a492064ee..a2bce33fba 100644
--- a/library/cpp/actors/util/rc_buf.h
+++ b/library/cpp/actors/util/rc_buf.h
@@ -113,10 +113,11 @@ public:
private:
static int Compare(const TContiguousSpan& x, const TContiguousSpan& y) {
- if (int res = std::memcmp(x.data(), y.data(), std::min(x.size(), y.size())); res) {
- return res;
+ int res = 0;
+ if (const size_t common = std::min(x.size(), y.size())) {
+ res = std::memcmp(x.data(), y.data(), common);
}
- return x.size() - y.size();
+ return res ? res : x.size() - y.size();
}
};
diff --git a/library/cpp/charset/CMakeLists.darwin.txt b/library/cpp/charset/CMakeLists.darwin.txt
index cb5c16891d..ed68dc36f8 100644
--- a/library/cpp/charset/CMakeLists.darwin.txt
+++ b/library/cpp/charset/CMakeLists.darwin.txt
@@ -6,13 +6,11 @@
# original buildsystem will not be accepted.
-find_package(Iconv REQUIRED)
add_library(library-cpp-charset)
target_link_libraries(library-cpp-charset PUBLIC
contrib-libs-cxxsupp
yutil
- Iconv::Iconv
)
target_sources(library-cpp-charset PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/charset/generated/cp_data.cpp
diff --git a/library/cpp/charset/CMakeLists.linux-aarch64.txt b/library/cpp/charset/CMakeLists.linux-aarch64.txt
index 03b01e7c69..1d9903b843 100644
--- a/library/cpp/charset/CMakeLists.linux-aarch64.txt
+++ b/library/cpp/charset/CMakeLists.linux-aarch64.txt
@@ -6,14 +6,12 @@
# original buildsystem will not be accepted.
-find_package(Iconv REQUIRED)
add_library(library-cpp-charset)
target_link_libraries(library-cpp-charset PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
- Iconv::Iconv
)
target_sources(library-cpp-charset PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/charset/generated/cp_data.cpp
diff --git a/library/cpp/charset/CMakeLists.linux.txt b/library/cpp/charset/CMakeLists.linux.txt
index 03b01e7c69..1d9903b843 100644
--- a/library/cpp/charset/CMakeLists.linux.txt
+++ b/library/cpp/charset/CMakeLists.linux.txt
@@ -6,14 +6,12 @@
# original buildsystem will not be accepted.
-find_package(Iconv REQUIRED)
add_library(library-cpp-charset)
target_link_libraries(library-cpp-charset PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
yutil
- Iconv::Iconv
)
target_sources(library-cpp-charset PRIVATE
${CMAKE_SOURCE_DIR}/library/cpp/charset/generated/cp_data.cpp
diff --git a/library/cpp/charset/iconv.cpp b/library/cpp/charset/iconv.cpp
index dc604c8492..0b6dbace9e 100644
--- a/library/cpp/charset/iconv.cpp
+++ b/library/cpp/charset/iconv.cpp
@@ -5,20 +5,22 @@
using namespace NICONVPrivate;
TDescriptor::TDescriptor(const char* from, const char* to)
- : Descriptor_(libiconv_open(to, from))
+ : Descriptor_(iconv_open(to, from))
, From_(from)
, To_(to)
{
+#if defined(USE_ICONV_EXTENSIONS)
if (!Invalid()) {
int temp = 1;
libiconvctl(Descriptor_, ICONV_SET_DISCARD_ILSEQ, &temp);
}
+#endif
}
TDescriptor::~TDescriptor() {
if (!Invalid()) {
- libiconv_close(Descriptor_);
+ iconv_close(Descriptor_);
}
}
@@ -31,7 +33,7 @@ size_t NICONVPrivate::RecodeImpl(const TDescriptor& descriptor, const char* in,
char* outPtr = out;
size_t inSizeMod = inSize;
size_t outSizeMod = outSize;
- size_t res = libiconv(descriptor.Get(), &inPtr, &inSizeMod, &outPtr, &outSizeMod);
+ size_t res = iconv(descriptor.Get(), &inPtr, &inSizeMod, &outPtr, &outSizeMod);
read = inSize - inSizeMod;
written = outSize - outSizeMod;
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
index c4b7e9c040..4e869ef5f6 100644
--- a/library/cpp/grpc/server/grpc_request.h
+++ b/library/cpp/grpc/server/grpc_request.h
@@ -113,6 +113,10 @@ public:
return FinishPromise_.GetFuture();
}
+ bool IsClientLost() const override {
+ return ClientLost_.load();
+ }
+
TString GetPeer() const override {
return TString(this->Context.peer());
}
@@ -496,6 +500,7 @@ private:
void OnFinish(EQueueEventStatus evStatus) {
if (this->Context.IsCancelled()) {
+ ClientLost_.store(true);
FinishPromise_.SetValue(EFinishStatus::CANCEL);
} else {
FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR);
@@ -556,6 +561,7 @@ private:
NThreading::TPromise<EFinishStatus> FinishPromise_;
bool SkipUpdateCountersOnError = false;
IStreamAdaptor::TPtr StreamAdaptor_;
+ std::atomic<bool> ClientLost_ = false;
};
template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer>
diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h
index 42b78ed7df..60b38805ed 100644
--- a/library/cpp/grpc/server/grpc_request_base.h
+++ b/library/cpp/grpc/server/grpc_request_base.h
@@ -116,6 +116,9 @@ public:
//! Returns true if server is using ssl
virtual bool SslServer() const = 0;
+
+ //! Returns true if client was not interested in result (but we still must send response to make grpc happy)
+ virtual bool IsClientLost() const = 0;
};
} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
index 97472206e2..0c05c7404e 100644
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ b/library/cpp/grpc/server/grpc_server.cpp
@@ -130,11 +130,16 @@ void TGRpcServer::Start() {
builder.SetOption(std::make_unique<TKeepAliveOption>());
}
- if (Options_.UseCompletionQueuePerThread) {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- CQS_.push_back(builder.AddCompletionQueue());
- }
- } else {
+ size_t completionQueueCount = 1;
+ if (Options_.WorkersPerCompletionQueue) {
+ size_t threadsPerQueue = Max(std::size_t{1}, Options_.WorkersPerCompletionQueue);
+ completionQueueCount = (Options_.WorkerThreads + threadsPerQueue - 1) / threadsPerQueue; // ceiling
+ } else if (Options_.UseCompletionQueuePerThread) {
+ completionQueueCount = Options_.WorkerThreads;
+ }
+
+ CQS_.reserve(completionQueueCount);
+ for (size_t i = 0; i < completionQueueCount; ++i) {
CQS_.push_back(builder.AddCompletionQueue());
}
@@ -159,23 +164,15 @@ void TGRpcServer::Start() {
size_t index = 0;
for (IGRpcServicePtr service : Services_) {
// TODO: provide something else for services instead of ServerCompletionQueue
- service->InitService(CQS_[index++ % CQS_.size()].get(), Options_.Logger);
+ service->InitService(CQS_, Options_.Logger, index++);
}
- if (Options_.UseCompletionQueuePerThread) {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- auto* cq = &CQS_[i];
- Ts.push_back(SystemThreadFactory()->Run([cq] {
- PullEvents(cq->get());
- }));
- }
- } else {
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- auto* cq = &CQS_[0];
- Ts.push_back(SystemThreadFactory()->Run([cq] {
- PullEvents(cq->get());
- }));
- }
+ Ts.reserve(Options_.WorkerThreads);
+ for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
+ auto* cq = &CQS_[i % CQS_.size()];
+ Ts.push_back(SystemThreadFactory()->Run([cq] {
+ PullEvents(cq->get());
+ }));
}
if (Options_.ExternalListener) {
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
index c9b48a6676..6da5076046 100644
--- a/library/cpp/grpc/server/grpc_server.h
+++ b/library/cpp/grpc/server/grpc_server.h
@@ -54,7 +54,14 @@ struct TServerOptions {
//! Number of worker threads.
DECLARE_FIELD(WorkerThreads, size_t, 2);
- //! Create one completion queue per thread
+ //! Number of workers per completion queue, i.e. when
+ // WorkerThreads=8 and PriorityWorkersPerCompletionQueue=2
+ // there will be 4 completion queues. When set to 0 then
+ // only UseCompletionQueuePerThread affects number of CQ.
+ DECLARE_FIELD(WorkersPerCompletionQueue, size_t, 0);
+
+ //! Obsolete. Create one completion queue per thread.
+ // Setting true equals to the WorkersPerCompletionQueue=1
DECLARE_FIELD(UseCompletionQueuePerThread, bool, false);
//! Memory quota size for grpc server in bytes. Zero means unlimited.
@@ -122,6 +129,15 @@ class ICancelableContext {
public:
virtual void Shutdown() = 0;
virtual ~ICancelableContext() = default;
+
+private:
+ template<class T>
+ friend class TGrpcServiceBase;
+
+ // Shard assigned by RegisterRequestCtx. This field is not thread-safe
+ // because RegisterRequestCtx may only be called once for a single service,
+ // so it's only assigned once.
+ size_t ShardIndex = size_t(-1);
};
template <class TLimit>
@@ -166,7 +182,17 @@ class IGRpcService: public TThrRefBase {
public:
virtual grpc::Service* GetService() = 0;
virtual void StopService() noexcept = 0;
+
virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0;
+
+ virtual void InitService(
+ const std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>& cqs,
+ TLoggerPtr logger,
+ size_t index)
+ {
+ InitService(cqs[index % cqs.size()].get(), logger);
+ }
+
virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0;
virtual bool IsUnsafeToShutdown() const = 0;
virtual size_t RequestsInProgress() const = 0;
@@ -236,13 +262,15 @@ public:
using TCurrentGRpcService = T;
void StopService() noexcept override {
- with_lock(Lock_) {
- AtomicSet(ShuttingDown_, 1);
-
- // Send TryCansel to event (can be send after finishing).
- // Actual dtors will be called from grpc thread, so deadlock impossible
- for (auto* request : Requests_) {
- request->Shutdown();
+ AtomicSet(ShuttingDown_, 1);
+
+ for (auto& shard : Shards_) {
+ with_lock(shard.Lock_) {
+ // Send TryCansel to event (can be send after finishing).
+ // Actual dtors will be called from grpc thread, so deadlock impossible
+ for (auto* request : shard.Requests_) {
+ request->Shutdown();
+ }
}
}
}
@@ -263,8 +291,10 @@ public:
size_t RequestsInProgress() const override {
size_t c = 0;
- with_lock(Lock_) {
- c = Requests_.size();
+ for (auto& shard : Shards_) {
+ with_lock(shard.Lock_) {
+ c += shard.Requests_.size();
+ }
}
return c;
}
@@ -290,23 +320,29 @@ public:
}
bool RegisterRequestCtx(ICancelableContext* req) {
- with_lock(Lock_) {
- auto r = Requests_.emplace(req);
- Y_VERIFY(r.second, "Ctx already registered");
+ if (Y_LIKELY(req->ShardIndex == size_t(-1))) {
+ req->ShardIndex = NextShard_.fetch_add(1, std::memory_order_relaxed) % Shards_.size();
+ }
+ auto& shard = Shards_[req->ShardIndex];
+ with_lock(shard.Lock_) {
if (IsShuttingDown()) {
- // Server is already shutting down
- Requests_.erase(r.first);
return false;
}
+
+ auto r = shard.Requests_.emplace(req);
+ Y_VERIFY(r.second, "Ctx already registered");
}
return true;
}
void DeregisterRequestCtx(ICancelableContext* req) {
- with_lock(Lock_) {
- Y_VERIFY(Requests_.erase(req), "Ctx is not registered");
+ Y_VERIFY(req->ShardIndex != size_t(-1), "Ctx does not have an assigned shard index");
+
+ auto& shard = Shards_[req->ShardIndex];
+ with_lock(shard.Lock_) {
+ Y_VERIFY(shard.Requests_.erase(req), "Ctx is not registered");
}
}
@@ -325,8 +361,14 @@ private:
bool SslServer_ = false;
bool NeedAuth_ = false;
- THashSet<ICancelableContext*> Requests_;
- TAdaptiveLock Lock_;
+ struct TShard {
+ TAdaptiveLock Lock_;
+ THashSet<ICancelableContext*> Requests_;
+ };
+
+ // Note: benchmarks showed 4 shards is enough to scale to ~30 threads
+ TVector<TShard> Shards_{ size_t(4) };
+ std::atomic<size_t> NextShard_{ 0 };
};
class TGRpcServer {
diff --git a/library/cpp/unified_agent_client/CMakeLists.darwin.txt b/library/cpp/unified_agent_client/CMakeLists.darwin.txt
new file mode 100644
index 0000000000..bad1e374fd
--- /dev/null
+++ b/library/cpp/unified_agent_client/CMakeLists.darwin.txt
@@ -0,0 +1,63 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(proto)
+
+add_library(library-cpp-unified_agent_client)
+target_include_directories(library-cpp-unified_agent_client PRIVATE
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include
+)
+target_link_libraries(library-cpp-unified_agent_client PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ cpp-logger-global
+ cpp-threading-future
+ cpp-monlib-dynamic_counters
+ cpp-unified_agent_client-proto
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(library-cpp-unified_agent_client PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend_creator.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/client_impl.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/counters.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/helpers.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_status_code.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/clock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/duration_counter.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/logger.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/throttling.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto_weighing.cpp
+)
+generate_enum_serilization(library-cpp-unified_agent_client
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.h
+ INCLUDE_HEADERS
+ library/cpp/unified_agent_client/grpc_io.h
+)
+
+add_global_library_for(library-cpp-unified_agent_client.global library-cpp-unified_agent_client)
+target_include_directories(library-cpp-unified_agent_client.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include
+)
+target_link_libraries(library-cpp-unified_agent_client.global PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ cpp-logger-global
+ cpp-threading-future
+ cpp-monlib-dynamic_counters
+ cpp-unified_agent_client-proto
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(library-cpp-unified_agent_client.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/registrar.cpp
+)
diff --git a/library/cpp/unified_agent_client/CMakeLists.linux-aarch64.txt b/library/cpp/unified_agent_client/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..ea3391bf18
--- /dev/null
+++ b/library/cpp/unified_agent_client/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,65 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(proto)
+
+add_library(library-cpp-unified_agent_client)
+target_include_directories(library-cpp-unified_agent_client PRIVATE
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include
+)
+target_link_libraries(library-cpp-unified_agent_client PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ cpp-logger-global
+ cpp-threading-future
+ cpp-monlib-dynamic_counters
+ cpp-unified_agent_client-proto
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(library-cpp-unified_agent_client PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend_creator.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/client_impl.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/counters.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/helpers.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_status_code.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/clock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/duration_counter.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/logger.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/throttling.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto_weighing.cpp
+)
+generate_enum_serilization(library-cpp-unified_agent_client
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.h
+ INCLUDE_HEADERS
+ library/cpp/unified_agent_client/grpc_io.h
+)
+
+add_global_library_for(library-cpp-unified_agent_client.global library-cpp-unified_agent_client)
+target_include_directories(library-cpp-unified_agent_client.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include
+)
+target_link_libraries(library-cpp-unified_agent_client.global PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ cpp-logger-global
+ cpp-threading-future
+ cpp-monlib-dynamic_counters
+ cpp-unified_agent_client-proto
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(library-cpp-unified_agent_client.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/registrar.cpp
+)
diff --git a/library/cpp/unified_agent_client/CMakeLists.linux.txt b/library/cpp/unified_agent_client/CMakeLists.linux.txt
new file mode 100644
index 0000000000..ea3391bf18
--- /dev/null
+++ b/library/cpp/unified_agent_client/CMakeLists.linux.txt
@@ -0,0 +1,65 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(proto)
+
+add_library(library-cpp-unified_agent_client)
+target_include_directories(library-cpp-unified_agent_client PRIVATE
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include
+)
+target_link_libraries(library-cpp-unified_agent_client PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ cpp-logger-global
+ cpp-threading-future
+ cpp-monlib-dynamic_counters
+ cpp-unified_agent_client-proto
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(library-cpp-unified_agent_client PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/backend_creator.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/client_impl.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/counters.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/helpers.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_status_code.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/clock.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/duration_counter.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/logger.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/throttling.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto_weighing.cpp
+)
+generate_enum_serilization(library-cpp-unified_agent_client
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/grpc_io.h
+ INCLUDE_HEADERS
+ library/cpp/unified_agent_client/grpc_io.h
+)
+
+add_global_library_for(library-cpp-unified_agent_client.global library-cpp-unified_agent_client)
+target_include_directories(library-cpp-unified_agent_client.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc
+ ${CMAKE_SOURCE_DIR}/contrib/libs/grpc/include
+)
+target_link_libraries(library-cpp-unified_agent_client.global PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ cpp-logger-global
+ cpp-threading-future
+ cpp-monlib-dynamic_counters
+ cpp-unified_agent_client-proto
+ tools-enum_parser-enum_serialization_runtime
+)
+target_sources(library-cpp-unified_agent_client.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/registrar.cpp
+)
diff --git a/library/cpp/unified_agent_client/CMakeLists.txt b/library/cpp/unified_agent_client/CMakeLists.txt
new file mode 100644
index 0000000000..3e0811fb22
--- /dev/null
+++ b/library/cpp/unified_agent_client/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/library/cpp/unified_agent_client/async_joiner.h b/library/cpp/unified_agent_client/async_joiner.h
new file mode 100644
index 0000000000..ce392ef7bc
--- /dev/null
+++ b/library/cpp/unified_agent_client/async_joiner.h
@@ -0,0 +1,42 @@
+#pragma once
+
+#include <library/cpp/threading/future/future.h>
+
+namespace NUnifiedAgent {
+ class TAsyncJoiner {
+ public:
+ inline TAsyncJoiner()
+ : Promise(NThreading::NewPromise())
+ , Refs(1)
+ {
+ }
+
+ inline i64 Ref(i64 count = 1) noexcept {
+ const auto result = Refs.fetch_add(count);
+ Y_VERIFY(result >= 1, "already joined");
+ return result;
+ }
+
+ inline i64 UnRef() noexcept {
+ const auto prev = Refs.fetch_sub(1);
+ Y_VERIFY(prev >= 1);
+ if (prev == 1) {
+ auto p = Promise;
+ p.SetValue();
+ }
+ return prev;
+ }
+
+ inline NThreading::TFuture<void> Join() noexcept {
+ auto result = Promise;
+ UnRef();
+ return result;
+ }
+
+ private:
+ NThreading::TPromise<void> Promise;
+ std::atomic<i64> Refs;
+ };
+
+ using TAsyncJoinerToken = TIntrusivePtr<TAsyncJoiner>;
+}
diff --git a/library/cpp/unified_agent_client/backend.cpp b/library/cpp/unified_agent_client/backend.cpp
new file mode 100644
index 0000000000..b3c4b4ebcf
--- /dev/null
+++ b/library/cpp/unified_agent_client/backend.cpp
@@ -0,0 +1,112 @@
+#include "backend.h"
+
+#include <library/cpp/unified_agent_client/enum.h>
+
+#include <library/cpp/logger/record.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/guid.h>
+#include <util/generic/serialized_enum.h>
+
+namespace NUnifiedAgent {
+ namespace {
+ class TDefaultRecordConverter : public IRecordConverter {
+ public:
+ TDefaultRecordConverter(bool stripTrailingNewLine)
+ : StripTrailingNewLine(stripTrailingNewLine)
+ , PriorityKey("_priority")
+ {
+ }
+
+ TClientMessage Convert(const TLogRecord& rec) const override {
+ const auto stripTrailingNewLine = StripTrailingNewLine &&
+ rec.Len > 0 && rec.Data[rec.Len - 1] == '\n';
+
+ THashMap<TString, TString> metaFlags{{PriorityKey, NameOf(rec.Priority)}};
+ metaFlags.insert(rec.MetaFlags.begin(), rec.MetaFlags.end());
+
+ return {
+ TString(rec.Data, stripTrailingNewLine ? rec.Len - 1 : rec.Len),
+ std::move(metaFlags)
+ };
+ }
+
+ private:
+ const bool StripTrailingNewLine;
+ const TString PriorityKey;
+ };
+
+ class TClientSessionAdapter: public TLogBackend {
+ public:
+ explicit TClientSessionAdapter(const TClientSessionPtr& session, THolder<IRecordConverter> recordConverter)
+ : Session(session)
+ , RecordConverter(std::move(recordConverter))
+ {
+ }
+
+ void WriteData(const TLogRecord& rec) override {
+ Session->Send(RecordConverter->Convert(rec));
+ }
+
+ void ReopenLog() override {
+ }
+
+ private:
+ TClientSessionPtr Session;
+ THolder<IRecordConverter> RecordConverter;
+ };
+
+ class TSessionHolder {
+ protected:
+ TSessionHolder(const TClientParameters& parameters, const TSessionParameters& sessionParameters)
+ : Client(MakeClient(parameters))
+ , Session(Client->CreateSession(sessionParameters))
+ {
+ }
+
+ protected:
+ TClientPtr Client;
+ TClientSessionPtr Session;
+ };
+
+ class TAgentLogBackend: private TSessionHolder, public TClientSessionAdapter {
+ public:
+ TAgentLogBackend(const TClientParameters& parameters,
+ const TSessionParameters& sessionParameters,
+ THolder<IRecordConverter> recordConverter)
+ : TSessionHolder(parameters, sessionParameters)
+ , TClientSessionAdapter(TSessionHolder::Session, std::move(recordConverter))
+ {
+ }
+
+ ~TAgentLogBackend() override {
+ TSessionHolder::Session->Close();
+ }
+ };
+ }
+
+ THolder<IRecordConverter> MakeDefaultRecordConverter(bool stripTrailingNewLine) {
+ return MakeHolder<TDefaultRecordConverter>(stripTrailingNewLine);
+ }
+
+ THolder<TLogBackend> AsLogBackend(const TClientSessionPtr& session, bool stripTrailingNewLine) {
+ return MakeHolder<TClientSessionAdapter>(session, MakeDefaultRecordConverter(stripTrailingNewLine));
+ }
+
+ THolder<TLogBackend> MakeLogBackend(const TClientParameters& parameters,
+ const TSessionParameters& sessionParameters,
+ THolder<IRecordConverter> recordConverter)
+ {
+ if (!recordConverter) {
+ recordConverter = MakeDefaultRecordConverter();
+ }
+ return MakeHolder<TAgentLogBackend>(parameters, sessionParameters, std::move(recordConverter));
+ }
+
+ THolder<::TLog> MakeLog(const TClientParameters& parameters,
+ const TSessionParameters& sessionParameters,
+ THolder<IRecordConverter> recordConverter)
+ {
+ return MakeHolder<::TLog>(MakeLogBackend(parameters, sessionParameters, std::move(recordConverter)));
+ }
+}
diff --git a/library/cpp/unified_agent_client/backend.h b/library/cpp/unified_agent_client/backend.h
new file mode 100644
index 0000000000..41e8d146b3
--- /dev/null
+++ b/library/cpp/unified_agent_client/backend.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/client.h>
+
+#include <library/cpp/logger/backend.h>
+#include <library/cpp/logger/log.h>
+
+namespace NUnifiedAgent {
+ class IRecordConverter {
+ public:
+ virtual ~IRecordConverter() = default;
+
+ virtual TClientMessage Convert(const TLogRecord&) const = 0;
+ };
+
+ THolder<IRecordConverter> MakeDefaultRecordConverter(bool stripTrailingNewLine = true);
+
+ THolder<TLogBackend> AsLogBackend(const TClientSessionPtr& session, bool stripTrailingNewLine = true);
+
+ THolder<TLogBackend> MakeLogBackend(const TClientParameters& parameters,
+ const TSessionParameters& sessionParameters = {},
+ THolder<IRecordConverter> recordConverter = {});
+
+ THolder<::TLog> MakeLog(const TClientParameters& parameters,
+ const TSessionParameters& sessionParameters = {},
+ THolder<IRecordConverter> recordConverter = {});
+}
diff --git a/library/cpp/unified_agent_client/backend_creator.cpp b/library/cpp/unified_agent_client/backend_creator.cpp
new file mode 100644
index 0000000000..825e3ebd2b
--- /dev/null
+++ b/library/cpp/unified_agent_client/backend_creator.cpp
@@ -0,0 +1,63 @@
+#include "backend_creator.h"
+#include <library/cpp/logger/global/global.h>
+
+namespace NUnifiedAgent {
+
+
+ TLogBackendCreator::TLogBackendCreator()
+ : TLogBackendCreatorBase("unified_agent")
+ {}
+
+ bool TLogBackendCreator::Init(const IInitContext& ctx) {
+ if(TString socket = ctx.GetOrElse("Uri", TString())) {
+ ClientParams = MakeHolder<TClientParameters>(socket);
+ } else {
+ Cdbg << "Uri not set for unified_agent log backend" << Endl;
+ return false;
+ }
+ TString secretKey;
+ ctx.GetValue("SharedSecretKey", secretKey);
+ if (secretKey) {
+ ClientParams->SharedSecretKey = secretKey;
+ }
+ ctx.GetValue("MaxInflightBytes", ClientParams->MaxInflightBytes);
+ ctx.GetValue("GrpcSendDelay", ClientParams->GrpcSendDelay);
+ size_t rateLimit;
+ if (ctx.GetValue("LogRateLimit", rateLimit)) {
+ ClientParams->LogRateLimitBytes = rateLimit;
+ }
+ ctx.GetValue("GrpcReconnectDelay", ClientParams->GrpcReconnectDelay);
+ ctx.GetValue("GrpcMaxMessageSize", ClientParams->GrpcMaxMessageSize);
+ const auto ownLogger = ctx.GetChildren("OwnLogger");
+ if (!ownLogger.empty() && ownLogger.front()->GetOrElse("LoggerType", TString()) != "global") {
+ OwnLogger = ILogBackendCreator::Create(*ownLogger.front());
+ TLog log;
+ log.ResetBackend(OwnLogger->CreateLogBackend());
+ ClientParams->SetLog(log);
+ }
+ return true;
+ }
+
+
+ void TLogBackendCreator::DoToJson(NJson::TJsonValue& value) const {
+ value["Uri"] = ClientParams->Uri;
+ if (ClientParams->SharedSecretKey) {
+ value["SharedSecretKey"] = *ClientParams->SharedSecretKey;
+ }
+ value["MaxInflightBytes"] = ClientParams->MaxInflightBytes;
+ value["GrpcSendDelay"] = ClientParams->GrpcSendDelay.ToString();
+ if (ClientParams->LogRateLimitBytes) {
+ value["LogRateLimit"] = *ClientParams->LogRateLimitBytes;
+ }
+ value["GrpcReconnectDelay"] = ClientParams->GrpcReconnectDelay.ToString();
+ value["GrpcMaxMessageSize"] = ClientParams->GrpcMaxMessageSize;
+ if (OwnLogger) {
+ OwnLogger->ToJson(value["OwnLogger"].AppendValue(NJson::JSON_MAP));
+ }
+ }
+
+ THolder<TLogBackend> TLogBackendCreator::DoCreateLogBackend() const {
+ return MakeLogBackend(*ClientParams);
+ }
+
+}
diff --git a/library/cpp/unified_agent_client/backend_creator.h b/library/cpp/unified_agent_client/backend_creator.h
new file mode 100644
index 0000000000..04928f616c
--- /dev/null
+++ b/library/cpp/unified_agent_client/backend_creator.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include "backend.h"
+#include <library/cpp/logger/backend_creator.h>
+
+namespace NUnifiedAgent {
+
+ class TLogBackendCreator: public TLogBackendCreatorBase {
+ public:
+ TLogBackendCreator();
+ bool Init(const IInitContext& ctx) override;
+ static TFactory::TRegistrator<TLogBackendCreator> Registrar;
+
+ protected:
+ void DoToJson(NJson::TJsonValue& value) const override;
+
+ private:
+ THolder<TLogBackend> DoCreateLogBackend() const override;
+
+ private:
+ THolder<TClientParameters> ClientParams;
+ THolder<ILogBackendCreator> OwnLogger;
+ };
+
+}
diff --git a/library/cpp/unified_agent_client/client.h b/library/cpp/unified_agent_client/client.h
new file mode 100644
index 0000000000..62e1210803
--- /dev/null
+++ b/library/cpp/unified_agent_client/client.h
@@ -0,0 +1,256 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/counters.h>
+
+#include <library/cpp/logger/log.h>
+#include <library/cpp/threading/future/future.h>
+
+#include <util/datetime/base.h>
+#include <util/generic/hash.h>
+#include <util/generic/maybe.h>
+#include <util/generic/string.h>
+
+namespace NUnifiedAgent {
+ struct TClientParameters {
+ // uri format https://github.com/grpc/grpc/blob/master/doc/naming.md
+ // for example: unix:///unified_agent for unix domain sockets or localhost:12345 for tcp
+ explicit TClientParameters(const TString& uri);
+
+ // Simple way to protect against writing to unintended/invalid Unified Agent endpoint.
+ // Must correspond to 'shared_secret_key' grpc input parameter
+ // (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6333542#L219),
+ // session would end with error otherwise.
+ //
+ // Default: not set
+ TClientParameters& SetSharedSecretKey(const TString& sharedSecretKey) {
+ SharedSecretKey = sharedSecretKey;
+ return *this;
+ }
+
+ // Max bytes count that have been received by client session but not acknowledged yet.
+ // When exceeded, new messages will be discarded, an error message
+ // will be written to the TLog instance and drop counter will be incremented.
+ //
+ // Default: 10 mb
+ TClientParameters& SetMaxInflightBytes(size_t maxInflightBytes) {
+ MaxInflightBytes = maxInflightBytes;
+ return *this;
+ }
+
+ // TLog instance for client library's own logs.
+ //
+ // Default: TLoggerOperator<TGlobalLog>::Log()
+ TClientParameters& SetLog(TLog& log) {
+ Log = log;
+ return *this;
+ }
+
+ // Throttle client library log by rate limit in bytes, excess will be discarded.
+ //
+ // Default: not set
+ TClientParameters& SetLogRateLimit(size_t bytesPerSec) {
+ LogRateLimitBytes = bytesPerSec;
+ return *this;
+ }
+
+ // Try to establish new grpc session if the current one become broken.
+ // Session may break either due to agent unavailability, or the agent itself may
+ // reject new session creation if it does not satisfy certain
+ // conditions - shared_secret_key does not match, the session creation rate has been
+ // exceeded, invalid session metadata has been used and so on.
+ // Attempts to establish a grpc session will continue indefinitely.
+ //
+ // Default: 50 millis
+ TClientParameters& SetGrpcReconnectDelay(TDuration delay) {
+ GrpcReconnectDelay = delay;
+ return *this;
+ }
+
+ // Grpc usually writes data to the socket faster than it comes from the client.
+ // This means that it's possible that each TClientMessage would be sent in it's own grpc message.
+ // This is expensive in terms of cpu, since grpc makes at least one syscall
+ // for each message on the sender and receiver sides.
+ // To avoid a large number of syscalls, the client holds incoming messages
+ // in internal buffer in hope of being able to assemble bigger grpc batch.
+ // This parameter sets the timeout for this delay - from IClientSession::Send
+ // call to the actual sending of the corresponding grpc message.
+ //
+ // Default: 10 millis.
+ TClientParameters& SetGrpcSendDelay(TDuration delay) {
+ GrpcSendDelay = delay;
+ return *this;
+ }
+
+ // Client library sends messages to grpc in batches, this parameter
+ // establishes upper limit on the size of single batch in bytes.
+ // If you increase this value, don't forget to adjust max_receive_message_size (https://a.yandex-team.ru/arc/trunk/arcadia/logbroker/unified_agent/examples/all.yml?rev=6661788#L185)
+ // in grpc input config, it must be grater than GrpcMaxMessageSize.
+ //
+ // Default: 1 mb
+ TClientParameters& SetGrpcMaxMessageSize(size_t size) {
+ GrpcMaxMessageSize = size;
+ return *this;
+ }
+
+ // Enable forks handling in client library.
+ // Multiple threads and concurrent forks are all supported is this regime.
+ //
+ // Default: false
+ TClientParameters& SetEnableForkSupport(bool value) {
+ EnableForkSupport = value;
+ return *this;
+ }
+
+ // Client library counters.
+ // App can set this to some leaf of it's TDynamicCounters tree.
+ // Actual provided counters are listed in TClientCounters.
+ //
+ // Default: not set
+ TClientParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) {
+ return SetCounters(MakeIntrusive<TClientCounters>(counters));
+ }
+
+ TClientParameters& SetCounters(const TIntrusivePtr<TClientCounters>& counters) {
+ Counters = counters;
+ return *this;
+ }
+
+ public:
+ static const size_t DefaultMaxInflightBytes;
+ static const size_t DefaultGrpcMaxMessageSize;
+ static const TDuration DefaultGrpcSendDelay;
+
+ public:
+ TString Uri;
+ TMaybe<TString> SharedSecretKey;
+ size_t MaxInflightBytes;
+ TLog Log;
+ TMaybe<size_t> LogRateLimitBytes;
+ TDuration GrpcReconnectDelay;
+ TDuration GrpcSendDelay;
+ bool EnableForkSupport;
+ size_t GrpcMaxMessageSize;
+ TIntrusivePtr<TClientCounters> Counters;
+ };
+
+ struct TSessionParameters {
+ TSessionParameters();
+
+ // Session unique identifier.
+ // It's guaranteed that for messages with the same sessionId relative
+ // ordering of the messages will be preserved at all processing stages
+ // in library, in Unified Agent and in other systems that respect ordering (e.g., Logbroker)
+ //
+ // Default: generated automatically by Unified Agent.
+ TSessionParameters& SetSessionId(const TString& sessionId) {
+ SessionId = sessionId;
+ return *this;
+ }
+
+ // Session metadata as key-value set.
+ // Can be used by agent filters and outputs for validation/routing/enrichment/etc.
+ //
+ // Default: not set
+ TSessionParameters& SetMeta(const THashMap<TString, TString>& meta) {
+ Meta = meta;
+ return *this;
+ }
+
+ // Session counters.
+ // Actual provided counters are listed in TClientSessionCounters.
+ //
+ // Default: A single common for all sessions subgroup of client TDynamicCounters instance
+ // with label ('session': 'default').
+ TSessionParameters& SetCounters(const NMonitoring::TDynamicCounterPtr& counters) {
+ return SetCounters(MakeIntrusive<TClientSessionCounters>(counters));
+ }
+
+ TSessionParameters& SetCounters(const TIntrusivePtr<TClientSessionCounters>& counters) {
+ Counters = counters;
+ return *this;
+ }
+
+ // Max bytes count that have been received by client session but not acknowledged yet.
+ // When exceeded, new messages will be discarded, an error message
+ // will be written to the TLog instance and drop counter will be incremented.
+ //
+ // Default: value from client settings
+ TSessionParameters& SetMaxInflightBytes(size_t maxInflightBytes) {
+ MaxInflightBytes = maxInflightBytes;
+ return *this;
+ }
+
+ public:
+ TMaybe<TString> SessionId;
+ TMaybe<THashMap<TString, TString>> Meta;
+ TIntrusivePtr<TClientSessionCounters> Counters;
+ TMaybe<size_t> MaxInflightBytes;
+ };
+
+ // Message data to be sent to unified agent.
+ struct TClientMessage {
+ // Opaque message payload.
+ TString Payload;
+
+ // Message metadata as key-value set.
+ // Can be used by agent filters and outputs for validation/routing/enrichment/etc.
+ //
+ // Default: not set
+ TMaybe<THashMap<TString, TString>> Meta{};
+
+ // Message timestamp.
+ //
+ // Default: time the client library has received this instance of TClientMessage.
+ TMaybe<TInstant> Timestamp{};
+ };
+
+ // Message size as it is accounted in byte-related metrics (ReceivedBytes, InflightBytes, etc).
+ size_t SizeOf(const TClientMessage& message);
+
+ class IClientSession: public TAtomicRefCount<IClientSession> {
+ public:
+ virtual ~IClientSession() = default;
+
+ // Places the message into send queue. Actual grpc call may occur later asynchronously,
+ // based on settings GrpcSendDelay and GrpcMaxMessageSize.
+ // A message can be discarded if the limits defined by the GrpcMaxMessageSize and MaxInflightBytes
+ // settings are exceeded, or if the Close method has already been called.
+ // In this case an error message will be written to the TLog instance
+ // and drop counter will be incremented.
+ virtual void Send(TClientMessage&& message) = 0;
+
+ void Send(const TClientMessage& message) {
+ Send(TClientMessage(message));
+ }
+
+ // Waits until either all current inflight messages are
+ // acknowledged or the specified deadline is reached.
+ // Upon the deadline grpc connection would be forcefully dropped (via grpc::ClientContext::TryCancel).
+ virtual NThreading::TFuture<void> CloseAsync(TInstant deadline) = 0;
+
+ void Close(TInstant deadline) {
+ CloseAsync(deadline).Wait();
+ }
+
+ void Close(TDuration timeout = TDuration::Seconds(3)) {
+ Close(Now() + timeout);
+ }
+ };
+ using TClientSessionPtr = TIntrusivePtr<IClientSession>;
+
+ class IClient: public TAtomicRefCount<IClient> {
+ public:
+ virtual ~IClient() = default;
+
+ virtual TClientSessionPtr CreateSession(const TSessionParameters& parameters = {}) = 0;
+
+ virtual void StartTracing(ELogPriority) {
+ }
+
+ virtual void FinishTracing() {
+ }
+ };
+ using TClientPtr = TIntrusivePtr<IClient>;
+
+ TClientPtr MakeClient(const TClientParameters& parameters);
+}
diff --git a/library/cpp/unified_agent_client/client_impl.cpp b/library/cpp/unified_agent_client/client_impl.cpp
new file mode 100644
index 0000000000..4db98120fd
--- /dev/null
+++ b/library/cpp/unified_agent_client/client_impl.cpp
@@ -0,0 +1,1274 @@
+#include "client_impl.h"
+#include "helpers.h"
+
+#include <contrib/libs/grpc/include/grpc/grpc.h>
+#include <contrib/libs/grpc/src/core/lib/gpr/string.h>
+#include <contrib/libs/grpc/src/core/lib/gprpp/fork.h>
+#include <contrib/libs/grpc/src/core/lib/iomgr/executor.h>
+
+#include <util/charset/utf8.h>
+#include <util/generic/size_literals.h>
+#include <util/system/env.h>
+
+using namespace NThreading;
+using namespace NMonitoring;
+
+namespace NUnifiedAgent::NPrivate {
+ std::shared_ptr<grpc::Channel> CreateChannel(const grpc::string& target) {
+ grpc::ChannelArguments args;
+ args.SetCompressionAlgorithm(GRPC_COMPRESS_NONE);
+ args.SetMaxReceiveMessageSize(Max<int>());
+ args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);
+ args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000);
+ args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100);
+ args.SetInt(GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 200);
+ args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
+ args.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
+ args.SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
+ args.SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 5000);
+ args.SetInt(GRPC_ARG_TCP_READ_CHUNK_SIZE, 1024*1024);
+ return grpc::CreateCustomChannel(target, grpc::InsecureChannelCredentials(), args);
+ }
+
+ void AddMeta(NUnifiedAgentProto::Request_Initialize& init, const TString& name, const TString& value) {
+ auto* metaItem = init.MutableMeta()->Add();
+ metaItem->SetName(name);
+ metaItem->SetValue(value);
+ }
+
+ std::atomic<ui64> TClient::Id{0};
+
+ TClient::TClient(const TClientParameters& parameters, std::shared_ptr<TForkProtector> forkProtector)
+ : Parameters(parameters)
+ , ForkProtector(forkProtector)
+ , Counters(parameters.Counters ? parameters.Counters : MakeIntrusive<TClientCounters>())
+ , Log(parameters.Log)
+ , MainLogger(Log, MakeFMaybe(Parameters.LogRateLimitBytes))
+ , Logger(MainLogger.Child(Sprintf("ua_%lu", Id.fetch_add(1))))
+ , Channel(nullptr)
+ , Stub(nullptr)
+ , ActiveCompletionQueue(nullptr)
+ , SessionLogLabel(0)
+ , ActiveSessions()
+ , Started(false)
+ , Destroyed(false)
+ , Lock()
+ {
+ MainLogger.SetDroppedBytesCounter(&Counters->ClientLogDroppedBytes);
+
+ if (ForkProtector != nullptr) {
+ ForkProtector->Register(*this);
+ }
+
+ EnsureStarted();
+
+ YLOG_INFO(Sprintf("created, uri [%s]", Parameters.Uri.c_str()));
+ }
+
+ TClient::~TClient() {
+ with_lock(Lock) {
+ Y_VERIFY(ActiveSessions.empty(), "active sessions found");
+
+ EnsureStoppedNoLock();
+
+ Destroyed = true;
+ }
+
+ if (ForkProtector != nullptr) {
+ ForkProtector->Unregister(*this);
+ }
+
+ YLOG_INFO(Sprintf("destroyed, uri [%s]", Parameters.Uri.c_str()));
+ }
+
+ TClientSessionPtr TClient::CreateSession(const TSessionParameters& parameters) {
+ return MakeIntrusive<TClientSession>(this, parameters);
+ }
+
+ void TClient::StartTracing(ELogPriority logPriority) {
+ MainLogger.StartTracing(logPriority);
+ StartGrpcTracing();
+ YLOG_INFO("tracing started");
+ }
+
+ void TClient::FinishTracing() {
+ FinishGrpcTracing();
+ MainLogger.FinishTracing();
+ YLOG_INFO("tracing finished");
+ }
+
+ void TClient::RegisterSession(TClientSession* session) {
+ with_lock(Lock) {
+ ActiveSessions.push_back(session);
+ }
+ }
+
+ void TClient::UnregisterSession(TClientSession* session) {
+ with_lock(Lock) {
+ const auto it = Find(ActiveSessions, session);
+ Y_VERIFY(it != ActiveSessions.end());
+ ActiveSessions.erase(it);
+ }
+ }
+
+ void TClient::PreFork() {
+ YLOG_INFO("pre fork started");
+
+ Lock.Acquire();
+
+ auto futures = TVector<TFuture<void>>(Reserve(ActiveSessions.size()));
+ for (auto* s: ActiveSessions) {
+ futures.push_back(s->PreFork());
+ }
+ YLOG_INFO("waiting for sessions");
+ WaitAll(futures).Wait();
+
+ EnsureStoppedNoLock();
+
+ YLOG_INFO("shutdown grpc executor");
+ grpc_core::Executor::SetThreadingAll(false);
+
+ YLOG_INFO("pre fork finished");
+ }
+
+ void TClient::PostForkParent() {
+ YLOG_INFO("post fork parent started");
+
+ if (!Destroyed) {
+ EnsureStartedNoLock();
+ }
+ Lock.Release();
+
+ for (auto* s: ActiveSessions) {
+ s->PostForkParent();
+ }
+
+ YLOG_INFO("post fork parent finished");
+ }
+
+ void TClient::PostForkChild() {
+ YLOG_INFO("post fork child started");
+
+ Lock.Release();
+
+ for (auto* s: ActiveSessions) {
+ s->PostForkChild();
+ }
+
+ YLOG_INFO("post fork child finished");
+ }
+
+ void TClient::EnsureStarted() {
+ with_lock(Lock) {
+ EnsureStartedNoLock();
+ }
+ }
+
+ void TClient::EnsureStartedNoLock() {
+ // Lock must be held
+
+ if (Started) {
+ return;
+ }
+
+ Channel = CreateChannel(Parameters.Uri);
+ Stub = NUnifiedAgentProto::UnifiedAgentService::NewStub(Channel);
+ ActiveCompletionQueue = MakeHolder<TGrpcCompletionQueueHost>();
+ ActiveCompletionQueue->Start();
+
+ Started = true;
+ }
+
+ void TClient::EnsureStoppedNoLock() {
+ // Lock must be held
+
+ if (!Started) {
+ return;
+ }
+
+ YLOG_INFO("stopping");
+ ActiveCompletionQueue->Stop();
+ ActiveCompletionQueue = nullptr;
+ Stub = nullptr;
+ Channel = nullptr;
+ YLOG_INFO("stopped");
+
+ Started = false;
+ }
+
+ TScopeLogger TClient::CreateSessionLogger() {
+ return Logger.Child(ToString(SessionLogLabel.fetch_add(1)));
+ }
+
+ TForkProtector::TForkProtector()
+ : Clients()
+ , GrpcInitializer()
+ , Enabled(grpc_core::Fork::Enabled())
+ , Lock()
+ {
+ }
+
+ void TForkProtector::Register(TClient& client) {
+ if (!Enabled) {
+ return;
+ }
+
+ Y_VERIFY(grpc_is_initialized());
+ Y_VERIFY(grpc_core::Fork::Enabled());
+
+ with_lock(Lock) {
+ Clients.push_back(&client);
+ }
+ }
+
+ void TForkProtector::Unregister(TClient& client) {
+ if (!Enabled) {
+ return;
+ }
+
+ with_lock(Lock) {
+ const auto it = Find(Clients, &client);
+ Y_VERIFY(it != Clients.end());
+ Clients.erase(it);
+ }
+ }
+
+ std::shared_ptr<TForkProtector> TForkProtector::Get(bool createIfNotExists) {
+ with_lock(InstanceLock) {
+ auto result = Instance.lock();
+ if (!result && createIfNotExists) {
+ SetEnv("GRPC_ENABLE_FORK_SUPPORT", "true");
+ result = std::make_shared<TForkProtector>();
+ if (!result->Enabled) {
+ TLog log("cerr");
+ TLogger logger(log, Nothing());
+ auto scopeLogger = logger.Child("ua client");
+ YLOG(TLOG_WARNING,
+ "Grpc is already initialized, can't enable fork support. "
+ "If forks are possible, please set environment variable GRPC_ENABLE_FORK_SUPPORT to 'true'. "
+ "If not, you can suppress this warning by setting EnableForkSupport "
+ "to false when creating the ua client.",
+ scopeLogger);
+ } else if (!SubscribedToForks) {
+ SubscribedToForks = true;
+ #ifdef _unix_
+ pthread_atfork(
+ &TForkProtector::PreFork,
+ &TForkProtector::PostForkParent,
+ &TForkProtector::PostForkChild);
+ #endif
+ }
+
+ Instance = result;
+ }
+ return result;
+ }
+ }
+
+ void TForkProtector::PreFork() {
+ auto self = Get(false);
+ if (!self) {
+ return;
+ }
+ self->Lock.Acquire();
+ for (auto* c : self->Clients) {
+ c->PreFork();
+ }
+ }
+
+ void TForkProtector::PostForkParent() {
+ auto self = Get(false);
+ if (!self) {
+ return;
+ }
+ for (auto* c : self->Clients) {
+ c->PostForkParent();
+ }
+ self->Lock.Release();
+ }
+
+ void TForkProtector::PostForkChild() {
+ auto self = Get(false);
+ if (!self) {
+ return;
+ }
+ for (auto* c : self->Clients) {
+ c->PostForkChild();
+ }
+ self->Lock.Release();
+ }
+
+ std::weak_ptr<TForkProtector> TForkProtector::Instance{};
+ TMutex TForkProtector::InstanceLock{};
+ bool TForkProtector::SubscribedToForks{false};
+
+ TClientSession::TClientSession(const TIntrusivePtr<TClient>& client, const TSessionParameters& parameters)
+ : AsyncJoiner()
+ , Client(client)
+ , OriginalSessionId(MakeFMaybe(parameters.SessionId))
+ , SessionId(OriginalSessionId)
+ , Meta(MakeFMaybe(parameters.Meta))
+ , Logger(Client->CreateSessionLogger())
+ , CloseStarted(false)
+ , ForcedCloseStarted(false)
+ , Closed(false)
+ , ForkInProgressLocal(false)
+ , Started(false)
+ , ClosePromise()
+ , ActiveGrpcCall(nullptr)
+ , WriteQueue()
+ , TrimmedCount(0)
+ , NextIndex(0)
+ , AckSeqNo(Nothing())
+ , PollerLastEventTimestamp()
+ , Counters(parameters.Counters ? parameters.Counters : Client->GetCounters()->GetDefaultSessionCounters())
+ , MakeGrpcCallTimer(nullptr)
+ , ForceCloseTimer(nullptr)
+ , PollTimer(nullptr)
+ , GrpcInflightMessages(0)
+ , GrpcInflightBytes(0)
+ , InflightBytes(0)
+ , CloseRequested(false)
+ , EventsBatchSize(0)
+ , PollingStatus(EPollingStatus::Inactive)
+ , EventNotification(nullptr)
+ , EventNotificationTriggered(false)
+ , EventsBatch()
+ , SecondaryEventsBatch()
+ , ForkInProgress(false)
+ , Lock()
+ , MaxInflightBytes(
+ parameters.MaxInflightBytes.GetOrElse(Client->GetParameters().MaxInflightBytes))
+ , AgentMaxReceiveMessage(Nothing()) {
+ if (Meta.Defined() && !IsUtf8(*Meta)) {
+ throw std::runtime_error("session meta contains non UTF-8 characters");
+ }
+ Y_ENSURE(!(Client->GetParameters().EnableForkSupport && SessionId.Defined()),
+ "explicit session id is not supported with forks");
+ Client->RegisterSession(this);
+
+ with_lock(Lock) {
+ DoStart();
+ }
+ }
+
+ TFuture<void> TClientSession::PreFork() {
+ YLOG_INFO("pre fork started");
+
+ Lock.Acquire();
+
+ YLOG_INFO("triggering event notification");
+ if (!EventNotificationTriggered) {
+ EventNotificationTriggered = true;
+ EventNotification->Trigger();
+ }
+
+ YLOG_INFO("setting 'fork in progress' flag");
+ ForkInProgress.store(true);
+
+ if (!Started) {
+ ClosePromise.TrySetValue();
+ }
+ YLOG_INFO("pre fork finished");
+ return ClosePromise.GetFuture();
+ }
+
+ void TClientSession::PostForkParent() {
+ YLOG_INFO("post fork parent started");
+ ForkInProgress.store(false);
+ ForkInProgressLocal = false;
+ Started = false;
+
+ if (!CloseRequested) {
+ DoStart();
+
+ YLOG_INFO("triggering event notification");
+ EventNotificationTriggered = true;
+ EventNotification->Trigger();
+ }
+
+ Lock.Release();
+
+ YLOG_INFO("post fork parent finished");
+ }
+
+ void TClientSession::PostForkChild() {
+ YLOG_INFO("post fork child started");
+ ForkInProgress.store(false);
+ ForkInProgressLocal = false;
+ Started = false;
+
+ SessionId.Clear();
+ TrimmedCount = 0;
+ NextIndex = 0;
+ AckSeqNo.Clear();
+ PurgeWriteQueue();
+ EventsBatch.clear();
+ SecondaryEventsBatch.clear();
+ EventsBatchSize = 0;
+
+ Lock.Release();
+
+ YLOG_INFO("post fork child finished");
+ }
+
+ void TClientSession::SetAgentMaxReceiveMessage(size_t newValue) {
+ AgentMaxReceiveMessage = newValue;
+ }
+
+ void TClientSession::DoStart() {
+ // Lock must be held
+
+ Y_VERIFY(!Started);
+ YLOG_INFO("starting");
+
+ Client->EnsureStarted();
+
+ MakeGrpcCallTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(),
+ MakeIOCallback([this](EIOStatus status) {
+ if (status == EIOStatus::Error) {
+ return;
+ }
+ MakeGrpcCall();
+ }, &AsyncJoiner));
+ ForceCloseTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(),
+ MakeIOCallback([this](EIOStatus status) {
+ if (status == EIOStatus::Error) {
+ return;
+ }
+ YLOG_INFO("ForceCloseTimer");
+ BeginClose(TInstant::Zero());
+ }, &AsyncJoiner));
+ PollTimer = MakeHolder<TGrpcTimer>(Client->GetCompletionQueue(),
+ MakeIOCallback([this](EIOStatus status) {
+ if (status == EIOStatus::Error) {
+ return;
+ }
+ Poll();
+ }, &AsyncJoiner));
+ EventNotification = MakeHolder<TGrpcNotification>(Client->GetCompletionQueue(),
+ MakeIOCallback([this](EIOStatus status) {
+ Y_VERIFY(status == EIOStatus::Ok);
+ Poll();
+ }, &AsyncJoiner));
+
+ CloseStarted = false;
+ ForcedCloseStarted = false;
+ Closed = false;
+ ClosePromise = NewPromise();
+ EventNotificationTriggered = false;
+ PollerLastEventTimestamp = Now();
+ PollingStatus = EPollingStatus::Inactive;
+
+ ++Client->GetCounters()->ActiveSessionsCount;
+ MakeGrpcCallTimer->Set(Now());
+ YLOG_INFO(Sprintf("started, sessionId [%s]", OriginalSessionId.GetOrElse("").c_str()));
+
+ Started = true;
+ }
+
+ void TClientSession::MakeGrpcCall() {
+ if (Closed) {
+ YLOG_INFO("MakeGrpcCall, session already closed");
+ return;
+ }
+ Y_VERIFY(!ForcedCloseStarted);
+ Y_VERIFY(!ActiveGrpcCall);
+ ActiveGrpcCall = MakeIntrusive<TGrpcCall>(*this);
+ ActiveGrpcCall->Start();
+ ++Counters->GrpcCalls;
+ if (CloseStarted) {
+ ActiveGrpcCall->BeginClose(false);
+ }
+ }
+
+ TClientSession::~TClientSession() {
+ Close(TInstant::Zero());
+ AsyncJoiner.Join().Wait();
+ Client->UnregisterSession(this);
+ YLOG_INFO("destroyed");
+ }
+
+ void TClientSession::Send(TClientMessage&& message) {
+ const auto messageSize = SizeOf(message);
+ ++Counters->ReceivedMessages;
+ Counters->ReceivedBytes += messageSize;
+ if (messageSize > Client->GetParameters().GrpcMaxMessageSize) {
+ YLOG_ERR(Sprintf("message size [%lu] is greater than max grpc message size [%lu], message dropped",
+ messageSize, Client->GetParameters().GrpcMaxMessageSize));
+ ++Counters->DroppedMessages;
+ Counters->DroppedBytes += messageSize;
+ ++Counters->ErrorsCount;
+ return;
+ }
+ if (message.Meta.Defined() && !IsUtf8(*message.Meta)) {
+ YLOG_ERR("message meta contains non UTF-8 characters, message dropped");
+ ++Counters->DroppedMessages;
+ Counters->DroppedBytes += messageSize;
+ ++Counters->ErrorsCount;
+ return;
+ }
+ if (!message.Timestamp.Defined()) {
+ message.Timestamp = TInstant::Now();
+ }
+ ++Counters->InflightMessages;
+ Counters->InflightBytes += messageSize;
+ {
+ auto g = Guard(Lock);
+
+ if (!Started) {
+ DoStart();
+ }
+
+ if (CloseRequested) {
+ g.Release();
+ YLOG_ERR(Sprintf("session is closing, message dropped, [%lu] bytes", messageSize));
+ --Counters->InflightMessages;
+ Counters->InflightBytes -= messageSize;
+ ++Counters->DroppedMessages;
+ Counters->DroppedBytes += messageSize;
+ ++Counters->ErrorsCount;
+ return;
+ }
+ if (InflightBytes.load() + messageSize > MaxInflightBytes) {
+ g.Release();
+ YLOG_ERR(Sprintf("max inflight of [%lu] bytes reached, [%lu] bytes dropped",
+ MaxInflightBytes, messageSize));
+ --Counters->InflightMessages;
+ Counters->InflightBytes -= messageSize;
+ ++Counters->DroppedMessages;
+ Counters->DroppedBytes += messageSize;
+ ++Counters->ErrorsCount;
+ return;
+ }
+ InflightBytes.fetch_add(messageSize);
+ EventsBatch.push_back(TMessageReceivedEvent{std::move(message), messageSize});
+ EventsBatchSize += messageSize;
+ if ((PollingStatus == EPollingStatus::Inactive ||
+ EventsBatchSize >= Client->GetParameters().GrpcMaxMessageSize) &&
+ !EventNotificationTriggered)
+ {
+ EventNotificationTriggered = true;
+ EventNotification->Trigger();
+ }
+ }
+ }
+
+ TFuture<void> TClientSession::CloseAsync(TInstant deadline) {
+ YLOG_INFO(Sprintf("close, deadline [%s]", ToString(deadline).c_str()));
+ if (!ClosePromise.GetFuture().HasValue()) {
+ with_lock(Lock) {
+ if (!Started) {
+ return MakeFuture();
+ }
+
+ CloseRequested = true;
+
+ EventsBatch.push_back(TCloseRequestedEvent{deadline});
+ if (!EventNotificationTriggered) {
+ EventNotificationTriggered = true;
+ EventNotification->Trigger();
+ }
+ }
+ }
+ return ClosePromise.GetFuture();
+ }
+
+ void TClientSession::BeginClose(TInstant deadline) {
+ if (Closed) {
+ return;
+ }
+ if (!CloseStarted) {
+ CloseStarted = true;
+ YLOG_INFO("close started");
+ }
+ const auto force = deadline == TInstant::Zero();
+ if (force && !ForcedCloseStarted) {
+ ForcedCloseStarted = true;
+ YLOG_INFO("forced close started");
+ }
+ if (!ActiveGrpcCall && (ForcedCloseStarted || WriteQueue.empty())) {
+ DoClose();
+ } else {
+ if (!force) {
+ ForceCloseTimer->Set(deadline);
+ }
+ if (ActiveGrpcCall) {
+ ActiveGrpcCall->BeginClose(ForcedCloseStarted);
+ }
+ }
+ }
+
+ void TClientSession::Poll() {
+ if (ForkInProgressLocal) {
+ return;
+ }
+
+ const auto now = Now();
+ const auto sendDelay = Client->GetParameters().GrpcSendDelay;
+ const auto oldPollingStatus = PollingStatus;
+
+ {
+ if (!Lock.TryAcquire()) {
+ TSpinWait sw;
+
+ while (Lock.IsLocked() || !Lock.TryAcquire()) {
+ if (ForkInProgress.load()) {
+ YLOG_INFO("poller 'fork in progress' signal received, stopping session");
+ ForkInProgressLocal = true;
+ if (!ActiveGrpcCall || !ActiveGrpcCall->Initialized()) {
+ BeginClose(TInstant::Max());
+ } else if (ActiveGrpcCall->ReuseSessions()) {
+ ActiveGrpcCall->Poison();
+ BeginClose(TInstant::Max());
+ } else {
+ BeginClose(TInstant::Zero());
+ }
+ return;
+ }
+ sw.Sleep();
+ }
+ }
+
+ if (!EventsBatch.empty()) {
+ DoSwap(EventsBatch, SecondaryEventsBatch);
+ EventsBatchSize = 0;
+ PollerLastEventTimestamp = now;
+ }
+ const auto needNextPollStep = sendDelay != TDuration::Zero() &&
+ !CloseRequested &&
+ (now - PollerLastEventTimestamp) < 10 * sendDelay;
+ PollingStatus = needNextPollStep ? EPollingStatus::Active : EPollingStatus::Inactive;
+ EventNotificationTriggered = false;
+
+ Lock.Release();
+ }
+
+ if (PollingStatus == EPollingStatus::Active) {
+ PollTimer->Set(now + sendDelay);
+ }
+ if (PollingStatus != oldPollingStatus) {
+ YLOG_DEBUG(Sprintf("poller %s", PollingStatus == EPollingStatus::Active ? "started" : "stopped"));
+ }
+ if (auto& batch = SecondaryEventsBatch; !batch.empty()) {
+ auto closeIt = FindIf(batch, [](const auto& e) {
+ return std::holds_alternative<TCloseRequestedEvent>(e);
+ });
+
+ if (auto it = begin(batch); it != closeIt) {
+ Y_VERIFY(!CloseStarted);
+ do {
+ auto& e = std::get<TMessageReceivedEvent>(*it++);
+ WriteQueue.push_back({std::move(e.Message), e.Size, false});
+ } while (it != closeIt);
+ if (ActiveGrpcCall) {
+ ActiveGrpcCall->NotifyMessageAdded();
+ }
+ }
+
+ for (auto endIt = end(batch); closeIt != endIt; ++closeIt) {
+ const auto& e = std::get<TCloseRequestedEvent>(*closeIt);
+ BeginClose(e.Deadline);
+ }
+
+ batch.clear();
+ }
+ };
+
+ void TClientSession::PrepareInitializeRequest(NUnifiedAgentProto::Request& target) {
+ auto& initializeMessage = *target.MutableInitialize();
+ if (SessionId.Defined()) {
+ initializeMessage.SetSessionId(*SessionId);
+ }
+ if (Client->GetParameters().SharedSecretKey.Defined()) {
+ initializeMessage.SetSharedSecretKey(*Client->GetParameters().SharedSecretKey);
+ }
+ if (Meta.Defined()) {
+ for (const auto& p: *Meta) {
+ AddMeta(initializeMessage, p.first, p.second);
+ }
+ }
+ if (!Meta.Defined() || Meta->find("_reusable") == Meta->end()) {
+ AddMeta(initializeMessage, "_reusable", "true");
+ }
+ }
+
+ TClientSession::TRequestBuilder::TRequestBuilder(NUnifiedAgentProto::Request& target, size_t RequestPayloadLimitBytes,
+ TFMaybe<size_t> serializedRequestLimitBytes)
+ : Target(target)
+ , PwTarget(MakeFMaybe<NPW::TRequest>())
+ , MetaItems()
+ , RequestPayloadSize(0)
+ , RequestPayloadLimitBytes(RequestPayloadLimitBytes)
+ , SerializedRequestSize(0)
+ , SerializedRequestLimitBytes(serializedRequestLimitBytes)
+ , CountersInvalid(false)
+ {
+ }
+
+ void TClientSession::TRequestBuilder::ResetCounters() {
+ RequestPayloadSize = 0;
+ SerializedRequestSize = 0;
+ PwTarget.Clear();
+ PwTarget.ConstructInPlace();
+ CountersInvalid = false;
+ }
+
+ TClientSession::TRequestBuilder::TAddResult TClientSession::TRequestBuilder::TryAddMessage(
+ const TPendingMessage& message, size_t seqNo) {
+ Y_VERIFY(!CountersInvalid);
+ {
+ // add item to pwRequest to increase calculated size
+ PwTarget->DataBatch.SeqNo.Add(seqNo);
+ PwTarget->DataBatch.Timestamp.Add(message.Message.Timestamp->MicroSeconds());
+ PwTarget->DataBatch.Payload.Add().SetValue(message.Message.Payload);
+ if (message.Message.Meta.Defined()) {
+ for (const auto &m: *message.Message.Meta) {
+ TMetaItemBuilder *metaItemBuilder = nullptr;
+ {
+ auto it = MetaItems.find(m.first);
+ if (it == MetaItems.end()) {
+ PwTarget->DataBatch.Meta.Add().Key.SetValue(m.first);
+ } else {
+ metaItemBuilder = &it->second;
+ }
+ }
+ size_t metaItemIdx = (metaItemBuilder != nullptr) ? metaItemBuilder->ItemIndex :
+ PwTarget->DataBatch.Meta.GetSize() - 1;
+ auto &pwMetaItem = PwTarget->DataBatch.Meta.Get(metaItemIdx);
+ pwMetaItem.Value.Add().SetValue(m.second);
+ const auto index = Target.GetDataBatch().SeqNoSize();
+ if ((metaItemBuilder != nullptr && metaItemBuilder->ValueIndex != index) ||
+ (metaItemBuilder == nullptr && index != 0)) {
+ const auto valueIdx = (metaItemBuilder) ? metaItemBuilder->ValueIndex : 0;
+ pwMetaItem.SkipStart.Add(valueIdx);
+ pwMetaItem.SkipLength.Add(index - valueIdx);
+ }
+ }
+ }
+ }
+ const auto newSerializedRequestSize = PwTarget->ByteSizeLong();
+ const auto newPayloadSize = RequestPayloadSize + message.Size;
+ if ((SerializedRequestLimitBytes.Defined() && newSerializedRequestSize > *SerializedRequestLimitBytes) ||
+ newPayloadSize > RequestPayloadLimitBytes) {
+ CountersInvalid = true;
+ return {true, newPayloadSize, newSerializedRequestSize};
+ }
+
+ {
+ // add item to the real request
+ auto& batch = *Target.MutableDataBatch();
+ batch.AddSeqNo(seqNo);
+ batch.AddTimestamp(message.Message.Timestamp->MicroSeconds());
+ batch.AddPayload(message.Message.Payload);
+ if (message.Message.Meta.Defined()) {
+ for (const auto &m: *message.Message.Meta) {
+ TMetaItemBuilder *metaItemBuilder;
+ {
+ auto it = MetaItems.find(m.first);
+ if (it == MetaItems.end()) {
+ batch.AddMeta()->SetKey(m.first);
+ auto insertResult = MetaItems.insert({m.first, {batch.MetaSize() - 1}});
+ Y_VERIFY(insertResult.second);
+ metaItemBuilder = &insertResult.first->second;
+ } else {
+ metaItemBuilder = &it->second;
+ }
+ }
+ auto *metaItem = batch.MutableMeta(metaItemBuilder->ItemIndex);
+ metaItem->AddValue(m.second);
+ const auto index = batch.SeqNoSize() - 1;
+ if (metaItemBuilder->ValueIndex != index) {
+ metaItem->AddSkipStart(metaItemBuilder->ValueIndex);
+ metaItem->AddSkipLength(index - metaItemBuilder->ValueIndex);
+ }
+ metaItemBuilder->ValueIndex = index + 1;
+ }
+ }
+ SerializedRequestSize = newSerializedRequestSize;
+ RequestPayloadSize = newPayloadSize;
+ }
+
+ return {false, newPayloadSize, newSerializedRequestSize};
+ }
+
+ void TClientSession::PrepareWriteBatchRequest(NUnifiedAgentProto::Request& target) {
+ Y_VERIFY(AckSeqNo.Defined());
+ TRequestBuilder requestBuilder(target, Client->GetParameters().GrpcMaxMessageSize, AgentMaxReceiveMessage);
+ const auto startIndex = NextIndex - TrimmedCount;
+ for (size_t i = startIndex; i < WriteQueue.size(); ++i) {
+ auto& queueItem = WriteQueue[i];
+ if (queueItem.Skipped) {
+ NextIndex++;
+ continue;
+ }
+
+ const auto addResult = requestBuilder.TryAddMessage(queueItem, *AckSeqNo + i + 1);
+ const auto serializedLimitToLog = AgentMaxReceiveMessage.Defined() ? *AgentMaxReceiveMessage : 0;
+ if (addResult.LimitExceeded && target.GetDataBatch().SeqNoSize() == 0) {
+ YLOG_ERR(Sprintf("single serialized message is too large [%lu] > [%lu], dropping it",
+ addResult.NewSerializedRequestSize, serializedLimitToLog));
+ queueItem.Skipped = true;
+ ++Counters->DroppedMessages;
+ Counters->DroppedBytes += queueItem.Size;
+ ++Counters->ErrorsCount;
+ NextIndex++;
+ requestBuilder.ResetCounters();
+ continue;
+ }
+ if (addResult.LimitExceeded) {
+ YLOG_DEBUG(Sprintf(
+ "batch limit exceeded: [%lu] > [%lu] (limit for serialized batch)"
+ "OR [%lu] > [%lu] (limit for raw batch)",
+ addResult.NewSerializedRequestSize, serializedLimitToLog,
+ addResult.NewRequestPayloadSize, Client->GetParameters().GrpcMaxMessageSize));
+ break;
+ }
+
+ NextIndex++;
+ }
+ const auto messagesCount = target.GetDataBatch().SeqNoSize();
+ if (messagesCount == 0) {
+ return;
+ }
+ Y_VERIFY(requestBuilder.GetSerializedRequestSize() == target.ByteSizeLong(),
+ "failed to calculate size for message [%s]", target.ShortDebugString().c_str());
+ GrpcInflightMessages += messagesCount;
+ GrpcInflightBytes += requestBuilder.GetRequestPayloadSize();
+ YLOG_DEBUG(Sprintf("new write batch, [%lu] messages, [%lu] bytes, first seq_no [%lu], serialized size [%lu]",
+ messagesCount, requestBuilder.GetRequestPayloadSize(),
+ *target.GetDataBatch().GetSeqNo().begin(), requestBuilder.GetSerializedRequestSize()));
+ ++Counters->GrpcWriteBatchRequests;
+ Counters->GrpcInflightMessages += messagesCount;
+ Counters->GrpcInflightBytes += requestBuilder.GetRequestPayloadSize();
+ }
+
+ void TClientSession::Acknowledge(ui64 seqNo) {
+ size_t messagesCount = 0;
+ size_t bytesCount = 0;
+ size_t skippedMessagesCount = 0;
+ size_t skippedBytesCount = 0;
+
+ if (AckSeqNo.Defined()) {
+ while (!WriteQueue.empty() && ((*AckSeqNo < seqNo) || WriteQueue.front().Skipped)) {
+ if (WriteQueue.front().Skipped) {
+ skippedMessagesCount++;
+ skippedBytesCount += WriteQueue.front().Size;
+ } else {
+ ++messagesCount;
+ bytesCount += WriteQueue.front().Size;
+ }
+ ++(*AckSeqNo);
+ WriteQueue.pop_front();
+ ++TrimmedCount;
+ }
+ }
+ if (!AckSeqNo.Defined() || seqNo > *AckSeqNo) {
+ AckSeqNo = seqNo;
+ }
+
+ Counters->AcknowledgedMessages += messagesCount;
+ Counters->AcknowledgedBytes += bytesCount;
+ Counters->InflightMessages -= (messagesCount + skippedMessagesCount);
+ Counters->InflightBytes -= (bytesCount + skippedBytesCount);
+ InflightBytes.fetch_sub(bytesCount);
+ Counters->GrpcInflightMessages -= messagesCount;
+ Counters->GrpcInflightBytes -= bytesCount;
+ GrpcInflightMessages -= messagesCount;
+ GrpcInflightBytes -= bytesCount;
+
+ YLOG_DEBUG(Sprintf("ack [%lu], [%lu] messages, [%lu] bytes", seqNo, messagesCount, bytesCount));
+ }
+
+ void TClientSession::OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo) {
+ SessionId = sessionId;
+ Acknowledge(lastSeqNo);
+ NextIndex = TrimmedCount;
+ ++Counters->GrpcCallsInitialized;
+ Counters->GrpcInflightMessages -= GrpcInflightMessages;
+ Counters->GrpcInflightBytes -= GrpcInflightBytes;
+ GrpcInflightMessages = 0;
+ GrpcInflightBytes = 0;
+ YLOG_INFO(Sprintf("grpc call initialized, session_id [%s], last_seq_no [%lu]",
+ sessionId.c_str(), lastSeqNo));
+ }
+
+ void TClientSession::OnGrpcCallFinished() {
+ Y_VERIFY(!Closed);
+ Y_VERIFY(ActiveGrpcCall);
+ ActiveGrpcCall = nullptr;
+ if (CloseStarted && (ForcedCloseStarted || WriteQueue.empty())) {
+ DoClose();
+ } else {
+ const auto reconnectTime = TInstant::Now() + Client->GetParameters().GrpcReconnectDelay;
+ MakeGrpcCallTimer->Set(reconnectTime);
+ YLOG_INFO(Sprintf("grpc call delayed until [%s]", reconnectTime.ToString().c_str()));
+ }
+ }
+
+ auto TClientSession::PurgeWriteQueue() -> TPurgeWriteQueueStats {
+ size_t bytesCount = 0;
+ for (const auto& m: WriteQueue) {
+ bytesCount += m.Size;
+ }
+ auto result = TPurgeWriteQueueStats{WriteQueue.size(), bytesCount};
+
+ Counters->DroppedMessages += WriteQueue.size();
+ Counters->DroppedBytes += bytesCount;
+ Counters->InflightMessages -= WriteQueue.size();
+ Counters->InflightBytes -= bytesCount;
+ Counters->GrpcInflightMessages -= GrpcInflightMessages;
+ Counters->GrpcInflightBytes -= GrpcInflightBytes;
+
+ InflightBytes.fetch_sub(bytesCount);
+ GrpcInflightMessages = 0;
+ GrpcInflightBytes = 0;
+ WriteQueue.clear();
+
+ return result;
+ }
+
+ void TClientSession::DoClose() {
+ Y_VERIFY(CloseStarted);
+ Y_VERIFY(!Closed);
+ Y_VERIFY(!ClosePromise.HasValue());
+ MakeGrpcCallTimer->Cancel();
+ ForceCloseTimer->Cancel();
+ PollTimer->Cancel();
+ if (!ForkInProgressLocal && WriteQueue.size() > 0) {
+ const auto stats = PurgeWriteQueue();
+ ++Counters->ErrorsCount;
+ YLOG_ERR(Sprintf("DoClose, dropped [%lu] messages, [%lu] bytes",
+ stats.PurgedMessages, stats.PurgedBytes));
+ }
+ --Client->GetCounters()->ActiveSessionsCount;
+ Closed = true;
+ ClosePromise.SetValue();
+ YLOG_INFO("session closed");
+ }
+
+ TGrpcCall::TGrpcCall(TClientSession& session)
+ : Session(session)
+ , AsyncJoinerToken(&Session.GetAsyncJoiner())
+ , AcceptTag(MakeIOCallback(this, &TGrpcCall::EndAccept))
+ , ReadTag(MakeIOCallback(this, &TGrpcCall::EndRead))
+ , WriteTag(MakeIOCallback(this, &TGrpcCall::EndWrite))
+ , WritesDoneTag(MakeIOCallback(this, &TGrpcCall::EndWritesDone))
+ , FinishTag(MakeIOCallback(this, &TGrpcCall::EndFinish))
+ , Logger(session.GetLogger().Child("grpc"))
+ , AcceptPending(false)
+ , Initialized_(false)
+ , ReadPending(false)
+ , ReadsDone(false)
+ , WritePending(false)
+ , WritesBlocked(false)
+ , WritesDonePending(false)
+ , WritesDone(false)
+ , ErrorOccured(false)
+ , FinishRequested(false)
+ , FinishStarted(false)
+ , FinishDone(false)
+ , Cancelled(false)
+ , Poisoned(false)
+ , PoisonPillSent(false)
+ , ReuseSessions_(false)
+ , FinishStatus()
+ , ClientContext()
+ , ReaderWriter(nullptr)
+ , Request()
+ , Response()
+ {
+ }
+
+ void TGrpcCall::Start() {
+ AcceptPending = true;
+ auto& client = Session.GetClient();
+ ReaderWriter = client.GetStub().AsyncSession(&ClientContext,
+ &client.GetCompletionQueue(),
+ AcceptTag->Ref());
+ YLOG_INFO("AsyncSession started");
+ }
+
+ TGrpcCall::~TGrpcCall() {
+ YLOG_INFO("destroyed");
+ }
+
+ void TGrpcCall::EnsureFinishStarted() {
+ if (!FinishStarted) {
+ FinishStarted = true;
+ ReaderWriter->Finish(&FinishStatus, FinishTag->Ref());
+ YLOG_INFO("Finish started");
+ }
+ }
+
+ bool TGrpcCall::CheckHasError(EIOStatus status, const char* method) {
+ if (status == EIOStatus::Error) {
+ SetError(Sprintf("%s %s", method, ToString(status).c_str()));
+ return true;
+ }
+ if (ErrorOccured) {
+ ScheduleFinishOnError();
+ return true;
+ }
+ return false;
+ }
+
+ void TGrpcCall::SetError(const TString& error) {
+ if (!Cancelled) {
+ YLOG_ERR(error);
+ ++Session.GetCounters().ErrorsCount;
+ }
+ ErrorOccured = true;
+ ScheduleFinishOnError();
+ }
+
+ void TGrpcCall::ScheduleFinishOnError() {
+ if (!AcceptPending && !WritePending && !WritesDonePending) {
+ EnsureFinishStarted();
+ }
+ }
+
+ void TGrpcCall::BeginClose(bool force) {
+ if (force) {
+ if (!Cancelled) {
+ Cancelled = true;
+ ClientContext.TryCancel();
+ SetError("forced close");
+ }
+ return;
+ }
+ YLOG_INFO(Sprintf("Close Initialized [%d], AcceptPending [%d], "
+ "WritePending [%d], FinishRequested [%d], "
+ "ErrorOccured [%d]",
+ static_cast<int>(Initialized_),
+ static_cast<int>(AcceptPending),
+ static_cast<int>(WritePending),
+ static_cast<int>(FinishRequested),
+ static_cast<int>(ErrorOccured)));
+ if (ErrorOccured || FinishRequested) {
+ return;
+ }
+ FinishRequested = true;
+ if (!Initialized_ || WritePending) {
+ return;
+ }
+ WritesBlocked = true;
+ BeginWritesDone();
+ }
+
+ void TGrpcCall::Poison() {
+ Poisoned = true;
+ NotifyMessageAdded();
+ }
+
+ void TGrpcCall::NotifyMessageAdded() {
+ if (WritePending || !Initialized_ || ErrorOccured || FinishRequested) {
+ return;
+ }
+ ScheduleWrite();
+ }
+
+ void TGrpcCall::ScheduleWrite() {
+ Request.Clear();
+ if (!Poisoned) {
+ Session.PrepareWriteBatchRequest(Request);
+ } else if (!PoisonPillSent) {
+ PoisonPillSent = true;
+ auto& batch = *Request.mutable_data_batch();
+ batch.AddSeqNo(std::numeric_limits<::google::protobuf::uint64>::max());
+ batch.AddTimestamp(Now().MicroSeconds());
+ batch.AddPayload("");
+ YLOG_INFO("poison pill sent");
+ }
+ if (Request.GetDataBatch().GetSeqNo().empty()) {
+ if (FinishRequested) {
+ WritesBlocked = true;
+ BeginWritesDone();
+ }
+ return;
+ }
+
+ BeginWrite();
+ }
+
+ void TGrpcCall::EndAccept(EIOStatus status) {
+ Y_VERIFY(AcceptPending);
+ AcceptPending = false;
+ if (CheckHasError(status, "EndAccept")) {
+ return;
+ }
+ BeginRead();
+ Request.Clear();
+ Session.PrepareInitializeRequest(Request);
+ BeginWrite();
+ }
+
+ void TGrpcCall::EndRead(EIOStatus status) {
+ ReadPending = false;
+ if (FinishDone) {
+ Session.OnGrpcCallFinished();
+ return;
+ }
+ if (!ErrorOccured && status == EIOStatus::Error && WritesBlocked) {
+ Y_VERIFY(!WritePending);
+ YLOG_INFO("EndRead ReadsDone");
+ ReadsDone = true;
+ if (WritesDone) {
+ EnsureFinishStarted();
+ return;
+ }
+ return;
+ }
+ if (CheckHasError(status, "EndRead")) {
+ return;
+ }
+ if (!Initialized_) {
+ const auto metadata = ClientContext.GetServerInitialMetadata();
+ {
+ const auto it = metadata.find("ua-reuse-sessions");
+ if (it != metadata.end() && it->second == "true") {
+ ReuseSessions_ = true;
+ }
+ }
+ {
+ const auto it = metadata.find("ua-max-receive-message-size");
+ if (it != metadata.end()) {
+ Session.SetAgentMaxReceiveMessage(FromString<size_t>(TString{it->second.begin(), it->second.end()}));
+ }
+ }
+
+ if (Response.response_case() != NUnifiedAgentProto::Response::kInitialized) {
+ SetError(Sprintf("EndRead while initializing, unexpected response_case [%d]",
+ static_cast<int>(Response.response_case())));
+ return;
+ }
+ Session.OnGrpcCallInitialized(Response.GetInitialized().GetSessionId(),
+ Response.GetInitialized().GetLastSeqNo());
+ Initialized_ = true;
+ if (!WritePending) {
+ ScheduleWrite();
+ }
+ } else {
+ if (Response.response_case() != NUnifiedAgentProto::Response::kAck) {
+ SetError(Sprintf("EndRead unexpected response_case [%d]",
+ static_cast<int>(Response.response_case())));
+ return;
+ }
+ Session.Acknowledge(Response.GetAck().GetSeqNo());
+ }
+ BeginRead();
+ }
+
+ void TGrpcCall::EndWrite(EIOStatus status) {
+ WritePending = false;
+ if (CheckHasError(status, "EndWrite")) {
+ return;
+ }
+ if (!Initialized_) {
+ return;
+ }
+ ScheduleWrite();
+ }
+
+ void TGrpcCall::EndFinish(EIOStatus status) {
+ FinishDone = true;
+ const auto finishStatus = status == EIOStatus::Error
+ ? grpc::Status(grpc::UNKNOWN, "finish error")
+ : FinishStatus;
+ YLOG(finishStatus.ok() || Cancelled || Poisoned ? TLOG_INFO : TLOG_ERR,
+ Sprintf("EndFinish, code [%s], message [%s]",
+ ToString(finishStatus.error_code()).c_str(),
+ finishStatus.error_message().c_str()),
+ Logger);
+ if (!finishStatus.ok() && !Cancelled) {
+ ++Session.GetCounters().ErrorsCount;
+ }
+ if (!ReadPending) {
+ Session.OnGrpcCallFinished();
+ }
+ }
+
+ void TGrpcCall::EndWritesDone(EIOStatus status) {
+ YLOG_INFO(Sprintf("EndWritesDone [%s]", ToString(status).c_str()));
+ Y_VERIFY(!WritePending && !WritesDone && WritesDonePending);
+ WritesDonePending = false;
+ WritesDone = true;
+ if (CheckHasError(status, "EndWriteDone")) {
+ return;
+ }
+ if (ReadsDone) {
+ EnsureFinishStarted();
+ }
+ }
+
+ void TGrpcCall::BeginWritesDone() {
+ WritesDonePending = true;
+ ReaderWriter->WritesDone(WritesDoneTag->Ref());
+ YLOG_INFO("WritesDone started");
+ }
+
+ void TGrpcCall::BeginRead() {
+ ReadPending = true;
+ Response.Clear();
+ ReaderWriter->Read(&Response, ReadTag->Ref());
+ YLOG_DEBUG("Read started");
+ }
+
+ void TGrpcCall::BeginWrite() {
+ WritePending = true;
+ ReaderWriter->Write(Request, WriteTag->Ref());
+ YLOG_DEBUG("Write started");
+ }
+}
+
+namespace NUnifiedAgent {
+ size_t SizeOf(const TClientMessage& message) {
+ auto result = message.Payload.Size() + sizeof(TInstant);
+ if (message.Meta.Defined()) {
+ for (const auto& m: *message.Meta) {
+ result += m.first.Size() + m.second.Size();
+ }
+ }
+ return result;
+ }
+
+ TClientParameters::TClientParameters(const TString& uri)
+ : Uri(uri)
+ , SharedSecretKey(Nothing())
+ , MaxInflightBytes(DefaultMaxInflightBytes)
+ , Log(TLoggerOperator<TGlobalLog>::Log())
+ , LogRateLimitBytes(Nothing())
+ , GrpcReconnectDelay(TDuration::MilliSeconds(50))
+ , GrpcSendDelay(DefaultGrpcSendDelay)
+ , EnableForkSupport(false)
+ , GrpcMaxMessageSize(DefaultGrpcMaxMessageSize)
+ , Counters(nullptr)
+ {
+ }
+
+ TSessionParameters::TSessionParameters()
+ : SessionId(Nothing())
+ , Meta(Nothing())
+ , Counters(nullptr)
+ , MaxInflightBytes()
+ {
+ }
+
+ const size_t TClientParameters::DefaultMaxInflightBytes = 10_MB;
+ const size_t TClientParameters::DefaultGrpcMaxMessageSize = 1_MB;
+ const TDuration TClientParameters::DefaultGrpcSendDelay = TDuration::MilliSeconds(10);
+
+ TClientPtr MakeClient(const TClientParameters& parameters) {
+ if (!grpc_is_initialized()) {
+ EnsureGrpcConfigured();
+ }
+
+ std::shared_ptr<NPrivate::TForkProtector> forkProtector{};
+#ifdef _unix_
+ if (parameters.EnableForkSupport) {
+ forkProtector = NPrivate::TForkProtector::Get(true);
+ }
+#endif
+ return MakeIntrusive<NPrivate::TClient>(parameters, forkProtector);
+ }
+}
diff --git a/library/cpp/unified_agent_client/client_impl.h b/library/cpp/unified_agent_client/client_impl.h
new file mode 100644
index 0000000000..6adadf92e3
--- /dev/null
+++ b/library/cpp/unified_agent_client/client_impl.h
@@ -0,0 +1,364 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/client.h>
+#include <library/cpp/unified_agent_client/client_proto_weighing.h>
+#include <library/cpp/unified_agent_client/counters.h>
+#include <library/cpp/unified_agent_client/logger.h>
+#include <library/cpp/unified_agent_client/variant.h>
+#include <library/cpp/unified_agent_client/proto/unified_agent.grpc.pb.h>
+#include <library/cpp/unified_agent_client/grpc_io.h>
+
+#include <library/cpp/logger/global/global.h>
+
+#include <util/generic/deque.h>
+#include <util/system/mutex.h>
+
+namespace NUnifiedAgent::NPrivate {
+ class TClientSession;
+ class TGrpcCall;
+ class TForkProtector;
+
+ class TClient: public IClient {
+ public:
+ explicit TClient(const TClientParameters& parameters, std::shared_ptr<TForkProtector> forkProtector);
+
+ ~TClient() override;
+
+ TClientSessionPtr CreateSession(const TSessionParameters& parameters) override;
+
+ void StartTracing(ELogPriority logPriority) override;
+
+ void FinishTracing() override;
+
+ inline const TIntrusivePtr<TClientCounters>& GetCounters() const noexcept {
+ return Counters;
+ }
+
+ inline NUnifiedAgentProto::UnifiedAgentService::Stub& GetStub() noexcept {
+ return *Stub;
+ }
+
+ TScopeLogger CreateSessionLogger();
+
+ inline const TClientParameters& GetParameters() const noexcept {
+ return Parameters;
+ }
+
+ inline grpc::CompletionQueue& GetCompletionQueue() noexcept {
+ return ActiveCompletionQueue->GetCompletionQueue();
+ }
+
+ void RegisterSession(TClientSession* session);
+
+ void UnregisterSession(TClientSession* session);
+
+ void PreFork();
+
+ void PostForkParent();
+
+ void PostForkChild();
+
+ void EnsureStarted();
+
+ private:
+ void EnsureStartedNoLock();
+
+ void EnsureStoppedNoLock();
+
+ private:
+ const TClientParameters Parameters;
+ std::shared_ptr<TForkProtector> ForkProtector;
+ TIntrusivePtr<TClientCounters> Counters;
+ TLog Log;
+ TLogger MainLogger;
+ TScopeLogger Logger;
+ std::shared_ptr<grpc::Channel> Channel;
+ std::unique_ptr<NUnifiedAgentProto::UnifiedAgentService::Stub> Stub;
+ THolder<TGrpcCompletionQueueHost> ActiveCompletionQueue;
+ std::atomic<size_t> SessionLogLabel;
+ TVector<TClientSession*> ActiveSessions;
+ bool Started;
+ bool Destroyed;
+ TAdaptiveLock Lock;
+ static std::atomic<ui64> Id;
+ };
+
+ class TForkProtector {
+ public:
+ TForkProtector();
+
+ void Register(TClient& client);
+
+ void Unregister(TClient& client);
+
+ static std::shared_ptr<TForkProtector> Get(bool createIfNotExists);
+
+ private:
+ static void PreFork();
+
+ static void PostForkParent();
+
+ static void PostForkChild();
+
+ private:
+ TVector<TClient*> Clients;
+ grpc::GrpcLibraryCodegen GrpcInitializer;
+ bool Enabled;
+ TAdaptiveLock Lock;
+
+ static std::weak_ptr<TForkProtector> Instance;
+ static TMutex InstanceLock;
+ static bool SubscribedToForks;
+ };
+
+ class TClientSession: public IClientSession {
+ public:
+ TClientSession(const TIntrusivePtr<TClient>& client, const TSessionParameters& parameters);
+
+ ~TClientSession();
+
+ void Send(TClientMessage&& message) override;
+
+ NThreading::TFuture<void> CloseAsync(TInstant deadline) override;
+
+ inline TClient& GetClient() noexcept {
+ return *Client;
+ }
+
+ inline TScopeLogger& GetLogger() noexcept {
+ return Logger;
+ }
+
+ inline TClientSessionCounters& GetCounters() noexcept {
+ return *Counters;
+ }
+
+ inline TAsyncJoiner& GetAsyncJoiner() noexcept {
+ return AsyncJoiner;
+ }
+
+ void PrepareInitializeRequest(NUnifiedAgentProto::Request& target);
+
+ void PrepareWriteBatchRequest(NUnifiedAgentProto::Request& target);
+
+ void Acknowledge(ui64 seqNo);
+
+ void OnGrpcCallInitialized(const TString& sessionId, ui64 lastSeqNo);
+
+ void OnGrpcCallFinished();
+
+ NThreading::TFuture<void> PreFork();
+
+ void PostForkParent();
+
+ void PostForkChild();
+
+ void SetAgentMaxReceiveMessage(size_t);
+
+ private:
+ enum class EPollingStatus {
+ Active,
+ Inactive
+ };
+
+ struct TCloseRequestedEvent {
+ TInstant Deadline;
+ };
+
+ struct TMessageReceivedEvent {
+ TClientMessage Message;
+ size_t Size;
+ };
+
+ struct TPurgeWriteQueueStats {
+ size_t PurgedMessages{};
+ size_t PurgedBytes{};
+ };
+
+ using TEvent = std::variant<TCloseRequestedEvent, TMessageReceivedEvent>;
+
+ public:
+ struct TPendingMessage {
+ TClientMessage Message;
+ size_t Size;
+ bool Skipped;
+ };
+
+ class TRequestBuilder {
+ public:
+ struct TAddResult;
+
+ public:
+ TRequestBuilder(NUnifiedAgentProto::Request &target, size_t RequestPayloadLimitBytes,
+ TFMaybe<size_t> serializedRequestLimitBytes);
+
+ TAddResult TryAddMessage(const TPendingMessage& message, size_t seqNo);
+
+ void ResetCounters();
+
+ inline size_t GetSerializedRequestSize() const {
+ return SerializedRequestSize;
+ }
+
+ inline size_t GetRequestPayloadSize() const {
+ return RequestPayloadSize;
+ }
+
+ public:
+ struct TAddResult {
+ bool LimitExceeded;
+ size_t NewRequestPayloadSize; // == actual value, if !LimitExceeded
+ size_t NewSerializedRequestSize; // == actual value, if !LimitExceeded
+ };
+
+ private:
+ struct TMetaItemBuilder {
+ size_t ItemIndex;
+ size_t ValueIndex{0};
+ };
+
+ private:
+ NUnifiedAgentProto::Request& Target;
+ TFMaybe<NPW::TRequest> PwTarget;
+ THashMap<TString, TMetaItemBuilder> MetaItems;
+ size_t RequestPayloadSize;
+ size_t RequestPayloadLimitBytes;
+ size_t SerializedRequestSize;
+ TFMaybe<size_t> SerializedRequestLimitBytes;
+ bool CountersInvalid;
+ };
+
+ private:
+ void MakeGrpcCall();
+
+ void DoClose();
+
+ void BeginClose(TInstant deadline);
+
+ void Poll();
+
+ TPurgeWriteQueueStats PurgeWriteQueue();
+
+ void DoStart();
+
+ private:
+ TAsyncJoiner AsyncJoiner;
+ TIntrusivePtr<TClient> Client;
+ TFMaybe<TString> OriginalSessionId;
+ TFMaybe<TString> SessionId;
+ TFMaybe<THashMap<TString, TString>> Meta;
+ TScopeLogger Logger;
+ bool CloseStarted;
+ bool ForcedCloseStarted;
+ bool Closed;
+ bool ForkInProgressLocal;
+ bool Started;
+ NThreading::TPromise<void> ClosePromise;
+ TIntrusivePtr<TGrpcCall> ActiveGrpcCall;
+ TDeque<TPendingMessage> WriteQueue;
+ size_t TrimmedCount;
+ size_t NextIndex;
+ TFMaybe<ui64> AckSeqNo;
+ TInstant PollerLastEventTimestamp;
+ TIntrusivePtr<TClientSessionCounters> Counters;
+ THolder<TGrpcTimer> MakeGrpcCallTimer;
+ THolder<TGrpcTimer> ForceCloseTimer;
+ THolder<TGrpcTimer> PollTimer;
+ ui64 GrpcInflightMessages;
+ ui64 GrpcInflightBytes;
+
+ std::atomic<size_t> InflightBytes;
+ bool CloseRequested;
+ size_t EventsBatchSize;
+ EPollingStatus PollingStatus;
+ THolder<TGrpcNotification> EventNotification;
+ bool EventNotificationTriggered;
+ TVector<TEvent> EventsBatch;
+ TVector<TEvent> SecondaryEventsBatch;
+ std::atomic<bool> ForkInProgress;
+ TAdaptiveLock Lock;
+ size_t MaxInflightBytes;
+ TFMaybe<size_t> AgentMaxReceiveMessage;
+ };
+
+ class TGrpcCall final: public TAtomicRefCount<TGrpcCall> {
+ public:
+ explicit TGrpcCall(TClientSession& session);
+
+ void Start();
+
+ ~TGrpcCall();
+
+ void BeginClose(bool force);
+
+ void Poison();
+
+ void NotifyMessageAdded();
+
+ inline bool Initialized() const {
+ return Initialized_;
+ }
+
+ inline bool ReuseSessions() const {
+ return ReuseSessions_;
+ }
+
+ private:
+ void EndAccept(EIOStatus status);
+
+ void EndRead(EIOStatus status);
+
+ void EndWrite(EIOStatus status);
+
+ void EndFinish(EIOStatus status);
+
+ void EndWritesDone(EIOStatus);
+
+ void ScheduleWrite();
+
+ void BeginWritesDone();
+
+ bool CheckHasError(EIOStatus status, const char* method);
+
+ void SetError(const TString& error);
+
+ void EnsureFinishStarted();
+
+ void BeginRead();
+
+ void BeginWrite();
+
+ void ScheduleFinishOnError();
+
+ private:
+ TClientSession& Session;
+ TAsyncJoinerToken AsyncJoinerToken;
+ THolder<IIOCallback> AcceptTag;
+ THolder<IIOCallback> ReadTag;
+ THolder<IIOCallback> WriteTag;
+ THolder<IIOCallback> WritesDoneTag;
+ THolder<IIOCallback> FinishTag;
+ TScopeLogger Logger;
+ bool AcceptPending;
+ bool Initialized_;
+ bool ReadPending;
+ bool ReadsDone;
+ bool WritePending;
+ bool WritesBlocked;
+ bool WritesDonePending;
+ bool WritesDone;
+ bool ErrorOccured;
+ bool FinishRequested;
+ bool FinishStarted;
+ bool FinishDone;
+ bool Cancelled;
+ bool Poisoned;
+ bool PoisonPillSent;
+ bool ReuseSessions_;
+ grpc::Status FinishStatus;
+ grpc::ClientContext ClientContext;
+ std::unique_ptr<grpc::ClientAsyncReaderWriter<NUnifiedAgentProto::Request, NUnifiedAgentProto::Response>> ReaderWriter;
+ NUnifiedAgentProto::Request Request;
+ NUnifiedAgentProto::Response Response;
+ };
+}
diff --git a/library/cpp/unified_agent_client/client_proto_weighing.h b/library/cpp/unified_agent_client/client_proto_weighing.h
new file mode 100644
index 0000000000..728792e49f
--- /dev/null
+++ b/library/cpp/unified_agent_client/client_proto_weighing.h
@@ -0,0 +1,75 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/f_maybe.h>
+#include <library/cpp/unified_agent_client/proto_weighing.h>
+
+namespace NUnifiedAgent::NPW {
+ struct TMessageMetaItem: public TMessage {
+ TMessageMetaItem()
+ : TMessage()
+ , Key(this)
+ , Value(this)
+ , SkipStart(this)
+ , SkipLength(this)
+ {
+ }
+
+ explicit TMessageMetaItem(TMessage* parent)
+ : TMessage(parent)
+ , Key(this)
+ , Value(this)
+ , SkipStart(this)
+ , SkipLength(this)
+ {
+ }
+
+ explicit TMessageMetaItem(const NUnifiedAgent::TFMaybe<TFieldLink>& link)
+ : TMessage(link)
+ , Key(this)
+ , Value(this)
+ , SkipStart(this)
+ , SkipLength(this)
+ {
+ }
+
+ TStringField Key;
+ TRepeatedPtrField<TStringField> Value;
+ TRepeatedField<ui32> SkipStart;
+ TRepeatedField<ui32> SkipLength;
+ };
+
+ struct TDataBatch: public TMessage {
+ TDataBatch()
+ : TMessage()
+ , SeqNo(this)
+ , Timestamp(this)
+ , Payload(this, 2)
+ , Meta(this, 2)
+ {
+ }
+
+ TDataBatch(TMessage* parent)
+ : TMessage(parent)
+ , SeqNo(this)
+ , Timestamp(this)
+ , Payload(this, 2)
+ , Meta(this, 2)
+ {
+ }
+
+ TRepeatedField<ui64> SeqNo; // 1
+ TRepeatedField<ui64> Timestamp; // 2
+ TRepeatedPtrField<TStringField> Payload; // 100
+ TRepeatedPtrField<TMessageMetaItem> Meta; // 101
+ };
+
+ struct TRequest: public TMessage {
+ TRequest()
+ : TMessage()
+ , DataBatch(this)
+ {
+ }
+
+ TDataBatch DataBatch;
+ };
+}
diff --git a/library/cpp/unified_agent_client/clock.cpp b/library/cpp/unified_agent_client/clock.cpp
new file mode 100644
index 0000000000..192c998a02
--- /dev/null
+++ b/library/cpp/unified_agent_client/clock.cpp
@@ -0,0 +1,48 @@
+#include "clock.h"
+
+namespace NUnifiedAgent {
+ void TClock::Configure() {
+ Y_VERIFY(!Configured_);
+
+ Configured_ = true;
+ }
+
+ void TClock::SetBase(TInstant value) {
+ Y_VERIFY(Configured_);
+
+ Base_.store(value.GetValue());
+ }
+
+ void TClock::ResetBase() {
+ Base_.store(0);
+ }
+
+ void TClock::ResetBaseWithShift() {
+ Y_VERIFY(Configured_);
+
+ Shift_.store(static_cast<i64>(Base_.exchange(0)) - static_cast<i64>(::Now().GetValue()));
+ }
+
+ void TClock::SetShift(TDuration value) {
+ Y_VERIFY(Configured_);
+
+ Shift_.fetch_add(value.GetValue());
+ }
+
+ void TClock::ResetShift() {
+ Shift_.store(0);
+ }
+
+ TInstant TClock::Get() {
+ auto base = Base_.load();
+ if (base == 0) {
+ base = ::Now().GetValue();
+ }
+ base += Shift_.load();
+ return TInstant::FromValue(base);
+ }
+
+ bool TClock::Configured_{false};
+ std::atomic<ui64> TClock::Base_{0};
+ std::atomic<i64> TClock::Shift_{0};
+}
diff --git a/library/cpp/unified_agent_client/clock.h b/library/cpp/unified_agent_client/clock.h
new file mode 100644
index 0000000000..77ff44583e
--- /dev/null
+++ b/library/cpp/unified_agent_client/clock.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include <util/datetime/base.h>
+
+#include <atomic>
+
+namespace NUnifiedAgent {
+ class TClock {
+ public:
+ static void Configure();
+
+ static inline bool Configured() {
+ return Configured_;
+ }
+
+ static inline TInstant Now() {
+ return Configured_ ? Get() : TInstant::Now();
+ }
+
+ static void SetBase(TInstant value);
+
+ static void ResetBase();
+
+ static void ResetBaseWithShift();
+
+ static void SetShift(TDuration value);
+
+ static void ResetShift();
+
+ static TInstant Get();
+
+ private:
+ static bool Configured_;
+ static std::atomic<ui64> Base_;
+ static std::atomic<i64> Shift_;
+ };
+}
diff --git a/library/cpp/unified_agent_client/counters.cpp b/library/cpp/unified_agent_client/counters.cpp
new file mode 100644
index 0000000000..776a86ec4e
--- /dev/null
+++ b/library/cpp/unified_agent_client/counters.cpp
@@ -0,0 +1,36 @@
+#include "counters.h"
+
+using namespace NMonitoring;
+
+namespace NUnifiedAgent {
+ TClientCounters::TClientCounters(const NMonitoring::TDynamicCounterPtr& counters)
+ : TDynamicCountersWrapper(counters)
+ , ActiveSessionsCount(GetCounter("ActiveSessionsCount", false))
+ , ClientLogDroppedBytes(GetCounter("ClientLogDroppedBytes", true))
+ {
+ }
+
+ TIntrusivePtr<TClientSessionCounters> TClientCounters::GetDefaultSessionCounters() {
+ auto group = Unwrap()->GetSubgroup("session", "default");
+ return MakeIntrusive<TClientSessionCounters>(group);
+ }
+
+ TClientSessionCounters::TClientSessionCounters(const NMonitoring::TDynamicCounterPtr& counters)
+ : TDynamicCountersWrapper(counters)
+ , ReceivedMessages(GetCounter("ReceivedMessages", true))
+ , ReceivedBytes(GetCounter("ReceivedBytes", true))
+ , AcknowledgedMessages(GetCounter("AcknowledgedMessages", true))
+ , AcknowledgedBytes(GetCounter("AcknowledgedBytes", true))
+ , InflightMessages(GetCounter("InflightMessages", false))
+ , InflightBytes(GetCounter("InflightBytes", false))
+ , GrpcWriteBatchRequests(GetCounter("GrpcWriteBatchRequests", true))
+ , GrpcInflightMessages(GetCounter("GrpcInflightMessages", false))
+ , GrpcInflightBytes(GetCounter("GrpcInflightBytes", false))
+ , GrpcCalls(GetCounter("GrpcCalls", true))
+ , GrpcCallsInitialized(GetCounter("GrpcCallsInitialized", true))
+ , DroppedMessages(GetCounter("DroppedMessages", true))
+ , DroppedBytes(GetCounter("DroppedBytes", true))
+ , ErrorsCount(GetCounter("ErrorsCount", true))
+ {
+ }
+}
diff --git a/library/cpp/unified_agent_client/counters.h b/library/cpp/unified_agent_client/counters.h
new file mode 100644
index 0000000000..3c2192c3c5
--- /dev/null
+++ b/library/cpp/unified_agent_client/counters.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/dynamic_counters_wrapper.h>
+
+namespace NUnifiedAgent {
+ struct TClientSessionCounters;
+
+ struct TClientCounters: public TDynamicCountersWrapper {
+ explicit TClientCounters(const NMonitoring::TDynamicCounterPtr& counters =
+ MakeIntrusive<NMonitoring::TDynamicCounters>());
+
+ NMonitoring::TDeprecatedCounter& ActiveSessionsCount;
+ NMonitoring::TDeprecatedCounter& ClientLogDroppedBytes;
+
+ public:
+ TIntrusivePtr<TClientSessionCounters> GetDefaultSessionCounters();
+ };
+
+ struct TClientSessionCounters: public TDynamicCountersWrapper {
+ explicit TClientSessionCounters(const NMonitoring::TDynamicCounterPtr& counters =
+ MakeIntrusive<NMonitoring::TDynamicCounters>());
+
+ NMonitoring::TDeprecatedCounter& ReceivedMessages;
+ NMonitoring::TDeprecatedCounter& ReceivedBytes;
+ NMonitoring::TDeprecatedCounter& AcknowledgedMessages;
+ NMonitoring::TDeprecatedCounter& AcknowledgedBytes;
+ NMonitoring::TDeprecatedCounter& InflightMessages;
+ NMonitoring::TDeprecatedCounter& InflightBytes;
+ NMonitoring::TDeprecatedCounter& GrpcWriteBatchRequests;
+ NMonitoring::TDeprecatedCounter& GrpcInflightMessages;
+ NMonitoring::TDeprecatedCounter& GrpcInflightBytes;
+ NMonitoring::TDeprecatedCounter& GrpcCalls;
+ NMonitoring::TDeprecatedCounter& GrpcCallsInitialized;
+ NMonitoring::TDeprecatedCounter& DroppedMessages;
+ NMonitoring::TDeprecatedCounter& DroppedBytes;
+ NMonitoring::TDeprecatedCounter& ErrorsCount;
+ };
+}
diff --git a/library/cpp/unified_agent_client/duration_counter.cpp b/library/cpp/unified_agent_client/duration_counter.cpp
new file mode 100644
index 0000000000..118778a226
--- /dev/null
+++ b/library/cpp/unified_agent_client/duration_counter.cpp
@@ -0,0 +1,41 @@
+#include "duration_counter.h"
+
+namespace NUnifiedAgent {
+ using namespace NMonitoring;
+
+ TDurationUsCounter::TDurationUsCounter(const TString& name, TDynamicCounters& owner)
+ : Counter(*owner.GetCounter(name, true))
+ , ActiveTimers()
+ , Lock()
+ {
+ }
+
+ NHPTimer::STime* TDurationUsCounter::Begin() {
+ with_lock (Lock) {
+ ActiveTimers.push_back(0);
+ auto& result = ActiveTimers.back();
+ NHPTimer::GetTime(&result);
+ return &result;
+ }
+ }
+
+ void TDurationUsCounter::End(NHPTimer::STime* startTime) {
+ with_lock (Lock) {
+ Counter += static_cast<ui64>(NHPTimer::GetTimePassed(startTime) * 1000000);
+ *startTime = 0;
+ while (!ActiveTimers.empty() && ActiveTimers.front() == 0) {
+ ActiveTimers.pop_front();
+ }
+ }
+ }
+
+ void TDurationUsCounter::Update() {
+ with_lock (Lock) {
+ for (auto& startTime : ActiveTimers) {
+ if (startTime != 0) {
+ Counter += static_cast<ui64>(NHPTimer::GetTimePassed(&startTime) * 1000000);
+ }
+ }
+ }
+ }
+}
diff --git a/library/cpp/unified_agent_client/duration_counter.h b/library/cpp/unified_agent_client/duration_counter.h
new file mode 100644
index 0000000000..dbdfc22ed4
--- /dev/null
+++ b/library/cpp/unified_agent_client/duration_counter.h
@@ -0,0 +1,43 @@
+#pragma once
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+#include <util/generic/deque.h>
+#include <util/system/hp_timer.h>
+#include <util/system/mutex.h>
+
+namespace NUnifiedAgent {
+ class TDurationUsCounter {
+ public:
+ class TScope {
+ public:
+ TScope(TDurationUsCounter& counter)
+ : Counter(counter)
+ , StartTime(Counter.Begin())
+ {
+ }
+
+ ~TScope() {
+ Counter.End(StartTime);
+ }
+
+ private:
+ TDurationUsCounter& Counter;
+ NHPTimer::STime* StartTime;
+ };
+
+ public:
+ TDurationUsCounter(const TString& name, NMonitoring::TDynamicCounters& owner);
+
+ NHPTimer::STime* Begin();
+
+ void End(NHPTimer::STime* startTime);
+
+ void Update();
+
+ private:
+ NMonitoring::TDeprecatedCounter& Counter;
+ TDeque<NHPTimer::STime> ActiveTimers;
+ TAdaptiveLock Lock;
+ };
+}
diff --git a/library/cpp/unified_agent_client/dynamic_counters_wrapper.h b/library/cpp/unified_agent_client/dynamic_counters_wrapper.h
new file mode 100644
index 0000000000..cac4c6813d
--- /dev/null
+++ b/library/cpp/unified_agent_client/dynamic_counters_wrapper.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+namespace NUnifiedAgent {
+ class TDynamicCountersWrapper: public TAtomicRefCount<TDynamicCountersWrapper> {
+ public:
+ explicit TDynamicCountersWrapper(const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters)
+ : Counters(counters)
+ {
+ }
+
+ virtual ~TDynamicCountersWrapper() = default;
+
+ const TIntrusivePtr<NMonitoring::TDynamicCounters>& Unwrap() const {
+ return Counters;
+ }
+
+ protected:
+ NMonitoring::TDeprecatedCounter& GetCounter(const TString& value, bool derivative) {
+ return *Counters->GetCounter(value, derivative);
+ }
+
+ private:
+ TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
+ };
+
+ class TUpdatableCounters: public TDynamicCountersWrapper {
+ public:
+ using TDynamicCountersWrapper::TDynamicCountersWrapper;
+
+ virtual void Update() = 0;
+ };
+}
diff --git a/library/cpp/unified_agent_client/enum.h b/library/cpp/unified_agent_client/enum.h
new file mode 100644
index 0000000000..b21e21acb0
--- /dev/null
+++ b/library/cpp/unified_agent_client/enum.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include <util/generic/serialized_enum.h>
+#include <util/generic/vector.h>
+
+namespace NUnifiedAgent {
+ namespace NPrivate {
+ using TEnumNames = TVector<const TString*>;
+
+ template <typename TEnum>
+ TEnumNames BuildEnumNames() {
+ const auto names = GetEnumNames<TEnum>();
+ auto result = TEnumNames(names.size());
+ size_t index = 0;
+ for (const auto& p: names) {
+ Y_VERIFY(static_cast<size_t>(p.first) == index);
+ result[index++] = &p.second;
+ }
+ return result;
+ }
+
+ template <typename TEnum>
+ inline const auto EnumNames = BuildEnumNames<TEnum>();
+ }
+
+ template <typename TEnum, typename = std::enable_if_t<std::is_enum_v<TEnum>>>
+ inline const TString& NameOf(TEnum val) noexcept {
+ return *NPrivate::EnumNames<TEnum>[static_cast<size_t>(val)];
+ }
+}
diff --git a/library/cpp/unified_agent_client/examples/ua_grpc_client/main.cpp b/library/cpp/unified_agent_client/examples/ua_grpc_client/main.cpp
new file mode 100644
index 0000000000..a9eb423d13
--- /dev/null
+++ b/library/cpp/unified_agent_client/examples/ua_grpc_client/main.cpp
@@ -0,0 +1,122 @@
+#include <library/cpp/unified_agent_client/client.h>
+
+#include <library/cpp/getopt/opt.h>
+
+#include <util/string/split.h>
+
+using namespace NUnifiedAgent;
+
+class TOptions {
+public:
+ TString Uri;
+ TString SharedSecretKey;
+ TString SessionId;
+ TString SessionMeta;
+
+ TOptions(int argc, const char* argv[]) {
+ NLastGetopt::TOpts opts;
+ TString logPriorityStr;
+
+ opts
+ .AddLongOption("uri")
+ .RequiredArgument()
+ .Required()
+ .StoreResult(&Uri);
+ opts
+ .AddLongOption("shared-secret-key")
+ .RequiredArgument()
+ .Optional()
+ .StoreResult(&SharedSecretKey);
+ opts
+ .AddLongOption("session-id")
+ .RequiredArgument()
+ .Optional()
+ .StoreResult(&SessionId);
+ opts
+ .AddLongOption("session-meta", "key-value pairs separated by comma, e.g. 'k1=v1,k2=v2'")
+ .RequiredArgument()
+ .Optional()
+ .StoreResult(&SessionMeta);
+
+ opts.AddHelpOption();
+ opts.AddVersionOption();
+ NLastGetopt::TOptsParseResult res(&opts, argc, argv);
+ }
+};
+
+bool TryParseMeta(const TString& s, THashMap<TString, TString>& meta) {
+ for (auto& t: StringSplitter(s).Split(',')) {
+ TString key;
+ TString value;
+ if (!StringSplitter(t.Token()).Split('=').TryCollectInto(&key, &value)) {
+ Cout << "invalid meta, can't extract key-value pair from [" << t.Token() << "]" << Endl;
+ return false;
+ }
+ meta[key] = value;
+ }
+ return true;
+}
+
+bool TryParseLine(const TString& line, TVector<TString>& lineItems) {
+ lineItems = StringSplitter(line).Split('|').ToList<TString>();
+ Y_VERIFY(lineItems.size() >= 1);
+ if (lineItems.size() > 2) {
+ Cout << "invalid line format, expected 'k1=v1,k2=v2|payload' or just 'payload'" << Endl;
+ return false;
+ }
+ return true;
+}
+
+int main(int argc, const char* argv[]) {
+ TOptions options(argc, argv);
+
+ TClientSessionPtr sessionPtr;
+ {
+ TLog emptyLog;
+ auto clientParameters = TClientParameters(options.Uri).SetLog(emptyLog);
+ if (!options.SharedSecretKey.Empty()) {
+ clientParameters.SetSharedSecretKey(options.SharedSecretKey);
+ }
+ auto clientPtr = MakeClient(clientParameters);
+ auto sessionParameters = TSessionParameters();
+ if (!options.SessionId.Empty()) {
+ sessionParameters.SetSessionId(options.SessionId);
+ }
+ if (!options.SessionMeta.empty()) {
+ THashMap<TString, TString> sessionMeta;
+ if (!TryParseMeta(options.SessionMeta, sessionMeta)) {
+ return -1;
+ }
+ sessionParameters.SetMeta(sessionMeta);
+ }
+ sessionPtr = clientPtr->CreateSession(sessionParameters);
+ }
+
+ TString line;
+ while (true) {
+ Cin.ReadLine(line);
+ if (line.Empty()) {
+ break;
+ }
+
+ TVector<TString> lineItems;
+ if (!TryParseLine(line, lineItems)) {
+ continue;
+ }
+
+ TClientMessage clientMessage;
+ clientMessage.Payload = lineItems.back();
+ if (lineItems.size() == 2) {
+ THashMap<TString, TString> messageMeta;
+ if (!TryParseMeta(lineItems[0], messageMeta)) {
+ continue;
+ }
+ clientMessage.Meta = std::move(messageMeta);
+ }
+ sessionPtr->Send(std::move(clientMessage));
+ }
+
+ sessionPtr->Close();
+
+ return 0;
+}
diff --git a/library/cpp/unified_agent_client/f_maybe.h b/library/cpp/unified_agent_client/f_maybe.h
new file mode 100644
index 0000000000..7abd4c0aac
--- /dev/null
+++ b/library/cpp/unified_agent_client/f_maybe.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include <util/generic/maybe.h>
+
+namespace NUnifiedAgent {
+ template <typename T>
+ using TFMaybe = TMaybe<T, ::NMaybe::TPolicyUndefinedFail>;
+
+ template <class T>
+ inline constexpr TFMaybe<std::decay_t<T>> MakeFMaybe(T&& value) {
+ return TMaybe<std::decay_t<T>, ::NMaybe::TPolicyUndefinedFail>(std::forward<T>(value));
+ }
+
+ template <class T, class... TArgs>
+ inline constexpr TFMaybe<T> MakeFMaybe(TArgs&&... args) {
+ return TFMaybe<T>(typename TFMaybe<T>::TInPlace{}, std::forward<TArgs>(args)...);
+ }
+
+ template <class T>
+ inline constexpr TFMaybe<std::decay_t<T>> MakeFMaybe(const TMaybe<T>& source) {
+ return source.Defined() ? MakeFMaybe(*source) : Nothing();
+ }
+}
diff --git a/library/cpp/unified_agent_client/grpc_io.cpp b/library/cpp/unified_agent_client/grpc_io.cpp
new file mode 100644
index 0000000000..6d237d75ec
--- /dev/null
+++ b/library/cpp/unified_agent_client/grpc_io.cpp
@@ -0,0 +1,161 @@
+#include "grpc_io.h"
+
+#include <contrib/libs/grpc/src/core/lib/iomgr/executor.h>
+#include <contrib/libs/grpc/src/core/lib/surface/completion_queue.h>
+#include <contrib/libs/grpc/include/grpc/impl/codegen/log.h>
+
+#include <util/generic/yexception.h>
+#include <util/string/cast.h>
+#include <util/system/env.h>
+#include <util/system/mutex.h>
+#include <util/system/thread.h>
+
+namespace NUnifiedAgent {
+ namespace {
+ std::once_flag GrpcConfigured{};
+ }
+
+ TGrpcNotification::TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback)
+ : CompletionQueue(completionQueue)
+ , IOCallback(std::move(ioCallback))
+ , Completion(MakeHolder<grpc_cq_completion>())
+ , InQueue(false)
+ {
+ }
+
+ TGrpcNotification::~TGrpcNotification() = default;
+
+ void TGrpcNotification::Trigger() {
+ {
+ bool inQueue = false;
+ if (!InQueue.compare_exchange_strong(inQueue, true)) {
+ return;
+ }
+ }
+ grpc_core::ApplicationCallbackExecCtx callbackExecCtx;
+ grpc_core::ExecCtx execCtx;
+ IOCallback->Ref();
+ Y_VERIFY(grpc_cq_begin_op(CompletionQueue.cq(), this));
+ grpc_cq_end_op(CompletionQueue.cq(), this, nullptr,
+ [](void* self, grpc_cq_completion*) {
+ Y_VERIFY(static_cast<TGrpcNotification*>(self)->InQueue.exchange(false));
+ },
+ this, Completion.Get());
+ }
+
+ bool TGrpcNotification::FinalizeResult(void** tag, bool*) {
+ *tag = IOCallback.Get();
+ return true;
+ }
+
+ TGrpcTimer::TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback)
+ : CompletionQueue(completionQueue)
+ , IOCallback(std::move(ioCallback))
+ , Alarm()
+ , AlarmIsSet(false)
+ , NextTriggerTime(Nothing())
+ {
+ }
+
+ void TGrpcTimer::Set(TInstant triggerTime) {
+ if (AlarmIsSet) {
+ NextTriggerTime = triggerTime;
+ Alarm.Cancel();
+ } else {
+ AlarmIsSet = true;
+ Alarm.Set(&CompletionQueue, InstantToTimespec(triggerTime), Ref());
+ }
+ }
+
+ void TGrpcTimer::Cancel() {
+ NextTriggerTime.Clear();
+ if (AlarmIsSet) {
+ Alarm.Cancel();
+ }
+ }
+
+ IIOCallback* TGrpcTimer::Ref() {
+ IOCallback->Ref();
+ return this;
+ }
+
+ void TGrpcTimer::OnIOCompleted(EIOStatus status) {
+ Y_VERIFY(AlarmIsSet);
+ if (NextTriggerTime) {
+ Alarm.Set(&CompletionQueue, InstantToTimespec(*NextTriggerTime), this);
+ NextTriggerTime.Clear();
+ } else {
+ AlarmIsSet = false;
+ IOCallback->OnIOCompleted(status);
+ }
+ }
+
+ TGrpcCompletionQueuePoller::TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue)
+ : Queue(queue)
+ , Thread()
+ {
+ }
+
+ void TGrpcCompletionQueuePoller::Start() {
+ Thread = std::thread([this]() {
+ TThread::SetCurrentThreadName("ua_grpc_cq");
+ void* tag;
+ bool ok;
+ while (Queue.Next(&tag, &ok)) {
+ try {
+ static_cast<IIOCallback*>(tag)->OnIOCompleted(ok ? EIOStatus::Ok : EIOStatus::Error);
+ } catch (...) {
+ Y_FAIL("unexpected exception [%s]", CurrentExceptionMessage().c_str());
+ }
+ }
+ });
+ }
+
+ void TGrpcCompletionQueuePoller::Join() {
+ Thread.join();
+ }
+
+ TGrpcCompletionQueueHost::TGrpcCompletionQueueHost()
+ : CompletionQueue()
+ , Poller(CompletionQueue)
+ {
+ }
+
+ void TGrpcCompletionQueueHost::Start() {
+ Poller.Start();
+ }
+
+ void TGrpcCompletionQueueHost::Stop() {
+ CompletionQueue.Shutdown();
+ Poller.Join();
+ }
+
+ gpr_timespec InstantToTimespec(TInstant instant) {
+ gpr_timespec result;
+ result.clock_type = GPR_CLOCK_REALTIME;
+ result.tv_sec = static_cast<int64_t>(instant.Seconds());
+ result.tv_nsec = instant.NanoSecondsOfSecond();
+ return result;
+ }
+
+ void EnsureGrpcConfigured() {
+ std::call_once(GrpcConfigured, []() {
+ const auto limitStr = GetEnv("UA_GRPC_EXECUTOR_THREADS_LIMIT");
+ ui64 limit;
+ if (limitStr.Empty() || !TryFromString(limitStr, limit)) {
+ limit = 2;
+ }
+ grpc_core::Executor::SetThreadsLimit(limit);
+ });
+ }
+
+ void StartGrpcTracing() {
+ grpc_tracer_set_enabled("all", true);
+ gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
+ }
+
+ void FinishGrpcTracing() {
+ grpc_tracer_set_enabled("all", false);
+ gpr_set_log_verbosity(GPR_LOG_SEVERITY_ERROR);
+ }
+}
diff --git a/library/cpp/unified_agent_client/grpc_io.h b/library/cpp/unified_agent_client/grpc_io.h
new file mode 100644
index 0000000000..5f368a5943
--- /dev/null
+++ b/library/cpp/unified_agent_client/grpc_io.h
@@ -0,0 +1,141 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/async_joiner.h>
+#include <library/cpp/unified_agent_client/f_maybe.h>
+
+#include <contrib/libs/grpc/include/grpcpp/alarm.h>
+#include <contrib/libs/grpc/include/grpc++/grpc++.h>
+
+#include <thread>
+
+struct grpc_cq_completion;
+
+namespace NUnifiedAgent {
+ enum class EIOStatus {
+ Ok,
+ Error
+ };
+
+ class IIOCallback {
+ public:
+ virtual ~IIOCallback() = default;
+
+ virtual IIOCallback* Ref() = 0;
+
+ virtual void OnIOCompleted(EIOStatus status) = 0;
+ };
+
+ template<typename TCallback, typename TCounter>
+ class TIOCallback: public IIOCallback {
+ public:
+ explicit TIOCallback(TCallback&& callback, TCounter* counter)
+ : Callback(std::move(callback))
+ , Counter(counter)
+ {
+ }
+
+ IIOCallback* Ref() override {
+ Counter->Ref();
+ return this;
+ }
+
+ void OnIOCompleted(EIOStatus status) override {
+ Callback(status);
+ Counter->UnRef();
+ }
+
+ private:
+ TCallback Callback;
+ TCounter* Counter;
+ };
+
+ template<typename TCallback, typename TCounter>
+ THolder<IIOCallback> MakeIOCallback(TCallback&& callback, TCounter* counter) {
+ return MakeHolder<TIOCallback<TCallback, TCounter>>(std::move(callback), counter);
+ }
+
+ template<typename TTarget, typename TCounter = TTarget>
+ THolder<IIOCallback> MakeIOCallback(TTarget* target, void (TTarget::*method)(EIOStatus),
+ TCounter* counter = nullptr)
+ {
+ return MakeIOCallback([target, method](EIOStatus status) { ((*target).*method)(status); },
+ counter ? counter : target);
+ }
+
+ class TGrpcNotification: private ::grpc::internal::CompletionQueueTag {
+ public:
+ TGrpcNotification(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback);
+
+ ~TGrpcNotification();
+
+ void Trigger();
+
+ private:
+ bool FinalizeResult(void** tag, bool* status) override;
+
+ private:
+ grpc::CompletionQueue& CompletionQueue;
+ THolder<IIOCallback> IOCallback;
+ THolder<grpc_cq_completion> Completion;
+ std::atomic<bool> InQueue;
+ };
+
+ class TGrpcTimer: private IIOCallback {
+ public:
+ TGrpcTimer(grpc::CompletionQueue& completionQueue, THolder<IIOCallback>&& ioCallback);
+
+ void Set(TInstant triggerTime);
+
+ void Cancel();
+
+ private:
+ IIOCallback* Ref() override;
+
+ void OnIOCompleted(EIOStatus status) override;
+
+ private:
+ grpc::CompletionQueue& CompletionQueue;
+ THolder<IIOCallback> IOCallback;
+ grpc::Alarm Alarm;
+ bool AlarmIsSet;
+ TFMaybe<TInstant> NextTriggerTime;
+ };
+
+ class TGrpcCompletionQueuePoller {
+ public:
+ explicit TGrpcCompletionQueuePoller(grpc::CompletionQueue& queue);
+
+ void Start();
+
+ void Join();
+
+ private:
+ grpc::CompletionQueue& Queue;
+ std::thread Thread;
+ };
+
+ class TGrpcCompletionQueueHost {
+ public:
+ TGrpcCompletionQueueHost();
+
+ void Start();
+
+ void Stop();
+
+ inline grpc::CompletionQueue& GetCompletionQueue() noexcept {
+ return CompletionQueue;
+ }
+
+ private:
+ grpc::CompletionQueue CompletionQueue;
+ TGrpcCompletionQueuePoller Poller;
+ };
+
+ gpr_timespec InstantToTimespec(TInstant instant);
+
+ void EnsureGrpcConfigured();
+
+ void StartGrpcTracing();
+
+ void FinishGrpcTracing();
+}
diff --git a/library/cpp/unified_agent_client/grpc_status_code.cpp b/library/cpp/unified_agent_client/grpc_status_code.cpp
new file mode 100644
index 0000000000..662bbbe7a5
--- /dev/null
+++ b/library/cpp/unified_agent_client/grpc_status_code.cpp
@@ -0,0 +1,56 @@
+#include <contrib/libs/grpc/include/grpcpp/impl/codegen/status_code_enum.h>
+
+#include <util/stream/output.h>
+
+namespace {
+ const char* GrpcStatusCodeToString(grpc::StatusCode statusCode) {
+ switch (statusCode) {
+ case grpc::OK:
+ return "OK";
+ case grpc::CANCELLED:
+ return "CANCELLED";
+ case grpc::UNKNOWN:
+ return "UNKNOWN";
+ case grpc::INVALID_ARGUMENT:
+ return "INVALID_ARGUMENT";
+ case grpc::DEADLINE_EXCEEDED:
+ return "DEADLINE_EXCEEDED";
+ case grpc::NOT_FOUND:
+ return "NOT_FOUND";
+ case grpc::ALREADY_EXISTS:
+ return "ALREADY_EXISTS";
+ case grpc::PERMISSION_DENIED:
+ return "PERMISSION_DENIED";
+ case grpc::UNAUTHENTICATED:
+ return "UNAUTHENTICATED";
+ case grpc::RESOURCE_EXHAUSTED:
+ return "RESOURCE_EXHAUSTED";
+ case grpc::FAILED_PRECONDITION:
+ return "FAILED_PRECONDITION";
+ case grpc::ABORTED:
+ return "ABORTED";
+ case grpc::OUT_OF_RANGE:
+ return "OUT_OF_RANGE";
+ case grpc::UNIMPLEMENTED:
+ return "UNIMPLEMENTED";
+ case grpc::INTERNAL:
+ return "INTERNAL";
+ case grpc::UNAVAILABLE:
+ return "UNAVAILABLE";
+ case grpc::DATA_LOSS:
+ return "DATA_LOSS";
+ default:
+ return nullptr;
+ }
+ }
+}
+
+template <>
+void Out<grpc::StatusCode>(IOutputStream& o, grpc::StatusCode statusCode) {
+ const auto* s = GrpcStatusCodeToString(statusCode);
+ if (s == nullptr) {
+ o << "grpc::StatusCode [" << static_cast<int>(statusCode) << "]";
+ } else {
+ o << s;
+ }
+}
diff --git a/library/cpp/unified_agent_client/helpers.cpp b/library/cpp/unified_agent_client/helpers.cpp
new file mode 100644
index 0000000000..335cc4e323
--- /dev/null
+++ b/library/cpp/unified_agent_client/helpers.cpp
@@ -0,0 +1,12 @@
+#include "helpers.h"
+
+namespace NUnifiedAgent::NPrivate {
+ bool IsUtf8(const THashMap<TString, TString>& meta) {
+ for (const auto& p: meta) {
+ if (!IsUtf(p.first) || !IsUtf(p.second)) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git a/library/cpp/unified_agent_client/helpers.h b/library/cpp/unified_agent_client/helpers.h
new file mode 100644
index 0000000000..33defa2b49
--- /dev/null
+++ b/library/cpp/unified_agent_client/helpers.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include "client.h"
+
+#include <util/charset/utf8.h>
+
+namespace NUnifiedAgent::NPrivate {
+ bool IsUtf8(const THashMap<TString, TString>& meta);
+}
diff --git a/library/cpp/unified_agent_client/logger.cpp b/library/cpp/unified_agent_client/logger.cpp
new file mode 100644
index 0000000000..e9c713f0d0
--- /dev/null
+++ b/library/cpp/unified_agent_client/logger.cpp
@@ -0,0 +1,130 @@
+#include "logger.h"
+
+#include <library/cpp/unified_agent_client/clock.h>
+
+#include <library/cpp/logger/log.h>
+
+#include <util/datetime/base.h>
+#include <util/stream/str.h>
+#include <util/system/getpid.h>
+#include <util/system/thread.h>
+
+namespace NUnifiedAgent {
+ namespace {
+ TString FormatLogLine(ELogPriority logLevel, const TStringBuf message, const TString& scope) {
+ TString result;
+ {
+ TStringOutput output(result);
+ output << FormatIsoLocal(TClock::Now())
+ << " " << GetPID()
+ << " " << TThread::CurrentThreadId()
+ << " " << logLevel;
+ if (!scope.Empty()) {
+ output << " " << scope;
+ }
+ output << " " << message << "\n";
+ }
+ return result;
+ }
+ }
+
+ TLogger::TThrottlerWithLock::TThrottlerWithLock(size_t rateLimitBytes)
+ : Throttler(rateLimitBytes, rateLimitBytes / 2)
+ , Lock()
+ {
+ }
+
+ bool TLogger::TThrottlerWithLock::TryConsume(double tokens) {
+ with_lock(Lock) {
+ return Throttler.TryConsume(tokens);
+ }
+ }
+
+ TLogger::TLogger(TLog& log, TFMaybe<size_t> rateLimitBytes)
+ : DefaultLogContext{log, log.IsNullLog() ? ELogPriority::TLOG_EMERG : log.FiltrationLevel()}
+ , TracingLogContexts()
+ , CurrentLogContext_()
+ , Errors(nullptr)
+ , DroppedBytes(nullptr)
+ , Throttler(rateLimitBytes.Defined() ? MakeHolder<TThrottlerWithLock>(*rateLimitBytes) : nullptr)
+ , Lock()
+ {
+ SetCurrentLogContext(DefaultLogContext);
+ }
+
+ void TLogger::SetCurrentLogContext(TLogContext& logContext) {
+ CurrentLogContext_.store(logContext.Log.IsNullLog() ? nullptr : &logContext, std::memory_order_release);
+ }
+
+ void TLogger::Log(TLog& log, ELogPriority logPriority, const TStringBuf message, const TString& scope) const {
+ try {
+ const auto logLine = FormatLogLine(logPriority, message, scope);
+ if (Throttler && &log == &DefaultLogContext.Log && !Throttler->TryConsume(logLine.size())) {
+ if (DroppedBytes) {
+ DroppedBytes->Add(logLine.size());
+ }
+ return;
+ }
+ log.Write(logPriority, logLine);
+ } catch (...) {
+ }
+ }
+
+ void TLogger::StartTracing(ELogPriority logPriority) noexcept {
+ with_lock(Lock) {
+ auto& logContext = GetOrCreateTracingLogContext(logPriority);
+ SetTracing(logContext, "started");
+ }
+ }
+
+ void TLogger::FinishTracing() noexcept {
+ with_lock(Lock) {
+ SetTracing(DefaultLogContext, "finished");
+ }
+ }
+
+ void TLogger::SetTracing(TLogContext& logContext, const char* action) {
+ // Lock must be held
+
+ SetCurrentLogContext(logContext);
+
+ Log(logContext.Log,
+ TLOG_INFO,
+ Sprintf("tracing %s, log priority is set to [%s]",
+ action, ToString(logContext.Priority).c_str()),
+ "");
+ }
+
+ auto TLogger::GetOrCreateTracingLogContext(ELogPriority logPriority) -> TLogContext& {
+ // Lock must be held
+
+ for (const auto& c: TracingLogContexts) {
+ if (c->Priority == logPriority) {
+ return *c;
+ }
+ }
+
+ auto newLogContext = MakeHolder<TLogContext>();
+ newLogContext->Log = TLog("cerr", logPriority);
+ newLogContext->Priority = logPriority;
+ auto* result = newLogContext.Get();
+ TracingLogContexts.push_back(std::move(newLogContext));
+ return *result;
+ }
+
+ TScopeLogger::TScopeLogger()
+ : Logger(nullptr)
+ , Scope()
+ , Errors(nullptr)
+ {
+ }
+
+ TScopeLogger::TScopeLogger(TLogger* logger,
+ const TString& scope,
+ NMonitoring::TDeprecatedCounter* errors)
+ : Logger(logger)
+ , Scope(scope)
+ , Errors(errors)
+ {
+ }
+}
diff --git a/library/cpp/unified_agent_client/logger.h b/library/cpp/unified_agent_client/logger.h
new file mode 100644
index 0000000000..d83cba92de
--- /dev/null
+++ b/library/cpp/unified_agent_client/logger.h
@@ -0,0 +1,157 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/f_maybe.h>
+#include <library/cpp/unified_agent_client/throttling.h>
+
+#include <library/cpp/logger/log.h>
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+#include <util/generic/string.h>
+#include <util/string/join.h>
+#include <util/string/printf.h>
+#include <util/system/file.h>
+
+#include <atomic>
+
+#define YLOG(logPriority, message, logger) \
+ do { \
+ const auto __logPriority = logPriority; \
+ if (auto* log = logger.Accept(__logPriority, false); log != nullptr) { \
+ logger.Log(*log, __logPriority, message); \
+ } \
+ } while (false)
+
+#define YLOG_EMERG(msg) YLOG(TLOG_EMERG, msg, Logger)
+#define YLOG_ALERT(msg) YLOG(TLOG_ALERT, msg, Logger)
+#define YLOG_CRIT(msg) YLOG(TLOG_CRIT, msg, Logger)
+#define YLOG_ERR(msg) YLOG(TLOG_ERR, msg, Logger)
+#define YLOG_WARNING(msg) YLOG(TLOG_WARNING, msg, Logger)
+#define YLOG_NOTICE(msg) YLOG(TLOG_NOTICE, msg, Logger)
+#define YLOG_INFO(msg) YLOG(TLOG_INFO, msg, Logger)
+#define YLOG_DEBUG(msg) YLOG(TLOG_DEBUG, msg, Logger)
+#define YLOG_RESOURCES(msg) YLOG(TLOG_RESOURCES , msg, Logger)
+
+#define YLOG_FATAL(msg) \
+ YLOG(TLOG_CRIT, msg, Logger); \
+ _Exit(1);
+
+namespace NUnifiedAgent {
+ class TScopeLogger;
+
+ class TLogger {
+ public:
+ TLogger(TLog& log, TFMaybe<size_t> rateLimitBytes);
+
+ void StartTracing(ELogPriority logPriority) noexcept;
+
+ void FinishTracing() noexcept;
+
+ inline TScopeLogger Child(const TString& v, NMonitoring::TDeprecatedCounter* errors = nullptr);
+
+ inline void SetErrorsCounter(NMonitoring::TDeprecatedCounter* counter) noexcept {
+ Errors = counter;
+ }
+
+ inline void SetDroppedBytesCounter(NMonitoring::TDeprecatedCounter* counter) noexcept {
+ DroppedBytes = counter;
+ }
+
+ inline bool HasRateLimit() const noexcept {
+ return Throttler != nullptr;
+ }
+
+ friend class TScopeLogger;
+
+ private:
+ void Log(TLog& log, ELogPriority logPriority, const TStringBuf message, const TString& scope) const;
+
+ inline TLog* Accept(ELogPriority logPriority, NMonitoring::TDeprecatedCounter* errors) const noexcept {
+ if ((logPriority <= TLOG_ERR) && (errors != nullptr)) {
+ ++(*errors);
+ }
+ auto* result = CurrentLogContext_.load(std::memory_order_acquire);
+ return result != nullptr && static_cast<int>(logPriority) <= static_cast<int>(result->Priority)
+ ? &result->Log
+ : nullptr;
+ }
+
+ private:
+ struct TLogContext {
+ TLog Log;
+ ELogPriority Priority;
+ };
+
+ class TThrottlerWithLock {
+ public:
+ explicit TThrottlerWithLock(size_t rateLimitBytes);
+
+ bool TryConsume(double tokens);
+
+ private:
+ TThrottler Throttler;
+ TAdaptiveLock Lock;
+ };
+
+ private:
+ void SetCurrentLogContext(TLogContext& logContext);
+
+ TLogContext& GetOrCreateTracingLogContext(ELogPriority logPriority);
+
+ void SetTracing(TLogContext& logContext, const char* action);
+
+ private:
+ TLogContext DefaultLogContext;
+ TVector<THolder<TLogContext>> TracingLogContexts;
+ std::atomic<TLogContext*> CurrentLogContext_;
+ NMonitoring::TDeprecatedCounter* Errors;
+ NMonitoring::TDeprecatedCounter* DroppedBytes;
+ const THolder<TThrottlerWithLock> Throttler;
+ TAdaptiveLock Lock;
+ };
+
+ class TScopeLogger {
+ public:
+ TScopeLogger();
+
+ inline void Log(TLog& log, ELogPriority logPriority, const TStringBuf message) const {
+ if (Logger) {
+ Logger->Log(log, logPriority, message, Scope);
+ }
+ }
+
+ inline TLog* Accept(ELogPriority logPriority, bool silent) const noexcept {
+ return Logger ? Logger->Accept(logPriority, silent ? nullptr : Errors) : nullptr;
+ }
+
+ inline TScopeLogger Child(const TString& v, NMonitoring::TDeprecatedCounter* errors = nullptr) {
+ return Logger
+ ? Logger->Child(Join('/', Scope, v), errors == nullptr ? Errors : errors)
+ : TScopeLogger();
+ }
+
+ inline TLogger* Unwrap() noexcept {
+ return Logger;
+ }
+
+ friend class TLogger;
+
+ private:
+ TScopeLogger(TLogger* logger,
+ const TString& scope,
+ NMonitoring::TDeprecatedCounter* errors);
+
+ private:
+ TLogger* Logger;
+ TString Scope;
+ NMonitoring::TDeprecatedCounter* Errors;
+ };
+
+ inline TScopeLogger TLogger::Child(const TString& v, NMonitoring::TDeprecatedCounter* errors) {
+ return TScopeLogger(this, v, errors == nullptr ? Errors : errors);
+ }
+
+ inline ELogPriority ToLogPriority(int level) noexcept {
+ const auto result = ClampVal(level, 0, static_cast<int>(TLOG_RESOURCES));
+ return static_cast<ELogPriority>(result);
+ }
+}
diff --git a/library/cpp/unified_agent_client/proto/CMakeLists.darwin.txt b/library/cpp/unified_agent_client/proto/CMakeLists.darwin.txt
new file mode 100644
index 0000000000..7cd90f489d
--- /dev/null
+++ b/library/cpp/unified_agent_client/proto/CMakeLists.darwin.txt
@@ -0,0 +1,45 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-unified_agent_client-proto)
+set_property(TARGET cpp-unified_agent_client-proto PROPERTY
+ PROTOC_EXTRA_OUTS .grpc.pb.cc .grpc.pb.h
+)
+target_link_libraries(cpp-unified_agent_client-proto PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ tools-enum_parser-enum_serialization_runtime
+ contrib-libs-protobuf
+)
+target_proto_messages(cpp-unified_agent_client-proto PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto/unified_agent.proto
+)
+generate_enum_serilization(cpp-unified_agent_client-proto
+ ${CMAKE_BINARY_DIR}/library/cpp/unified_agent_client/proto/unified_agent.pb.h
+ INCLUDE_HEADERS
+ library/cpp/unified_agent_client/proto/unified_agent.pb.h
+)
+target_proto_addincls(cpp-unified_agent_client-proto
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(cpp-unified_agent_client-proto
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
+target_proto_plugin(cpp-unified_agent_client-proto
+ grpc_cpp
+ grpc_cpp
+)
diff --git a/library/cpp/unified_agent_client/proto/CMakeLists.linux-aarch64.txt b/library/cpp/unified_agent_client/proto/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..db3af9681d
--- /dev/null
+++ b/library/cpp/unified_agent_client/proto/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,46 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-unified_agent_client-proto)
+set_property(TARGET cpp-unified_agent_client-proto PROPERTY
+ PROTOC_EXTRA_OUTS .grpc.pb.cc .grpc.pb.h
+)
+target_link_libraries(cpp-unified_agent_client-proto PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ tools-enum_parser-enum_serialization_runtime
+ contrib-libs-protobuf
+)
+target_proto_messages(cpp-unified_agent_client-proto PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto/unified_agent.proto
+)
+generate_enum_serilization(cpp-unified_agent_client-proto
+ ${CMAKE_BINARY_DIR}/library/cpp/unified_agent_client/proto/unified_agent.pb.h
+ INCLUDE_HEADERS
+ library/cpp/unified_agent_client/proto/unified_agent.pb.h
+)
+target_proto_addincls(cpp-unified_agent_client-proto
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(cpp-unified_agent_client-proto
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
+target_proto_plugin(cpp-unified_agent_client-proto
+ grpc_cpp
+ grpc_cpp
+)
diff --git a/library/cpp/unified_agent_client/proto/CMakeLists.linux.txt b/library/cpp/unified_agent_client/proto/CMakeLists.linux.txt
new file mode 100644
index 0000000000..db3af9681d
--- /dev/null
+++ b/library/cpp/unified_agent_client/proto/CMakeLists.linux.txt
@@ -0,0 +1,46 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(cpp-unified_agent_client-proto)
+set_property(TARGET cpp-unified_agent_client-proto PROPERTY
+ PROTOC_EXTRA_OUTS .grpc.pb.cc .grpc.pb.h
+)
+target_link_libraries(cpp-unified_agent_client-proto PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-grpc
+ tools-enum_parser-enum_serialization_runtime
+ contrib-libs-protobuf
+)
+target_proto_messages(cpp-unified_agent_client-proto PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/unified_agent_client/proto/unified_agent.proto
+)
+generate_enum_serilization(cpp-unified_agent_client-proto
+ ${CMAKE_BINARY_DIR}/library/cpp/unified_agent_client/proto/unified_agent.pb.h
+ INCLUDE_HEADERS
+ library/cpp/unified_agent_client/proto/unified_agent.pb.h
+)
+target_proto_addincls(cpp-unified_agent_client-proto
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(cpp-unified_agent_client-proto
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
+target_proto_plugin(cpp-unified_agent_client-proto
+ grpc_cpp
+ grpc_cpp
+)
diff --git a/library/cpp/unified_agent_client/proto/CMakeLists.txt b/library/cpp/unified_agent_client/proto/CMakeLists.txt
new file mode 100644
index 0000000000..3e0811fb22
--- /dev/null
+++ b/library/cpp/unified_agent_client/proto/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/library/cpp/unified_agent_client/proto/unified_agent.proto b/library/cpp/unified_agent_client/proto/unified_agent.proto
new file mode 100644
index 0000000000..68efe35747
--- /dev/null
+++ b/library/cpp/unified_agent_client/proto/unified_agent.proto
@@ -0,0 +1,101 @@
+syntax = "proto3";
+import "google/protobuf/descriptor.proto";
+
+package NUnifiedAgentProto;
+
+option java_package = "com.yandex.unified_agent";
+option go_package = "a.yandex-team.ru/library/cpp/unified_agent_client/proto;unifiedagent";
+
+extend google.protobuf.FileOptions {
+ bool GenerateYaStyle = 66777;
+}
+
+message Request {
+ message SessionMetaItem {
+ string name = 1;
+ string value = 2;
+ }
+
+ message Initialize {
+ // Session_id provided by server, use it in case of reconnects.
+ string session_id = 1;
+
+ // Session metadata
+ repeated SessionMetaItem meta = 2;
+
+ string shared_secret_key = 3;
+ }
+
+ message MessageMetaItem {
+ // Arbitrary key-value pairs. Can be used by agent filters to modify/filter messages
+ // or to route them to target outputs.
+
+ // Meta items of all messages should be grouped by meta key, it's expected in the 'key' field.
+ // Meta values should be passed in the 'value' sequence, it corresponds to the payload
+ // sequence from DataBatch. If some messages don't have a meta with this key, the range of such messages
+ // can be passed via skip_start/skip_length sequences.
+ // For example, [{m:v1}, {}, {}, {m: v2}, {}, {m: v3}, {}, {}] can be represented as follows:
+ // key: 'm'
+ // value: ['v1', 'v2', 'v3']
+ // skip_start: [1, 4]
+ // skip_length: [2, 1]
+
+ string key = 1;
+ repeated string value = 2;
+ repeated uint32 skip_start = 3;
+ repeated uint32 skip_length = 4;
+ }
+
+ message DataBatch {
+ repeated uint64 seq_no = 1;
+ repeated uint64 timestamp = 2; //microseconds
+ repeated bytes payload = 100;
+ repeated MessageMetaItem meta = 101;
+ }
+
+ oneof request {
+ Initialize initialize = 1;
+ DataBatch data_batch = 2;
+ }
+}
+
+message Response {
+ message Initialized {
+ // Session identifier for log and deduplication purposes.
+ string session_id = 1;
+
+ // Application can skip all formed messages by seq_no upto last_seq_no - they are consumed by server.
+ uint64 last_seq_no = 2;
+ }
+
+ message Ack {
+ uint64 seq_no = 1;
+ }
+
+ oneof response {
+ Initialized initialized = 1;
+ Ack ack = 2;
+ }
+}
+
+service UnifiedAgentService {
+ rpc Session(stream Request) returns (stream Response);
+}
+
+
+// dataflow:
+// Request.initialize -> UnifiedAgent;
+// specify session_id when this is a retry. Сlient can already have sesison_id from previous init response,
+// or it can use some pregenerated sessionId for each session.
+// Response.initializeded -> client;
+// Request.entry -> UnifiedAgent;
+// ....
+// Response.ack -> client;
+// when this record is consumed by UnifiedAgent with choosen garanties UnifiedAgent will send ack to client;
+// client can forget about this log record now
+//
+// grpc finish session -> client;
+// something went wrong; client must reconnect and retry all not acknowleged records
+//
+// Exactly once retries - when reconnect, client must provide previous session_id and same seq_no`s
+// for records - only in this case UnifiedAgent can dedup.
diff --git a/library/cpp/unified_agent_client/proto_weighing.cpp b/library/cpp/unified_agent_client/proto_weighing.cpp
new file mode 100644
index 0000000000..7a532213ea
--- /dev/null
+++ b/library/cpp/unified_agent_client/proto_weighing.cpp
@@ -0,0 +1,99 @@
+#include "proto_weighing.h"
+
+#include <google/protobuf/io/coded_stream.h>
+
+namespace NUnifiedAgent::NPW {
+ template <typename T>
+ inline size_t SizeOf(T value);
+
+ using CodedOutputStream = google::protobuf::io::CodedOutputStream;
+
+ template <>
+ inline size_t SizeOf(ui64 value) {
+ return CodedOutputStream::VarintSize64(value);
+ }
+
+ template <>
+ inline size_t SizeOf(ui32 value) {
+ return CodedOutputStream::VarintSize32(value);
+ }
+
+ template <>
+ inline size_t SizeOf(i64 value) {
+ return CodedOutputStream::VarintSize64(static_cast<google::protobuf::uint64>(value));
+ }
+
+ TFieldLink::TFieldLink(TLengthDelimited* container, bool repeated, size_t keySize)
+ : Container(container)
+ , OuterSize(0)
+ , Repeated(repeated)
+ , KeySize(keySize)
+ {
+ }
+
+ void TFieldLink::SetValueSize(bool empty, size_t size) {
+ const auto newOuterSize = empty && !Repeated ? 0 : KeySize + static_cast<int>(size);
+ Container->IncSize(newOuterSize - OuterSize);
+ OuterSize = newOuterSize;
+ }
+
+ TLengthDelimited::TLengthDelimited(const TFMaybe<TFieldLink>& link)
+ : Link(link)
+ , ByteSize(0)
+ {
+ }
+
+ void TLengthDelimited::IncSize(int sizeDelta) {
+ ByteSize += sizeDelta;
+ if (Link) {
+ const auto byteSize = static_cast<ui32>(ByteSize);
+ Link->SetValueSize(false, byteSize + SizeOf(byteSize));
+ }
+ }
+
+ template <typename T>
+ void TRepeatedField<T>::Add(T value) {
+ IncSize(static_cast<int>(SizeOf(value)));
+ }
+
+ template <typename T>
+ TNumberField<T>::TNumberField(const TFieldLink& link)
+ : Link(link)
+ {
+ }
+
+ template <typename T>
+ void TNumberField<T>::SetValue(T value) {
+ Link.SetValueSize(value == 0, SizeOf(value));
+ }
+
+ template <typename T>
+ TFixedNumberField<T>::TFixedNumberField(const TFieldLink& link)
+ : Link(link)
+ {
+ }
+
+ template <typename T>
+ void TFixedNumberField<T>::SetValue() {
+ Link.SetValueSize(false, sizeof(T));
+ }
+
+ TStringField::TStringField(const TFieldLink& link)
+ : Link(link)
+ {
+ }
+
+ void TStringField::SetValue(const TString& value) {
+ Link.SetValueSize(value.Empty(), value.Size() + SizeOf(static_cast<ui32>(value.Size())));
+ }
+
+ template class TNumberField<ui64>;
+ template class TNumberField<ui32>;
+ template class TNumberField<i64>;
+ template class TFixedNumberField<ui64>;
+ template class TFixedNumberField<ui32>;
+ template class TFixedNumberField<i64>;
+ template class TRepeatedField<ui64>;
+ template class TRepeatedField<ui32>;
+ template class TRepeatedField<i64>;
+}
diff --git a/library/cpp/unified_agent_client/proto_weighing.h b/library/cpp/unified_agent_client/proto_weighing.h
new file mode 100644
index 0000000000..47cf577e14
--- /dev/null
+++ b/library/cpp/unified_agent_client/proto_weighing.h
@@ -0,0 +1,138 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/f_maybe.h>
+
+#include <util/generic/deque.h>
+#include <util/generic/string.h>
+
+namespace NUnifiedAgent::NPW {
+ class TLengthDelimited;
+
+ class TFieldLink {
+ public:
+ TFieldLink(TLengthDelimited* container, bool repeated = false, size_t keySize = 1);
+
+ void SetValueSize(bool empty, size_t size);
+
+ private:
+ TLengthDelimited* Container;
+ int OuterSize;
+ bool Repeated;
+ size_t KeySize;
+ };
+
+ class TLengthDelimited {
+ public:
+ explicit TLengthDelimited(const TFMaybe<TFieldLink>& link = Nothing());
+
+ void IncSize(int sizeDelta);
+
+ size_t ByteSizeLong() const {
+ return static_cast<size_t>(ByteSize);
+ }
+
+ private:
+ TFMaybe<TFieldLink> Link;
+ int ByteSize;
+ };
+
+ using TMessage = TLengthDelimited;
+
+ template <typename T>
+ class TRepeatedField: public TLengthDelimited {
+ public:
+ static_assert(std::is_same_v<T, ui32> ||
+ std::is_same_v<T, ui64> ||
+ std::is_same_v<T, i64>,
+ "type is not supported");
+
+ using TLengthDelimited::TLengthDelimited;
+
+ void Add(T value);
+ };
+
+ template <typename T>
+ class TRepeatedPtrField {
+ public:
+ explicit TRepeatedPtrField(TMessage* message, size_t keySize = 1)
+ : Message(message)
+ , Children()
+ , KeySize(keySize)
+ {
+ }
+
+ size_t GetSize() const {
+ return Children.size();
+ }
+
+ T& Get(size_t index) {
+ return Children[index];
+ }
+
+ T& Add() {
+ if constexpr (std::is_constructible<T, TFieldLink>::value) {
+ Children.emplace_back(TFieldLink(Message, true, KeySize));
+ } else {
+ Children.emplace_back(Message);
+ }
+ return Children.back();
+ }
+
+ private:
+ TMessage* Message;
+ TDeque<T> Children;
+ size_t KeySize;
+ };
+
+ template <typename T>
+ class TNumberField {
+ public:
+ static_assert(std::is_same_v<T, ui32> ||
+ std::is_same_v<T, ui64> ||
+ std::is_same_v<T, i64>,
+ "type is not supported");
+
+ explicit TNumberField(const TFieldLink& link);
+
+ void SetValue(T value);
+
+ private:
+ TFieldLink Link;
+ };
+
+ template <typename T>
+ class TFixedNumberField {
+ public:
+ static_assert(std::is_same_v<T, ui32> ||
+ std::is_same_v<T, ui64> ||
+ std::is_same_v<T, i64>,
+ "type is not supported");
+
+ explicit TFixedNumberField(const TFieldLink& link);
+
+ void SetValue();
+
+ private:
+ TFieldLink Link;
+ };
+
+ class TStringField {
+ public:
+ explicit TStringField(const TFieldLink& link);
+
+ void SetValue(const TString& value);
+
+ private:
+ TFieldLink Link;
+ };
+
+ extern template class TNumberField<ui64>;
+ extern template class TNumberField<ui32>;
+ extern template class TNumberField<i64>;
+ extern template class TFixedNumberField<ui64>;
+ extern template class TFixedNumberField<ui32>;
+ extern template class TFixedNumberField<i64>;
+ extern template class TRepeatedField<ui64>;
+ extern template class TRepeatedField<ui32>;
+ extern template class TRepeatedField<i64>;
+}
diff --git a/library/cpp/unified_agent_client/registrar.cpp b/library/cpp/unified_agent_client/registrar.cpp
new file mode 100644
index 0000000000..41f6eb34ca
--- /dev/null
+++ b/library/cpp/unified_agent_client/registrar.cpp
@@ -0,0 +1,8 @@
+#include "backend_creator.h"
+
+namespace NUnifiedAgent {
+
+ ILogBackendCreator::TFactory::TRegistrator<NUnifiedAgent::TLogBackendCreator> TLogBackendCreator::Registrar("unified_agent");
+
+}
+
diff --git a/library/cpp/unified_agent_client/throttling.cpp b/library/cpp/unified_agent_client/throttling.cpp
new file mode 100644
index 0000000000..271f7b0e7e
--- /dev/null
+++ b/library/cpp/unified_agent_client/throttling.cpp
@@ -0,0 +1,67 @@
+#include "throttling.h"
+
+#include <util/datetime/cputimer.h>
+
+namespace NUnifiedAgent {
+ TThrottler::TThrottler(double rate, TDuration updatePeriod)
+ : CyclesPerMillisecond(GetCyclesPerMillisecond())
+ , UpdatePeriod(updatePeriod.MilliSeconds() * CyclesPerMillisecond)
+ , PeriodTokens(updatePeriod.SecondsFloat() * rate)
+ , AvailableTokens(0)
+ , ExpirationTime(0)
+ {
+ }
+
+ TThrottler::TThrottler(double rate, double burst)
+ : TThrottler(rate, TDuration::Seconds(burst / rate))
+ {
+ }
+
+ void TThrottler::Consume(double& tokens, TFMaybe<TDuration>& nextCheckDelay) {
+ const auto updateTime = UpdateTokens();
+
+ if (tokens <= AvailableTokens) {
+ AvailableTokens -= tokens;
+ tokens = 0.0;
+ nextCheckDelay = Nothing();
+ } else {
+ tokens -= AvailableTokens;
+ AvailableTokens = 0.0;
+ nextCheckDelay = TDuration::MicroSeconds((ExpirationTime - updateTime) * 1000 / CyclesPerMillisecond + 1);
+ }
+ }
+
+ bool TThrottler::TryConsume(double tokens) {
+ UpdateTokens();
+
+ if (tokens > AvailableTokens) {
+ return false;
+ }
+ AvailableTokens -= tokens;
+ return true;
+ }
+
+ void TThrottler::ConsumeAndWait(double tokens) {
+ TFMaybe<TDuration> nextCheckDelay;
+ while (true) {
+ Consume(tokens, nextCheckDelay);
+ if (!nextCheckDelay.Defined()) {
+ return;
+ }
+ Sleep(*nextCheckDelay);
+ }
+ }
+
+ ui64 TThrottler::UpdateTokens() {
+ const auto updateTime = GetCycleCount();
+ if (updateTime >= ExpirationTime) {
+ if (ExpirationTime == 0) {
+ ExpirationTime = updateTime + UpdatePeriod;
+ } else {
+ ExpirationTime += ((updateTime - ExpirationTime) / UpdatePeriod + 1) * UpdatePeriod;
+ }
+ AvailableTokens = PeriodTokens;
+ }
+ return updateTime;
+ }
+}
diff --git a/library/cpp/unified_agent_client/throttling.h b/library/cpp/unified_agent_client/throttling.h
new file mode 100644
index 0000000000..1e5db1e8fa
--- /dev/null
+++ b/library/cpp/unified_agent_client/throttling.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include <library/cpp/unified_agent_client/f_maybe.h>
+
+#include <util/datetime/base.h>
+
+namespace NUnifiedAgent {
+ class TThrottler {
+ public:
+ explicit TThrottler(double rate, TDuration updatePeriod = TDuration::MilliSeconds(100));
+
+ TThrottler(double rate, double burst);
+
+ void Consume(double& tokens, TFMaybe<TDuration>& nextCheckDelay);
+
+ bool TryConsume(double tokens);
+
+ void ConsumeAndWait(double tokens);
+
+ private:
+ ui64 UpdateTokens();
+
+ private:
+ ui64 CyclesPerMillisecond;
+ ui64 UpdatePeriod;
+ double PeriodTokens;
+ double AvailableTokens;
+ ui64 ExpirationTime;
+ };
+}
diff --git a/library/cpp/unified_agent_client/variant.h b/library/cpp/unified_agent_client/variant.h
new file mode 100644
index 0000000000..e261aa9af3
--- /dev/null
+++ b/library/cpp/unified_agent_client/variant.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <variant>
+
+namespace NUnifiedAgent {
+ template<class... Ts> struct TOverloaded : Ts... { using Ts::operator()...; };
+ template<class... Ts> TOverloaded(Ts...) -> TOverloaded<Ts...>;
+
+ template <class T, class... U>
+ auto Visit(T&& variant, U&&... visitorOverloads) {
+ return std::visit(TOverloaded{std::forward<U>(visitorOverloads)...}, std::forward<T>(variant));
+ }
+
+ template <typename TTarget, typename... TSourceTypes>
+ auto CastTo(std::variant<TSourceTypes...>&& variant) {
+ return Visit(variant, [](auto& p) -> TTarget { return std::move(p); });
+ }
+}