diff options
author | alexvru <alexvru@ydb.tech> | 2022-10-12 10:29:41 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-10-12 10:29:41 +0300 |
commit | 8a5b941d775b0f8ed7ee4b903478e69de619683c (patch) | |
tree | 7f26fb57a71430d89e59e65d5c0fa7d9bcf2fcfd /library/cpp | |
parent | c9f4954181160141a2f86785405560804ce69345 (diff) | |
download | ydb-8a5b941d775b0f8ed7ee4b903478e69de619683c.tar.gz |
Fix wilson uploader event handler
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/wilson/wilson_uploader.cpp | 38 |
1 files changed, 33 insertions, 5 deletions
diff --git a/library/cpp/actors/wilson/wilson_uploader.cpp b/library/cpp/actors/wilson/wilson_uploader.cpp index e17f2024642..e6db248b972 100644 --- a/library/cpp/actors/wilson/wilson_uploader.cpp +++ b/library/cpp/actors/wilson/wilson_uploader.cpp @@ -34,11 +34,18 @@ namespace NWilson { NServiceProto::ExportTraceServiceResponse Response; grpc::Status Status; - std::deque<NTraceProto::Span> Spans; + struct TSpanQueueItem { + TMonotonic ExpirationTimestamp; + NTraceProto::Span Span; + ui32 Size; + }; + + std::deque<TSpanQueueItem> Spans; ui64 SpansSize = 0; TMonotonic NextSendTimestamp; ui32 MaxSpansAtOnce = 25; ui32 MaxSpansPerSecond = 10; + TDuration MaxSpanTimeInQueue = TDuration::Seconds(60); bool WakeupScheduled = false; @@ -70,8 +77,11 @@ namespace NWilson { if (SpansSize >= 100'000'000) { LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "dropped span due to overflow"); } else { - Spans.push_back(std::move(ev->Get()->Span)); - SpansSize += Spans.back().ByteSizeLong(); + const TMonotonic expirationTimestamp = TActivationContext::Monotonic() + MaxSpanTimeInQueue; + auto& span = ev->Get()->Span; + const ui32 size = span.ByteSizeLong(); + Spans.push_back(TSpanQueueItem{expirationTimestamp, std::move(span), size}); + SpansSize += size; CheckIfDone(); TryToSend(); } @@ -80,6 +90,23 @@ namespace NWilson { void TryToSend() { const TMonotonic now = TActivationContext::Monotonic(); + ui32 numSpansDropped = 0; + while (!Spans.empty()) { + const TSpanQueueItem& item = Spans.front(); + if (item.ExpirationTimestamp <= now) { + SpansSize -= item.Size; + Spans.pop_front(); + ++numSpansDropped; + } else { + break; + } + } + + if (numSpansDropped) { + LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, + "dropped " << numSpansDropped << " span(s) due to expiration"); + } + if (Context || Spans.empty()) { return; } else if (now < NextSendTimestamp) { @@ -93,7 +120,8 @@ namespace NWilson { NextSendTimestamp = now; for (ui32 i = 0; i < MaxSpansAtOnce && !Spans.empty(); ++i, Spans.pop_front()) { - auto& s = Spans.front(); + auto& item = Spans.front(); + auto& s = item.Span; LOG_DEBUG_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "exporting span" << " TraceId# " << HexEncode(s.trace_id()) @@ -101,7 +129,7 @@ namespace NWilson { << " ParentSpanId# " << HexEncode(s.parent_span_id()) << " Name# " << s.name()); - SpansSize -= s.ByteSizeLong(); + SpansSize -= item.Size; s.Swap(sspan->add_spans()); NextSendTimestamp += TDuration::MicroSeconds(1'000'000 / MaxSpansPerSecond); } |