aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-07-29 18:58:34 +0300
committeralexvru <alexvru@ydb.tech>2022-07-29 18:58:34 +0300
commit002a9e8a0143ca3bca5d1614bc8cd3f5ac300935 (patch)
tree2b5776bc00d656572dd9e824e76c9c9276ec3f48 /library/cpp
parent9e1df78041fdd7052eedd4904110ddaee37b8510 (diff)
downloadydb-002a9e8a0143ca3bca5d1614bc8cd3f5ac300935.tar.gz
Throttle Wilson uploader
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/wilson/wilson_uploader.cpp86
1 files changed, 56 insertions, 30 deletions
diff --git a/library/cpp/actors/wilson/wilson_uploader.cpp b/library/cpp/actors/wilson/wilson_uploader.cpp
index 6b4ef92b812..57e7dceee95 100644
--- a/library/cpp/actors/wilson/wilson_uploader.cpp
+++ b/library/cpp/actors/wilson/wilson_uploader.cpp
@@ -14,6 +14,7 @@ namespace NWilson {
using namespace NActors;
namespace NServiceProto = opentelemetry::proto::collector::trace::v1;
+ namespace NTraceProto = opentelemetry::proto::trace::v1;
namespace {
@@ -30,10 +31,15 @@ namespace NWilson {
std::unique_ptr<grpc::ClientContext> Context;
std::unique_ptr<grpc::ClientAsyncResponseReader<NServiceProto::ExportTraceServiceResponse>> Reader;
- NServiceProto::ExportTraceServiceRequest Request;
NServiceProto::ExportTraceServiceResponse Response;
grpc::Status Status;
+ std::deque<NTraceProto::Span> Spans;
+ ui64 SpansSize = 0;
+ TMonotonic NextSendTimestamp;
+ ui32 MaxSpansAtOnce = 25;
+ ui32 MaxSpansPerSecond = 10;
+
bool WakeupScheduled = false;
public:
@@ -55,36 +61,51 @@ namespace NWilson {
}));
Stub = NServiceProto::TraceService::NewStub(Channel);
- LOG_INFO_S(*TlsActivationContext, 430, "TWilsonUploader::Bootstrap");
+ LOG_INFO_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "TWilsonUploader::Bootstrap");
}
void Handle(TEvWilson::TPtr ev) {
- CheckIfDone();
+ if (SpansSize >= 100'000'000) {
+ LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "dropped span due to overflow");
+ } else {
+ Spans.push_back(std::move(ev->Get()->Span));
+ SpansSize += Spans.back().ByteSizeLong();
+ CheckIfDone();
+ TryToSend();
+ }
+ }
- auto *rspan = Request.resource_spans_size() ? Request.mutable_resource_spans(0) : Request.add_resource_spans();
- auto *sspan = rspan->scope_spans_size() ? rspan->mutable_scope_spans(0) : rspan->add_scope_spans();
- ev->Get()->Span.Swap(sspan->add_spans());
+ void TryToSend() {
+ const TMonotonic now = TActivationContext::Monotonic();
- if (!Context) {
- SendRequest();
+ if (Context || Spans.empty()) {
+ return;
+ } else if (now < NextSendTimestamp) {
+ ScheduleWakeup(NextSendTimestamp);
+ return;
}
- }
- void SendRequest() {
- Y_VERIFY(!Reader && !Context);
- Context = std::make_unique<grpc::ClientContext>();
- for (const auto& rs : Request.resource_spans()) {
- for (const auto& ss : rs.scope_spans()) {
- for (const auto& s : ss.spans()) {
- LOG_DEBUG_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "exporting span"
- << " TraceId# " << HexEncode(s.trace_id())
- << " SpanId# " << HexEncode(s.span_id())
- << " ParentSpanId# " << HexEncode(s.parent_span_id())
- << " Name# " << s.name());
- }
- }
+ NServiceProto::ExportTraceServiceRequest request;
+ auto *rspan = request.add_resource_spans();
+ auto *sspan = rspan->add_scope_spans();
+
+ NextSendTimestamp = now;
+ for (ui32 i = 0; i < MaxSpansAtOnce && !Spans.empty(); ++i, Spans.pop_front()) {
+ auto& s = Spans.front();
+
+ LOG_DEBUG_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "exporting span"
+ << " TraceId# " << HexEncode(s.trace_id())
+ << " SpanId# " << HexEncode(s.span_id())
+ << " ParentSpanId# " << HexEncode(s.parent_span_id())
+ << " Name# " << s.name());
+
+ SpansSize -= s.ByteSizeLong();
+ s.Swap(sspan->add_spans());
+ NextSendTimestamp += TDuration::MicroSeconds(1'000'000 / MaxSpansPerSecond);
}
- Reader = Stub->AsyncExport(Context.get(), std::exchange(Request, {}), &CQ);
+
+ Context = std::make_unique<grpc::ClientContext>();
+ Reader = Stub->AsyncExport(Context.get(), std::move(request), &CQ);
Reader->Finish(&Response, &Status, nullptr);
}
@@ -100,21 +121,26 @@ namespace NWilson {
Reader.reset();
Context.reset();
-
- if (Request.resource_spans_size()) {
- SendRequest();
- }
- } else if (!WakeupScheduled) {
- WakeupScheduled = true;
- Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
+ } else {
+ ScheduleWakeup(TDuration::MilliSeconds(100));
}
}
}
+ template<typename T>
+ void ScheduleWakeup(T&& deadline) {
+ if (!WakeupScheduled) {
+ TActivationContext::Schedule(deadline, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {},
+ nullptr, 0));
+ WakeupScheduled = true;
+ }
+ }
+
void HandleWakeup() {
Y_VERIFY(WakeupScheduled);
WakeupScheduled = false;
CheckIfDone();
+ TryToSend();
}
STRICT_STFUNC(StateFunc,