summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorOleg Shatov <[email protected]>2024-02-16 14:59:02 +0100
committerGitHub <[email protected]>2024-02-16 14:59:02 +0100
commit126ee0c9e96f0682e1c0ae9aa381b6d535a6c79a (patch)
tree21dad944e40f56cdba4accaa8472c8a2818a74e4
parent75f69cfa118af2d3c6034bd00ac0d59783ae5124 (diff)
Implemented batching in wilson uploader (#1955)
-rw-r--r--ydb/library/actors/wilson/wilson_uploader.cpp217
1 files changed, 167 insertions, 50 deletions
diff --git a/ydb/library/actors/wilson/wilson_uploader.cpp b/ydb/library/actors/wilson/wilson_uploader.cpp
index 798964f9d00..134d22616c4 100644
--- a/ydb/library/actors/wilson/wilson_uploader.cpp
+++ b/ydb/library/actors/wilson/wilson_uploader.cpp
@@ -1,4 +1,5 @@
#include "wilson_uploader.h"
+
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/log.h>
@@ -7,7 +8,9 @@
#include <library/cpp/string_utils/url/url.h>
#include <util/stream/file.h>
#include <util/string/hex.h>
+
#include <chrono>
+#include <queue>
namespace NWilson {
@@ -18,11 +21,83 @@ namespace NWilson {
namespace {
+ struct TSpan {
+ TMonotonic ExpirationTimestamp;
+ NTraceProto::Span Span;
+ size_t Size;
+ };
+
+ class TBatch {
+ private:
+ ui64 MaxSpansInBatch;
+ ui64 MaxBytesInBatch;
+
+ NServiceProto::ExportTraceServiceRequest Request;
+ NTraceProto::ScopeSpans* ScopeSpans;
+ ui64 SizeBytes = 0;
+ TMonotonic ExpirationTimestamp = TMonotonic::Zero();
+
+ public:
+ struct TData {
+ NServiceProto::ExportTraceServiceRequest Request;
+ ui64 SizeBytes;
+ ui64 SizeSpans;
+ TMonotonic ExpirationTimestamp;
+ };
+
+ TBatch(ui64 maxSpansInBatch, ui64 maxBytesInBatch, TString serviceName)
+ : MaxSpansInBatch(maxSpansInBatch)
+ , MaxBytesInBatch(maxBytesInBatch)
+ {
+ auto *rspan = Request.add_resource_spans();
+ auto *serviceNameAttr = rspan->mutable_resource()->add_attributes();
+ serviceNameAttr->set_key("service.name");
+ serviceNameAttr->mutable_value()->set_string_value(std::move(serviceName));
+ ScopeSpans = rspan->add_scope_spans();
+ }
+
+ size_t SizeSpans() const {
+ return ScopeSpans->spansSize();
+ }
+
+ bool IsEmpty() const {
+ return SizeSpans() == 0;
+ }
+
+ bool Add(TSpan& span) {
+ if (SizeBytes + span.Size > MaxBytesInBatch || SizeSpans() == MaxSpansInBatch) {
+ return false;
+ }
+ SizeBytes += span.Size;
+ span.Span.Swap(ScopeSpans->add_spans());
+ ExpirationTimestamp = span.ExpirationTimestamp;
+ return true;
+ }
+
+ TData Complete() && {
+ return TData {
+ .Request = std::move(Request),
+ .SizeBytes = SizeBytes,
+ .SizeSpans = SizeSpans(),
+ .ExpirationTimestamp = ExpirationTimestamp,
+ };
+ }
+ };
+
class TWilsonUploader
: public TActorBootstrapped<TWilsonUploader>
{
static constexpr size_t WILSON_SERVICE_ID = 430;
+ ui64 MaxPendingSpanBytes = 100'000'000;
+ ui64 MaxSpansInBatch = 150;
+ ui64 MaxBytesInBatch = 20'000'000;
+ TDuration MaxBatchAccumulation = TDuration::Seconds(1);
+ ui32 MaxSpansPerSecond = 10;
+ TDuration MaxSpanTimeInQueue = TDuration::Seconds(60);
+
+ bool WakeupScheduled = false;
+
TString CollectorUrl;
TString ServiceName;
@@ -36,26 +111,20 @@ namespace NWilson {
NServiceProto::ExportTraceServiceResponse Response;
grpc::Status Status;
- struct TSpanQueueItem {
- TMonotonic ExpirationTimestamp;
- NTraceProto::Span Span;
- ui32 Size;
- };
-
- std::deque<TSpanQueueItem> Spans;
- ui64 SpansSize = 0;
+ TBatch CurrentBatch;
+ std::queue<TBatch::TData> BatchQueue;
+ ui64 SpansSizeBytes = 0;
TMonotonic NextSendTimestamp;
- ui32 MaxSpansAtOnce = 25;
- ui32 MaxSpansPerSecond = 10;
- TDuration MaxSpanTimeInQueue = TDuration::Seconds(60);
- bool WakeupScheduled = false;
+ bool BatchCompletionScheduled = false;
+ TMonotonic NextBatchCompletion;
public:
TWilsonUploader(WilsonUploaderParams params)
: CollectorUrl(std::move(params.CollectorUrl))
, ServiceName(std::move(params.ServiceName))
, GrpcSigner(std::move(params.GrpcSigner))
+ , CurrentBatch(MaxSpansInBatch, MaxBytesInBatch, ServiceName)
{}
~TWilsonUploader() {
@@ -87,28 +156,69 @@ namespace NWilson {
}
void Handle(TEvWilson::TPtr ev) {
- if (SpansSize >= 100'000'000) {
+ if (SpansSizeBytes >= MaxPendingSpanBytes) {
LOG_ERROR_S(*TlsActivationContext, WILSON_SERVICE_ID, "dropped span due to overflow");
} else {
- const TMonotonic expirationTimestamp = TActivationContext::Monotonic() + MaxSpanTimeInQueue;
+ const TMonotonic now = TActivationContext::Monotonic();
+ const TMonotonic expirationTimestamp = now + MaxSpanTimeInQueue;
auto& span = ev->Get()->Span;
const ui32 size = span.ByteSizeLong();
- Spans.push_back(TSpanQueueItem{expirationTimestamp, std::move(span), size});
- SpansSize += size;
+ if (size > MaxBytesInBatch) {
+ ALOG_ERROR(WILSON_SERVICE_ID, "dropped span of size " << size << ", which exceeds max batch size " << MaxBytesInBatch);
+ return;
+ }
+ TSpan spanItem {
+ .ExpirationTimestamp = expirationTimestamp,
+ .Span = std::move(span),
+ .Size = size,
+ };
+ SpansSizeBytes += size;
+ if (CurrentBatch.IsEmpty()) {
+ ScheduleBatchCompletion(now);
+ }
+ if (CurrentBatch.Add(spanItem)) {
+ return;
+ }
+ CompleteCurrentBatch();
TryMakeProgress();
+ Y_ABORT_UNLESS(CurrentBatch.Add(spanItem), "failed to add span to empty batch");
+ ScheduleBatchCompletion(now);
+ }
+ }
+
+ void ScheduleBatchCompletionEvent() {
+ Y_ABORT_UNLESS(!BatchCompletionScheduled);
+ auto cookie = NextBatchCompletion.GetValue();
+ TActivationContext::Schedule(NextBatchCompletion, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, cookie));
+ ALOG_TRACE(WILSON_SERVICE_ID, "scheduling batch completion w/ cookie=" << cookie);
+ BatchCompletionScheduled = true;
+ }
+
+ void ScheduleBatchCompletion(TMonotonic now) {
+ NextBatchCompletion = now + MaxBatchAccumulation;
+ if (!BatchCompletionScheduled) {
+ ScheduleBatchCompletionEvent();
}
}
+ void CompleteCurrentBatch() {
+ if (CurrentBatch.IsEmpty()) {
+ return;
+ }
+ BatchQueue.push(std::move(CurrentBatch).Complete());
+ CurrentBatch = TBatch(MaxSpansInBatch, MaxBytesInBatch, ServiceName);
+ }
+
void TryToSend() {
const TMonotonic now = TActivationContext::Monotonic();
ui32 numSpansDropped = 0;
- while (!Spans.empty()) {
- const TSpanQueueItem& item = Spans.front();
+ while (!BatchQueue.empty()) {
+ const TBatch::TData& item = BatchQueue.front();
if (item.ExpirationTimestamp <= now) {
- SpansSize -= item.Size;
- Spans.pop_front();
- ++numSpansDropped;
+ SpansSizeBytes -= item.SizeBytes;
+ numSpansDropped += item.SizeSpans;
+ BatchQueue.pop();
} else {
break;
}
@@ -119,42 +229,36 @@ namespace NWilson {
"dropped " << numSpansDropped << " span(s) due to expiration");
}
- if (Context || Spans.empty()) {
+ if (Context || BatchQueue.empty()) {
return;
} else if (now < NextSendTimestamp) {
ScheduleWakeup(NextSendTimestamp);
return;
}
- NServiceProto::ExportTraceServiceRequest request;
- auto *rspan = request.add_resource_spans();
- auto *serviceNameAttr = rspan->mutable_resource()->add_attributes();
- serviceNameAttr->set_key("service.name");
- serviceNameAttr->mutable_value()->set_string_value(ServiceName);
- auto *sspan = rspan->add_scope_spans();
-
- NextSendTimestamp = now;
- for (ui32 i = 0; i < MaxSpansAtOnce && !Spans.empty(); ++i, Spans.pop_front()) {
- auto& item = Spans.front();
- auto& s = item.Span;
-
- LOG_DEBUG_S(*TlsActivationContext, WILSON_SERVICE_ID, "exporting span"
- << " TraceId# " << HexEncode(s.trace_id())
- << " SpanId# " << HexEncode(s.span_id())
- << " ParentSpanId# " << HexEncode(s.parent_span_id())
- << " Name# " << s.name());
-
- SpansSize -= item.Size;
- s.Swap(sspan->add_spans());
- NextSendTimestamp += TDuration::MicroSeconds(1'000'000 / MaxSpansPerSecond);
+
+ TBatch::TData batch = std::move(BatchQueue.front());
+ BatchQueue.pop();
+
+ ALOG_DEBUG(WILSON_SERVICE_ID, "exporting batch of " << batch.SizeSpans << " spans, total spans size: " << batch.SizeBytes);
+ Y_ABORT_UNLESS(batch.Request.resource_spansSize() == 1 && batch.Request.resource_spans(0).scope_spansSize() == 1);
+ for (const auto& span : batch.Request.resource_spans(0).scope_spans(0).spans()) {
+ ALOG_DEBUG(WILSON_SERVICE_ID, "exporting span"
+ << " TraceId# " << HexEncode(span.trace_id())
+ << " SpanId# " << HexEncode(span.span_id())
+ << " ParentSpanId# " << HexEncode(span.parent_span_id())
+ << " Name# " << span.name());
}
+ NextSendTimestamp = now + TDuration::MicroSeconds((batch.SizeSpans * 1'000'000) / MaxSpansPerSecond);
+ SpansSizeBytes -= batch.SizeBytes;
+
ScheduleWakeup(NextSendTimestamp);
Context = std::make_unique<grpc::ClientContext>();
if (GrpcSigner) {
GrpcSigner->SignClientContext(*Context);
}
- Reader = Stub->AsyncExport(Context.get(), std::move(request), &CQ);
+ Reader = Stub->AsyncExport(Context.get(), std::move(batch.Request), &CQ);
Reader->Finish(&Response, &Status, nullptr);
}
@@ -179,15 +283,28 @@ namespace NWilson {
template<typename T>
void ScheduleWakeup(T&& deadline) {
if (!WakeupScheduled) {
- TActivationContext::Schedule(deadline, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {},
- nullptr, 0));
+ TActivationContext::Schedule(deadline,
+ new IEventHandle(TEvents::TSystem::Wakeup, 0,
+ SelfId(), {}, nullptr, 0));
WakeupScheduled = true;
}
}
- void HandleWakeup() {
- Y_ABORT_UNLESS(WakeupScheduled);
- WakeupScheduled = false;
+ void HandleWakeup(TEvents::TEvWakeup::TPtr& ev) {
+ const auto cookie = ev->Cookie;
+ ALOG_TRACE(WILSON_SERVICE_ID, "wakeup received w/ cookie=" << cookie);
+ if (cookie == 0) {
+ Y_ABORT_UNLESS(WakeupScheduled);
+ WakeupScheduled = false;
+ } else {
+ Y_ABORT_UNLESS(BatchCompletionScheduled);
+ BatchCompletionScheduled = false;
+ if (cookie == NextBatchCompletion.GetValue()) {
+ CompleteCurrentBatch();
+ } else {
+ ScheduleBatchCompletionEvent();
+ }
+ }
TryMakeProgress();
}
@@ -198,7 +315,7 @@ namespace NWilson {
STRICT_STFUNC(StateWork,
hFunc(TEvWilson, Handle);
- cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
+ hFunc(TEvents::TEvWakeup, HandleWakeup);
);
STRICT_STFUNC(StateBroken,