diff options
author | alexvru <alexvru@ydb.tech> | 2022-07-29 18:58:34 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-07-29 18:58:34 +0300 |
commit | 002a9e8a0143ca3bca5d1614bc8cd3f5ac300935 (patch) | |
tree | 2b5776bc00d656572dd9e824e76c9c9276ec3f48 /library/cpp | |
parent | 9e1df78041fdd7052eedd4904110ddaee37b8510 (diff) | |
download | ydb-002a9e8a0143ca3bca5d1614bc8cd3f5ac300935.tar.gz |
Throttle Wilson uploader
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/wilson/wilson_uploader.cpp | 86 |
1 files changed, 56 insertions, 30 deletions
diff --git a/library/cpp/actors/wilson/wilson_uploader.cpp b/library/cpp/actors/wilson/wilson_uploader.cpp index 6b4ef92b812..57e7dceee95 100644 --- a/library/cpp/actors/wilson/wilson_uploader.cpp +++ b/library/cpp/actors/wilson/wilson_uploader.cpp @@ -14,6 +14,7 @@ namespace NWilson { using namespace NActors; namespace NServiceProto = opentelemetry::proto::collector::trace::v1; + namespace NTraceProto = opentelemetry::proto::trace::v1; namespace { @@ -30,10 +31,15 @@ namespace NWilson { std::unique_ptr<grpc::ClientContext> Context; std::unique_ptr<grpc::ClientAsyncResponseReader<NServiceProto::ExportTraceServiceResponse>> Reader; - NServiceProto::ExportTraceServiceRequest Request; NServiceProto::ExportTraceServiceResponse Response; grpc::Status Status; + std::deque<NTraceProto::Span> Spans; + ui64 SpansSize = 0; + TMonotonic NextSendTimestamp; + ui32 MaxSpansAtOnce = 25; + ui32 MaxSpansPerSecond = 10; + bool WakeupScheduled = false; public: @@ -55,36 +61,51 @@ namespace NWilson { })); Stub = NServiceProto::TraceService::NewStub(Channel); - LOG_INFO_S(*TlsActivationContext, 430, "TWilsonUploader::Bootstrap"); + LOG_INFO_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "TWilsonUploader::Bootstrap"); } void Handle(TEvWilson::TPtr ev) { - CheckIfDone(); + if (SpansSize >= 100'000'000) { + LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "dropped span due to overflow"); + } else { + Spans.push_back(std::move(ev->Get()->Span)); + SpansSize += Spans.back().ByteSizeLong(); + CheckIfDone(); + TryToSend(); + } + } - auto *rspan = Request.resource_spans_size() ? Request.mutable_resource_spans(0) : Request.add_resource_spans(); - auto *sspan = rspan->scope_spans_size() ? rspan->mutable_scope_spans(0) : rspan->add_scope_spans(); - ev->Get()->Span.Swap(sspan->add_spans()); + void TryToSend() { + const TMonotonic now = TActivationContext::Monotonic(); - if (!Context) { - SendRequest(); + if (Context || Spans.empty()) { + return; + } else if (now < NextSendTimestamp) { + ScheduleWakeup(NextSendTimestamp); + return; } - } - void SendRequest() { - Y_VERIFY(!Reader && !Context); - Context = std::make_unique<grpc::ClientContext>(); - for (const auto& rs : Request.resource_spans()) { - for (const auto& ss : rs.scope_spans()) { - for (const auto& s : ss.spans()) { - LOG_DEBUG_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "exporting span" - << " TraceId# " << HexEncode(s.trace_id()) - << " SpanId# " << HexEncode(s.span_id()) - << " ParentSpanId# " << HexEncode(s.parent_span_id()) - << " Name# " << s.name()); - } - } + NServiceProto::ExportTraceServiceRequest request; + auto *rspan = request.add_resource_spans(); + auto *sspan = rspan->add_scope_spans(); + + NextSendTimestamp = now; + for (ui32 i = 0; i < MaxSpansAtOnce && !Spans.empty(); ++i, Spans.pop_front()) { + auto& s = Spans.front(); + + LOG_DEBUG_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "exporting span" + << " TraceId# " << HexEncode(s.trace_id()) + << " SpanId# " << HexEncode(s.span_id()) + << " ParentSpanId# " << HexEncode(s.parent_span_id()) + << " Name# " << s.name()); + + SpansSize -= s.ByteSizeLong(); + s.Swap(sspan->add_spans()); + NextSendTimestamp += TDuration::MicroSeconds(1'000'000 / MaxSpansPerSecond); } - Reader = Stub->AsyncExport(Context.get(), std::exchange(Request, {}), &CQ); + + Context = std::make_unique<grpc::ClientContext>(); + Reader = Stub->AsyncExport(Context.get(), std::move(request), &CQ); Reader->Finish(&Response, &Status, nullptr); } @@ -100,21 +121,26 @@ namespace NWilson { Reader.reset(); Context.reset(); - - if (Request.resource_spans_size()) { - SendRequest(); - } - } else if (!WakeupScheduled) { - WakeupScheduled = true; - Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup); + } else { + ScheduleWakeup(TDuration::MilliSeconds(100)); } } } + template<typename T> + void ScheduleWakeup(T&& deadline) { + if (!WakeupScheduled) { + TActivationContext::Schedule(deadline, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, + nullptr, 0)); + WakeupScheduled = true; + } + } + void HandleWakeup() { Y_VERIFY(WakeupScheduled); WakeupScheduled = false; CheckIfDone(); + TryToSend(); } STRICT_STFUNC(StateFunc, |