diff options
author | pumpurum <olegsh@ydb.tech> | 2023-12-13 18:37:05 +0300 |
---|---|---|
committer | pumpurum <olegsh@ydb.tech> | 2023-12-13 22:11:15 +0300 |
commit | 33b1f1a3020fa35f93c7b9d0a4bf2a61a47f45cc (patch) | |
tree | 08a91ad48ef2becb801fbdcea7691ed1adbf005d | |
parent | 22206f8dd3ccdd3661540177e3c319894e770739 (diff) | |
download | ydb-33b1f1a3020fa35f93c7b9d0a4bf2a61a47f45cc.tar.gz |
KIKIMR-20042 Wilson uploader fix
-rw-r--r-- | ydb/library/actors/wilson/wilson_uploader.cpp | 20 |
1 files changed, 13 insertions, 7 deletions
diff --git a/ydb/library/actors/wilson/wilson_uploader.cpp b/ydb/library/actors/wilson/wilson_uploader.cpp index 6aeb3851ef7..24063fe625a 100644 --- a/ydb/library/actors/wilson/wilson_uploader.cpp +++ b/ydb/library/actors/wilson/wilson_uploader.cpp @@ -21,6 +21,8 @@ namespace NWilson { class TWilsonUploader : public TActorBootstrapped<TWilsonUploader> { + static constexpr size_t WILSON_SERVICE_ID = 430; + TString Host; ui16 Port; TString RootCA; @@ -72,20 +74,19 @@ namespace NWilson { }) : grpc::InsecureChannelCredentials()); Stub = NServiceProto::TraceService::NewStub(Channel); - LOG_INFO_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "TWilsonUploader::Bootstrap"); + LOG_INFO_S(*TlsActivationContext, WILSON_SERVICE_ID, "TWilsonUploader::Bootstrap"); } void Handle(TEvWilson::TPtr ev) { if (SpansSize >= 100'000'000) { - LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "dropped span due to overflow"); + LOG_ERROR_S(*TlsActivationContext, WILSON_SERVICE_ID, "dropped span due to overflow"); } else { const TMonotonic expirationTimestamp = TActivationContext::Monotonic() + MaxSpanTimeInQueue; auto& span = ev->Get()->Span; const ui32 size = span.ByteSizeLong(); Spans.push_back(TSpanQueueItem{expirationTimestamp, std::move(span), size}); SpansSize += size; - CheckIfDone(); - TryToSend(); + TryMakeProgress(); } } @@ -105,7 +106,7 @@ namespace NWilson { } if (numSpansDropped) { - LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, + LOG_ERROR_S(*TlsActivationContext, WILSON_SERVICE_ID, "dropped " << numSpansDropped << " span(s) due to expiration"); } @@ -128,7 +129,7 @@ namespace NWilson { auto& item = Spans.front(); auto& s = item.Span; - LOG_DEBUG_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "exporting span" + LOG_DEBUG_S(*TlsActivationContext, WILSON_SERVICE_ID, "exporting span" << " TraceId# " << HexEncode(s.trace_id()) << " SpanId# " << HexEncode(s.span_id()) << " ParentSpanId# " << HexEncode(s.parent_span_id()) @@ -139,6 +140,7 @@ namespace NWilson { NextSendTimestamp += TDuration::MicroSeconds(1'000'000 / MaxSpansPerSecond); } + ScheduleWakeup(NextSendTimestamp); Context = std::make_unique<grpc::ClientContext>(); Reader = Stub->AsyncExport(Context.get(), std::move(request), &CQ); Reader->Finish(&Response, &Status, nullptr); @@ -150,7 +152,7 @@ namespace NWilson { bool ok; if (CQ.AsyncNext(&tag, &ok, std::chrono::system_clock::now()) == grpc::CompletionQueue::GOT_EVENT) { if (!Status.ok()) { - LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, + LOG_ERROR_S(*TlsActivationContext, WILSON_SERVICE_ID, "failed to commit traces: " << Status.error_message()); } @@ -174,6 +176,10 @@ namespace NWilson { void HandleWakeup() { Y_ABORT_UNLESS(WakeupScheduled); WakeupScheduled = false; + TryMakeProgress(); + } + + void TryMakeProgress() { CheckIfDone(); TryToSend(); } |