#include "wilson_uploader.h" #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> #include <library/cpp/actors/wilson/protos/service.pb.h> #include <library/cpp/actors/wilson/protos/service.grpc.pb.h> #include <util/stream/file.h> #include <util/string/hex.h> #include <grpc++/grpc++.h> #include <chrono> namespace NWilson { using namespace NActors; namespace NServiceProto = opentelemetry::proto::collector::trace::v1; namespace NTraceProto = opentelemetry::proto::trace::v1; namespace { class TWilsonUploader : public TActorBootstrapped<TWilsonUploader> { TString Host; ui16 Port; TString RootCA; std::shared_ptr<grpc::Channel> Channel; std::unique_ptr<NServiceProto::TraceService::Stub> Stub; grpc::CompletionQueue CQ; std::unique_ptr<grpc::ClientContext> Context; std::unique_ptr<grpc::ClientAsyncResponseReader<NServiceProto::ExportTraceServiceResponse>> Reader; NServiceProto::ExportTraceServiceResponse Response; grpc::Status Status; struct TSpanQueueItem { TMonotonic ExpirationTimestamp; NTraceProto::Span Span; ui32 Size; }; std::deque<TSpanQueueItem> Spans; ui64 SpansSize = 0; TMonotonic NextSendTimestamp; ui32 MaxSpansAtOnce = 25; ui32 MaxSpansPerSecond = 10; TDuration MaxSpanTimeInQueue = TDuration::Seconds(60); bool WakeupScheduled = false; public: TWilsonUploader(TString host, ui16 port, TString rootCA) : Host(std::move(host)) , Port(std::move(port)) , RootCA(std::move(rootCA)) {} ~TWilsonUploader() { CQ.Shutdown(); } static constexpr char ActorName[] = "WILSON_UPLOADER_ACTOR"; void Bootstrap() { Become(&TThis::StateFunc); Channel = grpc::CreateChannel(TStringBuilder() << Host << ":" << Port, grpc::SslCredentials({ .pem_root_certs = TFileInput(RootCA).ReadAll(), })); Stub = NServiceProto::TraceService::NewStub(Channel); LOG_INFO_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "TWilsonUploader::Bootstrap"); } void Handle(TEvWilson::TPtr ev) { if (SpansSize >= 100'000'000) { LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "dropped span due to overflow"); } else { const TMonotonic expirationTimestamp = TActivationContext::Monotonic() + MaxSpanTimeInQueue; auto& span = ev->Get()->Span; const ui32 size = span.ByteSizeLong(); Spans.push_back(TSpanQueueItem{expirationTimestamp, std::move(span), size}); SpansSize += size; CheckIfDone(); TryToSend(); } } void TryToSend() { const TMonotonic now = TActivationContext::Monotonic(); ui32 numSpansDropped = 0; while (!Spans.empty()) { const TSpanQueueItem& item = Spans.front(); if (item.ExpirationTimestamp <= now) { SpansSize -= item.Size; Spans.pop_front(); ++numSpansDropped; } else { break; } } if (numSpansDropped) { LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "dropped " << numSpansDropped << " span(s) due to expiration"); } if (Context || Spans.empty()) { return; } else if (now < NextSendTimestamp) { ScheduleWakeup(NextSendTimestamp); return; } NServiceProto::ExportTraceServiceRequest request; auto *rspan = request.add_resource_spans(); auto *sspan = rspan->add_scope_spans(); NextSendTimestamp = now; for (ui32 i = 0; i < MaxSpansAtOnce && !Spans.empty(); ++i, Spans.pop_front()) { auto& item = Spans.front(); auto& s = item.Span; LOG_DEBUG_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "exporting span" << " TraceId# " << HexEncode(s.trace_id()) << " SpanId# " << HexEncode(s.span_id()) << " ParentSpanId# " << HexEncode(s.parent_span_id()) << " Name# " << s.name()); SpansSize -= item.Size; s.Swap(sspan->add_spans()); NextSendTimestamp += TDuration::MicroSeconds(1'000'000 / MaxSpansPerSecond); } Context = std::make_unique<grpc::ClientContext>(); Reader = Stub->AsyncExport(Context.get(), std::move(request), &CQ); Reader->Finish(&Response, &Status, nullptr); } void CheckIfDone() { if (Context) { void *tag; bool ok; if (CQ.AsyncNext(&tag, &ok, std::chrono::system_clock::now()) == grpc::CompletionQueue::GOT_EVENT) { if (!Status.ok()) { LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "failed to commit traces: " << Status.error_message()); } Reader.reset(); Context.reset(); } else { ScheduleWakeup(TDuration::MilliSeconds(100)); } } } template<typename T> void ScheduleWakeup(T&& deadline) { if (!WakeupScheduled) { TActivationContext::Schedule(deadline, new IEventHandle(TEvents::TSystem::Wakeup, 0, SelfId(), {}, nullptr, 0)); WakeupScheduled = true; } } void HandleWakeup() { Y_VERIFY(WakeupScheduled); WakeupScheduled = false; CheckIfDone(); TryToSend(); } STRICT_STFUNC(StateFunc, hFunc(TEvWilson, Handle); cFunc(TEvents::TSystem::Wakeup, HandleWakeup); ); }; } // anonymous IActor *CreateWilsonUploader(TString host, ui16 port, TString rootCA) { return new TWilsonUploader(std::move(host), port, std::move(rootCA)); } } // NWilson