aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpumpurum <olegsh@ydb.tech>2023-12-13 18:37:05 +0300
committerpumpurum <olegsh@ydb.tech>2023-12-13 22:11:15 +0300
commit33b1f1a3020fa35f93c7b9d0a4bf2a61a47f45cc (patch)
tree08a91ad48ef2becb801fbdcea7691ed1adbf005d
parent22206f8dd3ccdd3661540177e3c319894e770739 (diff)
downloadydb-33b1f1a3020fa35f93c7b9d0a4bf2a61a47f45cc.tar.gz
KIKIMR-20042 Wilson uploader fix
-rw-r--r--ydb/library/actors/wilson/wilson_uploader.cpp20
1 files changed, 13 insertions, 7 deletions
diff --git a/ydb/library/actors/wilson/wilson_uploader.cpp b/ydb/library/actors/wilson/wilson_uploader.cpp
index 6aeb3851ef7..24063fe625a 100644
--- a/ydb/library/actors/wilson/wilson_uploader.cpp
+++ b/ydb/library/actors/wilson/wilson_uploader.cpp
@@ -21,6 +21,8 @@ namespace NWilson {
class TWilsonUploader
: public TActorBootstrapped<TWilsonUploader>
{
+ static constexpr size_t WILSON_SERVICE_ID = 430;
+
TString Host;
ui16 Port;
TString RootCA;
@@ -72,20 +74,19 @@ namespace NWilson {
}) : grpc::InsecureChannelCredentials());
Stub = NServiceProto::TraceService::NewStub(Channel);
- LOG_INFO_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "TWilsonUploader::Bootstrap");
+ LOG_INFO_S(*TlsActivationContext, WILSON_SERVICE_ID, "TWilsonUploader::Bootstrap");
}
void Handle(TEvWilson::TPtr ev) {
if (SpansSize >= 100'000'000) {
- LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "dropped span due to overflow");
+ LOG_ERROR_S(*TlsActivationContext, WILSON_SERVICE_ID, "dropped span due to overflow");
} else {
const TMonotonic expirationTimestamp = TActivationContext::Monotonic() + MaxSpanTimeInQueue;
auto& span = ev->Get()->Span;
const ui32 size = span.ByteSizeLong();
Spans.push_back(TSpanQueueItem{expirationTimestamp, std::move(span), size});
SpansSize += size;
- CheckIfDone();
- TryToSend();
+ TryMakeProgress();
}
}
@@ -105,7 +106,7 @@ namespace NWilson {
}
if (numSpansDropped) {
- LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */,
+ LOG_ERROR_S(*TlsActivationContext, WILSON_SERVICE_ID,
"dropped " << numSpansDropped << " span(s) due to expiration");
}
@@ -128,7 +129,7 @@ namespace NWilson {
auto& item = Spans.front();
auto& s = item.Span;
- LOG_DEBUG_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "exporting span"
+ LOG_DEBUG_S(*TlsActivationContext, WILSON_SERVICE_ID, "exporting span"
<< " TraceId# " << HexEncode(s.trace_id())
<< " SpanId# " << HexEncode(s.span_id())
<< " ParentSpanId# " << HexEncode(s.parent_span_id())
@@ -139,6 +140,7 @@ namespace NWilson {
NextSendTimestamp += TDuration::MicroSeconds(1'000'000 / MaxSpansPerSecond);
}
+ ScheduleWakeup(NextSendTimestamp);
Context = std::make_unique<grpc::ClientContext>();
Reader = Stub->AsyncExport(Context.get(), std::move(request), &CQ);
Reader->Finish(&Response, &Status, nullptr);
@@ -150,7 +152,7 @@ namespace NWilson {
bool ok;
if (CQ.AsyncNext(&tag, &ok, std::chrono::system_clock::now()) == grpc::CompletionQueue::GOT_EVENT) {
if (!Status.ok()) {
- LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */,
+ LOG_ERROR_S(*TlsActivationContext, WILSON_SERVICE_ID,
"failed to commit traces: " << Status.error_message());
}
@@ -174,6 +176,10 @@ namespace NWilson {
void HandleWakeup() {
Y_ABORT_UNLESS(WakeupScheduled);
WakeupScheduled = false;
+ TryMakeProgress();
+ }
+
+ void TryMakeProgress() {
CheckIfDone();
TryToSend();
}