diff options
author | alexvru <alexvru@ydb.tech> | 2022-07-28 15:33:09 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-07-28 15:33:09 +0300 |
commit | 0836b1ef7920fc395f848c298b2ea601d83dbab2 (patch) | |
tree | 6c2bb4b2c6dcd32f045c8f8620d68a57e3d1f57a /library/cpp | |
parent | b333b9c0b2519d13f1b00518e4ad398b6c06ace5 (diff) | |
download | ydb-0836b1ef7920fc395f848c298b2ea601d83dbab2.tar.gz |
Mark down DS proxy and VDisk parts to use Wilson tracing
Diffstat (limited to 'library/cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_channel.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_channel.h | 5 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_tcp_session.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/packet.h | 4 | ||||
-rw-r--r-- | library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp | 2 | ||||
-rw-r--r-- | library/cpp/actors/protos/actors.proto | 4 | ||||
-rw-r--r-- | library/cpp/actors/wilson/CMakeLists.txt | 1 | ||||
-rw-r--r-- | library/cpp/actors/wilson/wilson_span.cpp | 5 | ||||
-rw-r--r-- | library/cpp/actors/wilson/wilson_span.h | 85 | ||||
-rw-r--r-- | library/cpp/actors/wilson/wilson_trace.h | 55 | ||||
-rw-r--r-- | library/cpp/actors/wilson/wilson_uploader.cpp | 32 |
11 files changed, 163 insertions, 34 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index f839741cb54..fed13946866 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -18,7 +18,7 @@ namespace NActors { return false; } - const NWilson::TTraceId traceId(event.Span); + auto traceId = event.Span.GetTraceId(); event.Span.EndOk(); LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize()); diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index 25e996ddcae..8849d19b92c 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -55,12 +55,11 @@ namespace NActors { ~TEventOutputChannel() { } - std::pair<ui32, TEventHolder*> Push(IEventHandle& ev, TInstant now) { + std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) { TEventHolder& event = Pool.Allocate(Queue); const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1)); OutputQueueSize += bytes; - if (auto span = NWilson::TSpan(static_cast<ui8>(15) /*verbosity*/, NWilson::ERelation::ChildOf, - NWilson::TTraceId(ev.TraceId), now, "InInterconnectQueue")) { + if (auto span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) { event.Span = std::move(span .Attribute("OutputQueueItems", static_cast<i64>(Queue.size())) .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize))); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 0f286153ea5..feb55a16ad6 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -126,7 +126,7 @@ namespace NActors { auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel); const bool wasWorking = oChannel.IsWorking(); - const auto [dataSize, event] = oChannel.Push(*ev, TActivationContext::Now()); + const auto [dataSize, event] = oChannel.Push(*ev); LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize); TotalOutputQueueSize += dataSize; diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index 9161e3af710..c8909c08a76 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -138,8 +138,8 @@ struct TEventHolder : TNonCopyable { const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr; Span.EndError("nondelivery"); auto ev = Event - ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span) - : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span); + ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId()) + : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId()); NActors::TActivationContext::Send(ev->ForwardOnNondelivery(NActors::TEvents::TEvUndelivered::Disconnected, unsure)); } diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 9b47f5b592d..565a511859e 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); auto& ch = scheduler.GetOutputChannel(channel); const bool wasWorking = ch.IsWorking(); - ch.Push(*ev, TInstant::Now()); + ch.Push(*ev); if (!wasWorking) { scheduler.AddToHeap(ch, 0); } diff --git a/library/cpp/actors/protos/actors.proto b/library/cpp/actors/protos/actors.proto index ad347dc2907..329eb998dfc 100644 --- a/library/cpp/actors/protos/actors.proto +++ b/library/cpp/actors/protos/actors.proto @@ -7,6 +7,10 @@ message TActorId { required fixed64 RawX2 = 2; } +message TTraceId { + optional bytes Data = 1; +} + message TCallbackException { required TActorId ActorId = 1; required string ExceptionMessage = 2; diff --git a/library/cpp/actors/wilson/CMakeLists.txt b/library/cpp/actors/wilson/CMakeLists.txt index 75c7b16dff8..09a555a1317 100644 --- a/library/cpp/actors/wilson/CMakeLists.txt +++ b/library/cpp/actors/wilson/CMakeLists.txt @@ -12,6 +12,7 @@ target_link_libraries(cpp-actors-wilson PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core + cpp-actors-protos actors-wilson-protos ) target_sources(cpp-actors-wilson PRIVATE diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp index a97184c3f09..c932560254d 100644 --- a/library/cpp/actors/wilson/wilson_span.cpp +++ b/library/cpp/actors/wilson/wilson_span.cpp @@ -53,9 +53,8 @@ namespace NWilson { } void TSpan::Send() { - if (TlsActivationContext) { - TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span))); - } + TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span))); + Data->Sent = true; } } // NWilson diff --git a/library/cpp/actors/wilson/wilson_span.h b/library/cpp/actors/wilson/wilson_span.h index c2de2f0b68c..3c201abd381 100644 --- a/library/cpp/actors/wilson/wilson_span.h +++ b/library/cpp/actors/wilson/wilson_span.h @@ -1,5 +1,7 @@ #pragma once +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/wilson/protos/trace.pb.h> #include <util/generic/hash.h> #include <util/datetime/cputimer.h> @@ -42,12 +44,17 @@ namespace NWilson { const ui64 StartCycles; const TTraceId TraceId; NTraceProto::Span Span; + bool Sent = false; TData(TInstant startTime, ui64 startCycles, TTraceId traceId) : StartTime(startTime) , StartCycles(startCycles) , TraceId(std::move(traceId)) {} + + ~TData() { + Y_VERIFY_DEBUG(Sent); + } }; std::unique_ptr<TData> Data; @@ -57,94 +64,134 @@ namespace NWilson { TSpan(const TSpan&) = delete; TSpan(TSpan&&) = default; - TSpan(ui8 verbosity, ERelation /*relation*/, TTraceId parentId, TInstant now, std::optional<TString> name) - : Data(parentId ? std::make_unique<TData>(now, GetCycleCount(), parentId.Span(verbosity)) : nullptr) + TSpan(ui8 verbosity, TTraceId parentId, std::optional<TString> name) + : Data(parentId ? std::make_unique<TData>(TInstant::Now(), GetCycleCount(), parentId.Span(verbosity)) : nullptr) { - if (*this) { + if (Y_UNLIKELY(*this)) { if (!parentId.IsRoot()) { Data->Span.set_parent_span_id(parentId.GetSpanIdPtr(), parentId.GetSpanIdSize()); } - Data->Span.set_start_time_unix_nano(now.NanoSeconds()); + Data->Span.set_start_time_unix_nano(Data->StartTime.NanoSeconds()); + Data->Span.set_kind(opentelemetry::proto::trace::v1::Span::SPAN_KIND_INTERNAL); if (name) { Name(std::move(*name)); } + + Attribute("node_id", NActors::TActivationContext::ActorSystem()->NodeId); + } + } + + ~TSpan() { + if (Y_UNLIKELY(*this)) { + EndError("unterminated span"); } } TSpan& operator =(const TSpan&) = delete; - TSpan& operator =(TSpan&&) = default; - operator bool() const { - return static_cast<bool>(Data); + TSpan& operator =(TSpan&& other) { + if (this != &other) { + if (Y_UNLIKELY(*this)) { + EndError("TSpan instance incorrectly overwritten"); + } + Data = std::exchange(other.Data, nullptr); + } + return *this; + } + + explicit operator bool() const { + return Data && !Data->Sent; + } + + TSpan& Relation(ERelation /*relation*/) { + if (Y_UNLIKELY(*this)) { + // update relation in data somehow + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); + } + return *this; } TSpan& Name(TString name) { - if (*this) { + if (Y_UNLIKELY(*this)) { Data->Span.set_name(std::move(name)); + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); } return *this; } TSpan& Attribute(TString name, TAttributeValue value) { - if (*this) { + if (Y_UNLIKELY(*this)) { SerializeKeyValue(std::move(name), std::move(value), Data->Span.add_attributes()); + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); } return *this; } TSpan& Event(TString name, TKeyValueList attributes) { - if (*this) { + if (Y_UNLIKELY(*this)) { auto *event = Data->Span.add_events(); event->set_time_unix_nano(TimeUnixNano()); event->set_name(std::move(name)); for (auto&& [key, value] : attributes) { SerializeKeyValue(std::move(key), std::move(value), event->add_attributes()); } + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); } return *this; } TSpan& Link(const TTraceId& traceId, TKeyValueList attributes) { - if (*this) { + if (Y_UNLIKELY(*this)) { auto *link = Data->Span.add_links(); link->set_trace_id(traceId.GetTraceIdPtr(), traceId.GetTraceIdSize()); link->set_span_id(traceId.GetSpanIdPtr(), traceId.GetSpanIdSize()); for (auto&& [key, value] : attributes) { SerializeKeyValue(std::move(key), std::move(value), link->add_attributes()); } + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); } return *this; } void EndOk() { - if (*this) { + if (Y_UNLIKELY(*this)) { auto *status = Data->Span.mutable_status(); status->set_code(NTraceProto::Status::STATUS_CODE_OK); + End(); + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); } - End(); } void EndError(TString error) { - if (*this) { + if (Y_UNLIKELY(*this)) { auto *status = Data->Span.mutable_status(); status->set_code(NTraceProto::Status::STATUS_CODE_ERROR); status->set_message(std::move(error)); + End(); + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); } - End(); } void End() { - if (*this) { - Data->Span.set_end_time_unix_nano(TimeUnixNano()); + if (Y_UNLIKELY(*this)) { Data->Span.set_trace_id(Data->TraceId.GetTraceIdPtr(), Data->TraceId.GetTraceIdSize()); Data->Span.set_span_id(Data->TraceId.GetSpanIdPtr(), Data->TraceId.GetSpanIdSize()); + Data->Span.set_end_time_unix_nano(TimeUnixNano()); Send(); - Data.reset(); // tracing finished + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); } } - operator TTraceId() const { + TTraceId GetTraceId() const { return Data ? TTraceId(Data->TraceId) : TTraceId(); } diff --git a/library/cpp/actors/wilson/wilson_trace.h b/library/cpp/actors/wilson/wilson_trace.h index e3631a2403d..8c1fe05b082 100644 --- a/library/cpp/actors/wilson/wilson_trace.h +++ b/library/cpp/actors/wilson/wilson_trace.h @@ -1,5 +1,8 @@ #pragma once +#include <library/cpp/actors/core/monotonic.h> +#include <library/cpp/actors/protos/actors.pb.h> + #include <library/cpp/string_utils/base64/base64.h> #include <util/stream/output.h> @@ -28,6 +31,11 @@ namespace NWilson { TTraceId(TTrace traceId, ui64 spanId, ui8 verbosity, ui32 timeToLive) : TraceId(traceId) { + if (timeToLive == Max<ui32>()) { + timeToLive = 4095; + } + Y_VERIFY(verbosity <= 15); + Y_VERIFY(timeToLive <= 4095); SpanId = spanId; Verbosity = verbosity; TimeToLive = timeToLive; @@ -86,7 +94,7 @@ namespace NWilson { , Raw(other.Raw) { other.TraceId.fill(0); - other.SpanId = 0; + other.SpanId = 1; // make it explicitly invalid other.Raw = 0; } @@ -95,7 +103,10 @@ namespace NWilson { : TraceId(other.TraceId) , SpanId(other.SpanId) , Raw(other.Raw) - {} + { + // validate trace id only when we are making a copy + other.Validate(); + } TTraceId(const TSerializedTraceId& in) { const char *p = in; @@ -108,6 +119,17 @@ namespace NWilson { Y_VERIFY_DEBUG(p - in == sizeof(TSerializedTraceId)); } + TTraceId(const NActorsProto::TTraceId& pb) + : TTraceId() + { + if (pb.HasData()) { + const auto& data = pb.GetData(); + if (data.size() == sizeof(TSerializedTraceId)) { + *this = *reinterpret_cast<const TSerializedTraceId*>(data.data()); + } + } + } + void Serialize(TSerializedTraceId *out) const { char *p = *out; memcpy(p, TraceId.data(), sizeof(TraceId)); @@ -119,13 +141,21 @@ namespace NWilson { Y_VERIFY_DEBUG(p - *out == sizeof(TSerializedTraceId)); } + void Serialize(NActorsProto::TTraceId *pb) const { + if (*this) { + TSerializedTraceId data; + Serialize(&data); + pb->SetData(reinterpret_cast<const char*>(&data), sizeof(data)); + } + } + TTraceId& operator=(TTraceId&& other) { if (this != &other) { TraceId = other.TraceId; SpanId = other.SpanId; Raw = other.Raw; other.TraceId.fill(0); - other.SpanId = 0; + other.SpanId = 1; // make it explicitly invalid other.Raw = 0; } return *this; @@ -138,11 +168,25 @@ namespace NWilson { return TTraceId(GenerateTraceId(), 0, verbosity, timeToLive); } + static TTraceId NewTraceIdThrottled(ui8 verbosity, ui32 timeToLive, std::atomic<NActors::TMonotonic>& counter, + NActors::TMonotonic now, TDuration periodBetweenSamples) { + static_assert(std::atomic<NActors::TMonotonic>::is_always_lock_free); + for (;;) { + NActors::TMonotonic ts = counter.load(); + if (now < ts) { + return {}; + } else if (counter.compare_exchange_strong(ts, now + periodBetweenSamples)) { + return NewTraceId(verbosity, timeToLive); + } + } + } + static TTraceId NewTraceId() { // NBS stub return TTraceId(); } TTraceId Span(ui8 verbosity) const { + Validate(); return *this && TimeToLive && verbosity <= Verbosity ? TTraceId(TraceId, GenerateSpanId(), Verbosity, TimeToLive - 1) : TTraceId(); @@ -174,10 +218,13 @@ namespace NWilson { const void *GetSpanIdPtr() const { return &SpanId; } static constexpr size_t GetSpanIdSize() { return sizeof(ui64); } + void Validate() const { + Y_VERIFY_DEBUG(*this || !SpanId); + } + // for compatibility with NBS TTraceId Clone() const { return NWilson::TTraceId(*this); } ui64 GetTraceId() const { return 0; } void OutputSpanId(IOutputStream&) const {} }; - } diff --git a/library/cpp/actors/wilson/wilson_uploader.cpp b/library/cpp/actors/wilson/wilson_uploader.cpp index e87776717bb..6b4ef92b812 100644 --- a/library/cpp/actors/wilson/wilson_uploader.cpp +++ b/library/cpp/actors/wilson/wilson_uploader.cpp @@ -1,9 +1,11 @@ #include "wilson_uploader.h" #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> #include <library/cpp/actors/wilson/protos/service.pb.h> #include <library/cpp/actors/wilson/protos/service.grpc.pb.h> #include <util/stream/file.h> +#include <util/string/hex.h> #include <grpc++/grpc++.h> #include <chrono> @@ -32,6 +34,8 @@ namespace NWilson { NServiceProto::ExportTraceServiceResponse Response; grpc::Status Status; + bool WakeupScheduled = false; + public: TWilsonUploader(TString host, ui16 port, TString rootCA) : Host(std::move(host)) @@ -50,6 +54,8 @@ namespace NWilson { .pem_root_certs = TFileInput(RootCA).ReadAll(), })); Stub = NServiceProto::TraceService::NewStub(Channel); + + LOG_INFO_S(*TlsActivationContext, 430, "TWilsonUploader::Bootstrap"); } void Handle(TEvWilson::TPtr ev) { @@ -67,6 +73,17 @@ namespace NWilson { void SendRequest() { Y_VERIFY(!Reader && !Context); Context = std::make_unique<grpc::ClientContext>(); + for (const auto& rs : Request.resource_spans()) { + for (const auto& ss : rs.scope_spans()) { + for (const auto& s : ss.spans()) { + LOG_DEBUG_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "exporting span" + << " TraceId# " << HexEncode(s.trace_id()) + << " SpanId# " << HexEncode(s.span_id()) + << " ParentSpanId# " << HexEncode(s.parent_span_id()) + << " Name# " << s.name()); + } + } + } Reader = Stub->AsyncExport(Context.get(), std::exchange(Request, {}), &CQ); Reader->Finish(&Response, &Status, nullptr); } @@ -76,18 +93,33 @@ namespace NWilson { void *tag; bool ok; if (CQ.AsyncNext(&tag, &ok, std::chrono::system_clock::now()) == grpc::CompletionQueue::GOT_EVENT) { + if (!Status.ok()) { + LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, + "failed to commit traces: " << Status.error_message()); + } + Reader.reset(); Context.reset(); if (Request.resource_spans_size()) { SendRequest(); } + } else if (!WakeupScheduled) { + WakeupScheduled = true; + Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup); } } } + void HandleWakeup() { + Y_VERIFY(WakeupScheduled); + WakeupScheduled = false; + CheckIfDone(); + } + STRICT_STFUNC(StateFunc, hFunc(TEvWilson, Handle); + cFunc(TEvents::TSystem::Wakeup, HandleWakeup); ); }; |