aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-10-12 10:29:41 +0300
committeralexvru <alexvru@ydb.tech>2022-10-12 10:29:41 +0300
commit8a5b941d775b0f8ed7ee4b903478e69de619683c (patch)
tree7f26fb57a71430d89e59e65d5c0fa7d9bcf2fcfd /library/cpp
parentc9f4954181160141a2f86785405560804ce69345 (diff)
downloadydb-8a5b941d775b0f8ed7ee4b903478e69de619683c.tar.gz
Fix wilson uploader event handler
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/wilson/wilson_uploader.cpp38
1 files changed, 33 insertions, 5 deletions
diff --git a/library/cpp/actors/wilson/wilson_uploader.cpp b/library/cpp/actors/wilson/wilson_uploader.cpp
index e17f2024642..e6db248b972 100644
--- a/library/cpp/actors/wilson/wilson_uploader.cpp
+++ b/library/cpp/actors/wilson/wilson_uploader.cpp
@@ -34,11 +34,18 @@ namespace NWilson {
NServiceProto::ExportTraceServiceResponse Response;
grpc::Status Status;
- std::deque<NTraceProto::Span> Spans;
+ struct TSpanQueueItem {
+ TMonotonic ExpirationTimestamp;
+ NTraceProto::Span Span;
+ ui32 Size;
+ };
+
+ std::deque<TSpanQueueItem> Spans;
ui64 SpansSize = 0;
TMonotonic NextSendTimestamp;
ui32 MaxSpansAtOnce = 25;
ui32 MaxSpansPerSecond = 10;
+ TDuration MaxSpanTimeInQueue = TDuration::Seconds(60);
bool WakeupScheduled = false;
@@ -70,8 +77,11 @@ namespace NWilson {
if (SpansSize >= 100'000'000) {
LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "dropped span due to overflow");
} else {
- Spans.push_back(std::move(ev->Get()->Span));
- SpansSize += Spans.back().ByteSizeLong();
+ const TMonotonic expirationTimestamp = TActivationContext::Monotonic() + MaxSpanTimeInQueue;
+ auto& span = ev->Get()->Span;
+ const ui32 size = span.ByteSizeLong();
+ Spans.push_back(TSpanQueueItem{expirationTimestamp, std::move(span), size});
+ SpansSize += size;
CheckIfDone();
TryToSend();
}
@@ -80,6 +90,23 @@ namespace NWilson {
void TryToSend() {
const TMonotonic now = TActivationContext::Monotonic();
+ ui32 numSpansDropped = 0;
+ while (!Spans.empty()) {
+ const TSpanQueueItem& item = Spans.front();
+ if (item.ExpirationTimestamp <= now) {
+ SpansSize -= item.Size;
+ Spans.pop_front();
+ ++numSpansDropped;
+ } else {
+ break;
+ }
+ }
+
+ if (numSpansDropped) {
+ LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */,
+ "dropped " << numSpansDropped << " span(s) due to expiration");
+ }
+
if (Context || Spans.empty()) {
return;
} else if (now < NextSendTimestamp) {
@@ -93,7 +120,8 @@ namespace NWilson {
NextSendTimestamp = now;
for (ui32 i = 0; i < MaxSpansAtOnce && !Spans.empty(); ++i, Spans.pop_front()) {
- auto& s = Spans.front();
+ auto& item = Spans.front();
+ auto& s = item.Span;
LOG_DEBUG_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "exporting span"
<< " TraceId# " << HexEncode(s.trace_id())
@@ -101,7 +129,7 @@ namespace NWilson {
<< " ParentSpanId# " << HexEncode(s.parent_span_id())
<< " Name# " << s.name());
- SpansSize -= s.ByteSizeLong();
+ SpansSize -= item.Size;
s.Swap(sspan->add_spans());
NextSendTimestamp += TDuration::MicroSeconds(1'000'000 / MaxSpansPerSecond);
}