aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-07-28 15:33:09 +0300
committeralexvru <alexvru@ydb.tech>2022-07-28 15:33:09 +0300
commit0836b1ef7920fc395f848c298b2ea601d83dbab2 (patch)
tree6c2bb4b2c6dcd32f045c8f8620d68a57e3d1f57a /library/cpp
parentb333b9c0b2519d13f1b00518e4ad398b6c06ace5 (diff)
downloadydb-0836b1ef7920fc395f848c298b2ea601d83dbab2.tar.gz
Mark down DS proxy and VDisk parts to use Wilson tracing
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h5
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp2
-rw-r--r--library/cpp/actors/interconnect/packet.h4
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp2
-rw-r--r--library/cpp/actors/protos/actors.proto4
-rw-r--r--library/cpp/actors/wilson/CMakeLists.txt1
-rw-r--r--library/cpp/actors/wilson/wilson_span.cpp5
-rw-r--r--library/cpp/actors/wilson/wilson_span.h85
-rw-r--r--library/cpp/actors/wilson/wilson_trace.h55
-rw-r--r--library/cpp/actors/wilson/wilson_uploader.cpp32
11 files changed, 163 insertions, 34 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index f839741cb54..fed13946866 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -18,7 +18,7 @@ namespace NActors {
return false;
}
- const NWilson::TTraceId traceId(event.Span);
+ auto traceId = event.Span.GetTraceId();
event.Span.EndOk();
LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize());
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index 25e996ddcae..8849d19b92c 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -55,12 +55,11 @@ namespace NActors {
~TEventOutputChannel() {
}
- std::pair<ui32, TEventHolder*> Push(IEventHandle& ev, TInstant now) {
+ std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) {
TEventHolder& event = Pool.Allocate(Queue);
const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1));
OutputQueueSize += bytes;
- if (auto span = NWilson::TSpan(static_cast<ui8>(15) /*verbosity*/, NWilson::ERelation::ChildOf,
- NWilson::TTraceId(ev.TraceId), now, "InInterconnectQueue")) {
+ if (auto span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) {
event.Span = std::move(span
.Attribute("OutputQueueItems", static_cast<i64>(Queue.size()))
.Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize)));
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 0f286153ea5..feb55a16ad6 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -126,7 +126,7 @@ namespace NActors {
auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
const bool wasWorking = oChannel.IsWorking();
- const auto [dataSize, event] = oChannel.Push(*ev, TActivationContext::Now());
+ const auto [dataSize, event] = oChannel.Push(*ev);
LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize);
TotalOutputQueueSize += dataSize;
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index 9161e3af710..c8909c08a76 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -138,8 +138,8 @@ struct TEventHolder : TNonCopyable {
const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr;
Span.EndError("nondelivery");
auto ev = Event
- ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span)
- : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span);
+ ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId())
+ : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId());
NActors::TActivationContext::Send(ev->ForwardOnNondelivery(NActors::TEvents::TEvUndelivered::Disconnected, unsure));
}
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index 9b47f5b592d..565a511859e 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0);
auto& ch = scheduler.GetOutputChannel(channel);
const bool wasWorking = ch.IsWorking();
- ch.Push(*ev, TInstant::Now());
+ ch.Push(*ev);
if (!wasWorking) {
scheduler.AddToHeap(ch, 0);
}
diff --git a/library/cpp/actors/protos/actors.proto b/library/cpp/actors/protos/actors.proto
index ad347dc2907..329eb998dfc 100644
--- a/library/cpp/actors/protos/actors.proto
+++ b/library/cpp/actors/protos/actors.proto
@@ -7,6 +7,10 @@ message TActorId {
required fixed64 RawX2 = 2;
}
+message TTraceId {
+ optional bytes Data = 1;
+}
+
message TCallbackException {
required TActorId ActorId = 1;
required string ExceptionMessage = 2;
diff --git a/library/cpp/actors/wilson/CMakeLists.txt b/library/cpp/actors/wilson/CMakeLists.txt
index 75c7b16dff8..09a555a1317 100644
--- a/library/cpp/actors/wilson/CMakeLists.txt
+++ b/library/cpp/actors/wilson/CMakeLists.txt
@@ -12,6 +12,7 @@ target_link_libraries(cpp-actors-wilson PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ cpp-actors-protos
actors-wilson-protos
)
target_sources(cpp-actors-wilson PRIVATE
diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp
index a97184c3f09..c932560254d 100644
--- a/library/cpp/actors/wilson/wilson_span.cpp
+++ b/library/cpp/actors/wilson/wilson_span.cpp
@@ -53,9 +53,8 @@ namespace NWilson {
}
void TSpan::Send() {
- if (TlsActivationContext) {
- TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span)));
- }
+ TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span)));
+ Data->Sent = true;
}
} // NWilson
diff --git a/library/cpp/actors/wilson/wilson_span.h b/library/cpp/actors/wilson/wilson_span.h
index c2de2f0b68c..3c201abd381 100644
--- a/library/cpp/actors/wilson/wilson_span.h
+++ b/library/cpp/actors/wilson/wilson_span.h
@@ -1,5 +1,7 @@
#pragma once
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/wilson/protos/trace.pb.h>
#include <util/generic/hash.h>
#include <util/datetime/cputimer.h>
@@ -42,12 +44,17 @@ namespace NWilson {
const ui64 StartCycles;
const TTraceId TraceId;
NTraceProto::Span Span;
+ bool Sent = false;
TData(TInstant startTime, ui64 startCycles, TTraceId traceId)
: StartTime(startTime)
, StartCycles(startCycles)
, TraceId(std::move(traceId))
{}
+
+ ~TData() {
+ Y_VERIFY_DEBUG(Sent);
+ }
};
std::unique_ptr<TData> Data;
@@ -57,94 +64,134 @@ namespace NWilson {
TSpan(const TSpan&) = delete;
TSpan(TSpan&&) = default;
- TSpan(ui8 verbosity, ERelation /*relation*/, TTraceId parentId, TInstant now, std::optional<TString> name)
- : Data(parentId ? std::make_unique<TData>(now, GetCycleCount(), parentId.Span(verbosity)) : nullptr)
+ TSpan(ui8 verbosity, TTraceId parentId, std::optional<TString> name)
+ : Data(parentId ? std::make_unique<TData>(TInstant::Now(), GetCycleCount(), parentId.Span(verbosity)) : nullptr)
{
- if (*this) {
+ if (Y_UNLIKELY(*this)) {
if (!parentId.IsRoot()) {
Data->Span.set_parent_span_id(parentId.GetSpanIdPtr(), parentId.GetSpanIdSize());
}
- Data->Span.set_start_time_unix_nano(now.NanoSeconds());
+ Data->Span.set_start_time_unix_nano(Data->StartTime.NanoSeconds());
+ Data->Span.set_kind(opentelemetry::proto::trace::v1::Span::SPAN_KIND_INTERNAL);
if (name) {
Name(std::move(*name));
}
+
+ Attribute("node_id", NActors::TActivationContext::ActorSystem()->NodeId);
+ }
+ }
+
+ ~TSpan() {
+ if (Y_UNLIKELY(*this)) {
+ EndError("unterminated span");
}
}
TSpan& operator =(const TSpan&) = delete;
- TSpan& operator =(TSpan&&) = default;
- operator bool() const {
- return static_cast<bool>(Data);
+ TSpan& operator =(TSpan&& other) {
+ if (this != &other) {
+ if (Y_UNLIKELY(*this)) {
+ EndError("TSpan instance incorrectly overwritten");
+ }
+ Data = std::exchange(other.Data, nullptr);
+ }
+ return *this;
+ }
+
+ explicit operator bool() const {
+ return Data && !Data->Sent;
+ }
+
+ TSpan& Relation(ERelation /*relation*/) {
+ if (Y_UNLIKELY(*this)) {
+ // update relation in data somehow
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
+ }
+ return *this;
}
TSpan& Name(TString name) {
- if (*this) {
+ if (Y_UNLIKELY(*this)) {
Data->Span.set_name(std::move(name));
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
}
return *this;
}
TSpan& Attribute(TString name, TAttributeValue value) {
- if (*this) {
+ if (Y_UNLIKELY(*this)) {
SerializeKeyValue(std::move(name), std::move(value), Data->Span.add_attributes());
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
}
return *this;
}
TSpan& Event(TString name, TKeyValueList attributes) {
- if (*this) {
+ if (Y_UNLIKELY(*this)) {
auto *event = Data->Span.add_events();
event->set_time_unix_nano(TimeUnixNano());
event->set_name(std::move(name));
for (auto&& [key, value] : attributes) {
SerializeKeyValue(std::move(key), std::move(value), event->add_attributes());
}
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
}
return *this;
}
TSpan& Link(const TTraceId& traceId, TKeyValueList attributes) {
- if (*this) {
+ if (Y_UNLIKELY(*this)) {
auto *link = Data->Span.add_links();
link->set_trace_id(traceId.GetTraceIdPtr(), traceId.GetTraceIdSize());
link->set_span_id(traceId.GetSpanIdPtr(), traceId.GetSpanIdSize());
for (auto&& [key, value] : attributes) {
SerializeKeyValue(std::move(key), std::move(value), link->add_attributes());
}
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
}
return *this;
}
void EndOk() {
- if (*this) {
+ if (Y_UNLIKELY(*this)) {
auto *status = Data->Span.mutable_status();
status->set_code(NTraceProto::Status::STATUS_CODE_OK);
+ End();
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
}
- End();
}
void EndError(TString error) {
- if (*this) {
+ if (Y_UNLIKELY(*this)) {
auto *status = Data->Span.mutable_status();
status->set_code(NTraceProto::Status::STATUS_CODE_ERROR);
status->set_message(std::move(error));
+ End();
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
}
- End();
}
void End() {
- if (*this) {
- Data->Span.set_end_time_unix_nano(TimeUnixNano());
+ if (Y_UNLIKELY(*this)) {
Data->Span.set_trace_id(Data->TraceId.GetTraceIdPtr(), Data->TraceId.GetTraceIdSize());
Data->Span.set_span_id(Data->TraceId.GetSpanIdPtr(), Data->TraceId.GetSpanIdSize());
+ Data->Span.set_end_time_unix_nano(TimeUnixNano());
Send();
- Data.reset(); // tracing finished
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
}
}
- operator TTraceId() const {
+ TTraceId GetTraceId() const {
return Data ? TTraceId(Data->TraceId) : TTraceId();
}
diff --git a/library/cpp/actors/wilson/wilson_trace.h b/library/cpp/actors/wilson/wilson_trace.h
index e3631a2403d..8c1fe05b082 100644
--- a/library/cpp/actors/wilson/wilson_trace.h
+++ b/library/cpp/actors/wilson/wilson_trace.h
@@ -1,5 +1,8 @@
#pragma once
+#include <library/cpp/actors/core/monotonic.h>
+#include <library/cpp/actors/protos/actors.pb.h>
+
#include <library/cpp/string_utils/base64/base64.h>
#include <util/stream/output.h>
@@ -28,6 +31,11 @@ namespace NWilson {
TTraceId(TTrace traceId, ui64 spanId, ui8 verbosity, ui32 timeToLive)
: TraceId(traceId)
{
+ if (timeToLive == Max<ui32>()) {
+ timeToLive = 4095;
+ }
+ Y_VERIFY(verbosity <= 15);
+ Y_VERIFY(timeToLive <= 4095);
SpanId = spanId;
Verbosity = verbosity;
TimeToLive = timeToLive;
@@ -86,7 +94,7 @@ namespace NWilson {
, Raw(other.Raw)
{
other.TraceId.fill(0);
- other.SpanId = 0;
+ other.SpanId = 1; // make it explicitly invalid
other.Raw = 0;
}
@@ -95,7 +103,10 @@ namespace NWilson {
: TraceId(other.TraceId)
, SpanId(other.SpanId)
, Raw(other.Raw)
- {}
+ {
+ // validate trace id only when we are making a copy
+ other.Validate();
+ }
TTraceId(const TSerializedTraceId& in) {
const char *p = in;
@@ -108,6 +119,17 @@ namespace NWilson {
Y_VERIFY_DEBUG(p - in == sizeof(TSerializedTraceId));
}
+ TTraceId(const NActorsProto::TTraceId& pb)
+ : TTraceId()
+ {
+ if (pb.HasData()) {
+ const auto& data = pb.GetData();
+ if (data.size() == sizeof(TSerializedTraceId)) {
+ *this = *reinterpret_cast<const TSerializedTraceId*>(data.data());
+ }
+ }
+ }
+
void Serialize(TSerializedTraceId *out) const {
char *p = *out;
memcpy(p, TraceId.data(), sizeof(TraceId));
@@ -119,13 +141,21 @@ namespace NWilson {
Y_VERIFY_DEBUG(p - *out == sizeof(TSerializedTraceId));
}
+ void Serialize(NActorsProto::TTraceId *pb) const {
+ if (*this) {
+ TSerializedTraceId data;
+ Serialize(&data);
+ pb->SetData(reinterpret_cast<const char*>(&data), sizeof(data));
+ }
+ }
+
TTraceId& operator=(TTraceId&& other) {
if (this != &other) {
TraceId = other.TraceId;
SpanId = other.SpanId;
Raw = other.Raw;
other.TraceId.fill(0);
- other.SpanId = 0;
+ other.SpanId = 1; // make it explicitly invalid
other.Raw = 0;
}
return *this;
@@ -138,11 +168,25 @@ namespace NWilson {
return TTraceId(GenerateTraceId(), 0, verbosity, timeToLive);
}
+ static TTraceId NewTraceIdThrottled(ui8 verbosity, ui32 timeToLive, std::atomic<NActors::TMonotonic>& counter,
+ NActors::TMonotonic now, TDuration periodBetweenSamples) {
+ static_assert(std::atomic<NActors::TMonotonic>::is_always_lock_free);
+ for (;;) {
+ NActors::TMonotonic ts = counter.load();
+ if (now < ts) {
+ return {};
+ } else if (counter.compare_exchange_strong(ts, now + periodBetweenSamples)) {
+ return NewTraceId(verbosity, timeToLive);
+ }
+ }
+ }
+
static TTraceId NewTraceId() { // NBS stub
return TTraceId();
}
TTraceId Span(ui8 verbosity) const {
+ Validate();
return *this && TimeToLive && verbosity <= Verbosity
? TTraceId(TraceId, GenerateSpanId(), Verbosity, TimeToLive - 1)
: TTraceId();
@@ -174,10 +218,13 @@ namespace NWilson {
const void *GetSpanIdPtr() const { return &SpanId; }
static constexpr size_t GetSpanIdSize() { return sizeof(ui64); }
+ void Validate() const {
+ Y_VERIFY_DEBUG(*this || !SpanId);
+ }
+
// for compatibility with NBS
TTraceId Clone() const { return NWilson::TTraceId(*this); }
ui64 GetTraceId() const { return 0; }
void OutputSpanId(IOutputStream&) const {}
};
-
}
diff --git a/library/cpp/actors/wilson/wilson_uploader.cpp b/library/cpp/actors/wilson/wilson_uploader.cpp
index e87776717bb..6b4ef92b812 100644
--- a/library/cpp/actors/wilson/wilson_uploader.cpp
+++ b/library/cpp/actors/wilson/wilson_uploader.cpp
@@ -1,9 +1,11 @@
#include "wilson_uploader.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/wilson/protos/service.pb.h>
#include <library/cpp/actors/wilson/protos/service.grpc.pb.h>
#include <util/stream/file.h>
+#include <util/string/hex.h>
#include <grpc++/grpc++.h>
#include <chrono>
@@ -32,6 +34,8 @@ namespace NWilson {
NServiceProto::ExportTraceServiceResponse Response;
grpc::Status Status;
+ bool WakeupScheduled = false;
+
public:
TWilsonUploader(TString host, ui16 port, TString rootCA)
: Host(std::move(host))
@@ -50,6 +54,8 @@ namespace NWilson {
.pem_root_certs = TFileInput(RootCA).ReadAll(),
}));
Stub = NServiceProto::TraceService::NewStub(Channel);
+
+ LOG_INFO_S(*TlsActivationContext, 430, "TWilsonUploader::Bootstrap");
}
void Handle(TEvWilson::TPtr ev) {
@@ -67,6 +73,17 @@ namespace NWilson {
void SendRequest() {
Y_VERIFY(!Reader && !Context);
Context = std::make_unique<grpc::ClientContext>();
+ for (const auto& rs : Request.resource_spans()) {
+ for (const auto& ss : rs.scope_spans()) {
+ for (const auto& s : ss.spans()) {
+ LOG_DEBUG_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "exporting span"
+ << " TraceId# " << HexEncode(s.trace_id())
+ << " SpanId# " << HexEncode(s.span_id())
+ << " ParentSpanId# " << HexEncode(s.parent_span_id())
+ << " Name# " << s.name());
+ }
+ }
+ }
Reader = Stub->AsyncExport(Context.get(), std::exchange(Request, {}), &CQ);
Reader->Finish(&Response, &Status, nullptr);
}
@@ -76,18 +93,33 @@ namespace NWilson {
void *tag;
bool ok;
if (CQ.AsyncNext(&tag, &ok, std::chrono::system_clock::now()) == grpc::CompletionQueue::GOT_EVENT) {
+ if (!Status.ok()) {
+ LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */,
+ "failed to commit traces: " << Status.error_message());
+ }
+
Reader.reset();
Context.reset();
if (Request.resource_spans_size()) {
SendRequest();
}
+ } else if (!WakeupScheduled) {
+ WakeupScheduled = true;
+ Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
}
}
}
+ void HandleWakeup() {
+ Y_VERIFY(WakeupScheduled);
+ WakeupScheduled = false;
+ CheckIfDone();
+ }
+
STRICT_STFUNC(StateFunc,
hFunc(TEvWilson, Handle);
+ cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
);
};