diff options
| author | Oleg Shatov <[email protected]> | 2024-02-16 14:59:02 +0100 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-02-16 14:59:02 +0100 |
| commit | 126ee0c9e96f0682e1c0ae9aa381b6d535a6c79a (patch) | |
| tree | 21dad944e40f56cdba4accaa8472c8a2818a74e4 | |
| parent | 75f69cfa118af2d3c6034bd00ac0d59783ae5124 (diff) | |
Implemented batching in wilson uploader (#1955)
| -rw-r--r-- | ydb/library/actors/wilson/wilson_uploader.cpp | 217 |
1 files changed, 167 insertions, 50 deletions
diff --git a/ydb/library/actors/wilson/wilson_uploader.cpp b/ydb/library/actors/wilson/wilson_uploader.cpp index 798964f9d00..134d22616c4 100644 --- a/ydb/library/actors/wilson/wilson_uploader.cpp +++ b/ydb/library/actors/wilson/wilson_uploader.cpp @@ -1,4 +1,5 @@ #include "wilson_uploader.h" + #include <ydb/library/actors/core/actor_bootstrapped.h> #include <ydb/library/actors/core/hfunc.h> #include <ydb/library/actors/core/log.h> @@ -7,7 +8,9 @@ #include <library/cpp/string_utils/url/url.h> #include <util/stream/file.h> #include <util/string/hex.h> + #include <chrono> +#include <queue> namespace NWilson { @@ -18,11 +21,83 @@ namespace NWilson { namespace { + struct TSpan { + TMonotonic ExpirationTimestamp; + NTraceProto::Span Span; + size_t Size; + }; + + class TBatch { + private: + ui64 MaxSpansInBatch; + ui64 MaxBytesInBatch; + + NServiceProto::ExportTraceServiceRequest Request; + NTraceProto::ScopeSpans* ScopeSpans; + ui64 SizeBytes = 0; + TMonotonic ExpirationTimestamp = TMonotonic::Zero(); + + public: + struct TData { + NServiceProto::ExportTraceServiceRequest Request; + ui64 SizeBytes; + ui64 SizeSpans; + TMonotonic ExpirationTimestamp; + }; + + TBatch(ui64 maxSpansInBatch, ui64 maxBytesInBatch, TString serviceName) + : MaxSpansInBatch(maxSpansInBatch) + , MaxBytesInBatch(maxBytesInBatch) + { + auto *rspan = Request.add_resource_spans(); + auto *serviceNameAttr = rspan->mutable_resource()->add_attributes(); + serviceNameAttr->set_key("service.name"); + serviceNameAttr->mutable_value()->set_string_value(std::move(serviceName)); + ScopeSpans = rspan->add_scope_spans(); + } + + size_t SizeSpans() const { + return ScopeSpans->spansSize(); + } + + bool IsEmpty() const { + return SizeSpans() == 0; + } + + bool Add(TSpan& span) { + if (SizeBytes + span.Size > MaxBytesInBatch || SizeSpans() == MaxSpansInBatch) { + return false; + } + SizeBytes += span.Size; + span.Span.Swap(ScopeSpans->add_spans()); + ExpirationTimestamp = span.ExpirationTimestamp; + return true; + } + + TData Complete() && { + return TData { + .Request = std::move(Request), + .SizeBytes = SizeBytes, + .SizeSpans = SizeSpans(), + .ExpirationTimestamp = ExpirationTimestamp, + }; + } + }; + class TWilsonUploader : public TActorBootstrapped<TWilsonUploader> { static constexpr size_t WILSON_SERVICE_ID = 430; + ui64 MaxPendingSpanBytes = 100'000'000; + ui64 MaxSpansInBatch = 150; + ui64 MaxBytesInBatch = 20'000'000; + TDuration MaxBatchAccumulation = TDuration::Seconds(1); + ui32 MaxSpansPerSecond = 10; + TDuration MaxSpanTimeInQueue = TDuration::Seconds(60); + + bool WakeupScheduled = false; + TString CollectorUrl; TString ServiceName; @@ -36,26 +111,20 @@ namespace NWilson { NServiceProto::ExportTraceServiceResponse Response; grpc::Status Status; - struct TSpanQueueItem { - TMonotonic ExpirationTimestamp; - NTraceProto::Span Span; - ui32 Size; - }; - - std::deque<TSpanQueueItem> Spans; - ui64 SpansSize = 0; + TBatch CurrentBatch; + std::queue<TBatch::TData> BatchQueue; + ui64 SpansSizeBytes = 0; TMonotonic NextSendTimestamp; - ui32 MaxSpansAtOnce = 25; - ui32 MaxSpansPerSecond = 10; - TDuration MaxSpanTimeInQueue = TDuration::Seconds(60); - bool WakeupScheduled = false; + bool BatchCompletionScheduled = false; + TMonotonic NextBatchCompletion; public: TWilsonUploader(WilsonUploaderParams params) : CollectorUrl(std::move(params.CollectorUrl)) , ServiceName(std::move(params.ServiceName)) , GrpcSigner(std::move(params.GrpcSigner)) + , CurrentBatch(MaxSpansInBatch, MaxBytesInBatch, ServiceName) {} ~TWilsonUploader() { @@ -87,28 +156,69 @@ namespace NWilson { } void Handle(TEvWilson::TPtr ev) { - if (SpansSize >= 100'000'000) { + if (SpansSizeBytes >= MaxPendingSpanBytes) { LOG_ERROR_S(*TlsActivationContext, WILSON_SERVICE_ID, "dropped span due to overflow"); } else { - const TMonotonic expirationTimestamp = TActivationContext::Monotonic() + MaxSpanTimeInQueue; + const TMonotonic now = TActivationContext::Monotonic(); + const TMonotonic expirationTimestamp = now + MaxSpanTimeInQueue; auto& span = ev->Get()->Span; const ui32 size = span.ByteSizeLong(); - Spans.push_back(TSpanQueueItem{expirationTimestamp, std::move(span), size}); - SpansSize += size; + if (size > MaxBytesInBatch) { + ALOG_ERROR(WILSON_SERVICE_ID, "dropped span of size " << size << ", which exceeds max batch size " << MaxBytesInBatch); + return; + } + TSpan spanItem { + .ExpirationTimestamp = expirationTimestamp, + .Span = std::move(span), + .Size = size, + }; + SpansSizeBytes += size; + if (CurrentBatch.IsEmpty()) { + ScheduleBatchCompletion(now); + } + if (CurrentBatch.Add(spanItem)) { + return; + } + CompleteCurrentBatch(); TryMakeProgress(); + Y_ABORT_UNLESS(CurrentBatch.Add(spanItem), "failed to add span to empty batch"); + ScheduleBatchCompletion(now); + } + } + + void ScheduleBatchCompletionEvent() { + Y_ABORT_UNLESS(!BatchCompletionScheduled); + auto cookie = NextBatchCompletion.GetValue(); + TActivationContext::Schedule(NextBatchCompletion, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, cookie)); + ALOG_TRACE(WILSON_SERVICE_ID, "scheduling batch completion w/ cookie=" << cookie); + BatchCompletionScheduled = true; + } + + void ScheduleBatchCompletion(TMonotonic now) { + NextBatchCompletion = now + MaxBatchAccumulation; + if (!BatchCompletionScheduled) { + ScheduleBatchCompletionEvent(); } } + void CompleteCurrentBatch() { + if (CurrentBatch.IsEmpty()) { + return; + } + BatchQueue.push(std::move(CurrentBatch).Complete()); + CurrentBatch = TBatch(MaxSpansInBatch, MaxBytesInBatch, ServiceName); + } + void TryToSend() { const TMonotonic now = TActivationContext::Monotonic(); ui32 numSpansDropped = 0; - while (!Spans.empty()) { - const TSpanQueueItem& item = Spans.front(); + while (!BatchQueue.empty()) { + const TBatch::TData& item = BatchQueue.front(); if (item.ExpirationTimestamp <= now) { - SpansSize -= item.Size; - Spans.pop_front(); - ++numSpansDropped; + SpansSizeBytes -= item.SizeBytes; + numSpansDropped += item.SizeSpans; + BatchQueue.pop(); } else { break; } @@ -119,42 +229,36 @@ namespace NWilson { "dropped " << numSpansDropped << " span(s) due to expiration"); } - if (Context || Spans.empty()) { + if (Context || BatchQueue.empty()) { return; } else if (now < NextSendTimestamp) { ScheduleWakeup(NextSendTimestamp); return; } - NServiceProto::ExportTraceServiceRequest request; - auto *rspan = request.add_resource_spans(); - auto *serviceNameAttr = rspan->mutable_resource()->add_attributes(); - serviceNameAttr->set_key("service.name"); - serviceNameAttr->mutable_value()->set_string_value(ServiceName); - auto *sspan = rspan->add_scope_spans(); - - NextSendTimestamp = now; - for (ui32 i = 0; i < MaxSpansAtOnce && !Spans.empty(); ++i, Spans.pop_front()) { - auto& item = Spans.front(); - auto& s = item.Span; - - LOG_DEBUG_S(*TlsActivationContext, WILSON_SERVICE_ID, "exporting span" - << " TraceId# " << HexEncode(s.trace_id()) - << " SpanId# " << HexEncode(s.span_id()) - << " ParentSpanId# " << HexEncode(s.parent_span_id()) - << " Name# " << s.name()); - - SpansSize -= item.Size; - s.Swap(sspan->add_spans()); - NextSendTimestamp += TDuration::MicroSeconds(1'000'000 / MaxSpansPerSecond); + + TBatch::TData batch = std::move(BatchQueue.front()); + BatchQueue.pop(); + + ALOG_DEBUG(WILSON_SERVICE_ID, "exporting batch of " << batch.SizeSpans << " spans, total spans size: " << batch.SizeBytes); + Y_ABORT_UNLESS(batch.Request.resource_spansSize() == 1 && batch.Request.resource_spans(0).scope_spansSize() == 1); + for (const auto& span : batch.Request.resource_spans(0).scope_spans(0).spans()) { + ALOG_DEBUG(WILSON_SERVICE_ID, "exporting span" + << " TraceId# " << HexEncode(span.trace_id()) + << " SpanId# " << HexEncode(span.span_id()) + << " ParentSpanId# " << HexEncode(span.parent_span_id()) + << " Name# " << span.name()); } + NextSendTimestamp = now + TDuration::MicroSeconds((batch.SizeSpans * 1'000'000) / MaxSpansPerSecond); + SpansSizeBytes -= batch.SizeBytes; + ScheduleWakeup(NextSendTimestamp); Context = std::make_unique<grpc::ClientContext>(); if (GrpcSigner) { GrpcSigner->SignClientContext(*Context); } - Reader = Stub->AsyncExport(Context.get(), std::move(request), &CQ); + Reader = Stub->AsyncExport(Context.get(), std::move(batch.Request), &CQ); Reader->Finish(&Response, &Status, nullptr); } @@ -179,15 +283,28 @@ namespace NWilson { template<typename T> void ScheduleWakeup(T&& deadline) { if (!WakeupScheduled) { - TActivationContext::Schedule(deadline, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, - nullptr, 0)); + TActivationContext::Schedule(deadline, + new IEventHandle(TEvents::TSystem::Wakeup, 0, + SelfId(), {}, nullptr, 0)); WakeupScheduled = true; } } - void HandleWakeup() { - Y_ABORT_UNLESS(WakeupScheduled); - WakeupScheduled = false; + void HandleWakeup(TEvents::TEvWakeup::TPtr& ev) { + const auto cookie = ev->Cookie; + ALOG_TRACE(WILSON_SERVICE_ID, "wakeup received w/ cookie=" << cookie); + if (cookie == 0) { + Y_ABORT_UNLESS(WakeupScheduled); + WakeupScheduled = false; + } else { + Y_ABORT_UNLESS(BatchCompletionScheduled); + BatchCompletionScheduled = false; + if (cookie == NextBatchCompletion.GetValue()) { + CompleteCurrentBatch(); + } else { + ScheduleBatchCompletionEvent(); + } + } TryMakeProgress(); } @@ -198,7 +315,7 @@ namespace NWilson { STRICT_STFUNC(StateWork, hFunc(TEvWilson, Handle); - cFunc(TEvents::TSystem::Wakeup, HandleWakeup); + hFunc(TEvents::TEvWakeup, HandleWakeup); ); STRICT_STFUNC(StateBroken, |
