diff options
author | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-06-16 14:45:38 +0300 |
---|---|---|
committer | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-06-16 14:45:38 +0300 |
commit | 20d96d3531fa27af7cf21e8de55d71255b054cfd (patch) | |
tree | ab97943b524e5f222b839b4c767321538244eb36 /library/cpp | |
parent | c0fe73f947f62476b336002f7fa85301f8a80dee (diff) | |
download | ydb-20d96d3531fa27af7cf21e8de55d71255b054cfd.tar.gz |
Refactor Wilson KIKIMR-15105
ref:55ce6a1b08bba785ea62b3bdfea902ef7263cf57
Diffstat (limited to 'library/cpp')
23 files changed, 955 insertions, 322 deletions
diff --git a/library/cpp/actors/core/events_undelivered.cpp b/library/cpp/actors/core/events_undelivered.cpp index 23deaffd106..dfd79bf96e9 100644 --- a/library/cpp/actors/core/events_undelivered.cpp +++ b/library/cpp/actors/core/events_undelivered.cpp @@ -44,15 +44,15 @@ namespace NActors { const TActorId recp = OnNondeliveryHolder ? OnNondeliveryHolder->Recipient : TActorId(); if (Event) - return new IEventHandle(recp, Sender, Event.Release(), updatedFlags, Cookie, &Recipient, TraceId.Clone()); + return new IEventHandle(recp, Sender, Event.Release(), updatedFlags, Cookie, &Recipient, std::move(TraceId)); else - return new IEventHandle(Type, updatedFlags, recp, Sender, Buffer, Cookie, &Recipient, TraceId.Clone()); + return new IEventHandle(Type, updatedFlags, recp, Sender, Buffer, Cookie, &Recipient, std::move(TraceId)); } if (Flags & FlagTrackDelivery) { const ui32 updatedFlags = Flags & ~(FlagTrackDelivery | FlagSubscribeOnSession | FlagGenerateUnsureUndelivered); return new IEventHandle(Sender, Recipient, new TEvents::TEvUndelivered(Type, reason, unsure), updatedFlags, - Cookie, nullptr, TraceId.Clone()); + Cookie, nullptr, std::move(TraceId)); } return nullptr; diff --git a/library/cpp/actors/interconnect/CMakeLists.darwin.txt b/library/cpp/actors/interconnect/CMakeLists.darwin.txt index 16d15469206..9bd0c83fcea 100644 --- a/library/cpp/actors/interconnect/CMakeLists.darwin.txt +++ b/library/cpp/actors/interconnect/CMakeLists.darwin.txt @@ -21,6 +21,7 @@ target_link_libraries(cpp-actors-interconnect PUBLIC cpp-actors-prof cpp-actors-protos cpp-actors-util + cpp-actors-wilson cpp-digest-crc32c library-cpp-json library-cpp-lwtrace diff --git a/library/cpp/actors/interconnect/CMakeLists.linux.txt b/library/cpp/actors/interconnect/CMakeLists.linux.txt index 464477ce1dc..c0e1b39c45d 100644 --- a/library/cpp/actors/interconnect/CMakeLists.linux.txt +++ b/library/cpp/actors/interconnect/CMakeLists.linux.txt @@ -21,6 +21,7 @@ 
target_link_libraries(cpp-actors-interconnect PUBLIC cpp-actors-prof cpp-actors-protos cpp-actors-util + cpp-actors-wilson cpp-digest-crc32c library-cpp-json library-cpp-lwtrace diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index 8a46ffd535f..474b3dba8d5 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -7,16 +7,9 @@ #include <util/network/address.h> #include "interconnect_stream.h" -#include "packet.h" #include "types.h" namespace NActors { - struct TProgramInfo { - ui64 PID = 0; - ui64 StartTime = 0; - ui64 Serial = 0; - }; - enum class ENetwork : ui32 { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // local messages diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index a66ba2a154d..32f015af54c 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -11,20 +11,18 @@ LWTRACE_USING(ACTORLIB_PROVIDER); namespace NActors { - DECLARE_WILSON_EVENT(EventSentToSocket); - DECLARE_WILSON_EVENT(EventReceivedFromSocket); - bool TEventOutputChannel::FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { - const size_t amount = sizeof(TChannelPart) + sizeof(TEventDescr); + const size_t descrSize = Params.UseExtendedTraceFmt ? 
sizeof(TEventDescr2) : sizeof(TEventDescr1); + const size_t amount = sizeof(TChannelPart) + descrSize; if (task.GetVirtualFreeAmount() < amount) { return false; } - NWilson::TTraceId traceId(event.Descr.TraceId); -// if (ctx) { -// WILSON_TRACE(*ctx, &traceId, EventSentToSocket); -// } - traceId.Serialize(&event.Descr.TraceId); + auto& span = *event.Span; + span.EndOk(); + const NWilson::TTraceId traceId(span); + event.Span.reset(); + LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize()); task.Orbit.Take(event.Orbit); @@ -33,8 +31,34 @@ namespace NActors { TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea()); part->Channel = ChannelId | TChannelPart::LastPartFlag; - part->Size = sizeof(TEventDescr); - memcpy(part + 1, &event.Descr, sizeof(TEventDescr)); + part->Size = descrSize; + + void *descr = part + 1; + if (Params.UseExtendedTraceFmt) { + auto *p = static_cast<TEventDescr2*>(descr); + *p = { + event.Descr.Type, + event.Descr.Flags, + event.Descr.Recipient, + event.Descr.Sender, + event.Descr.Cookie, + {}, + event.Descr.Checksum + }; + traceId.Serialize(&p->TraceId); + } else { + auto *p = static_cast<TEventDescr1*>(descr); + *p = { + event.Descr.Type, + event.Descr.Flags, + event.Descr.Recipient, + event.Descr.Sender, + event.Descr.Cookie, + {}, + event.Descr.Checksum + }; + } + task.AppendBuf(part, amount); *weightConsumed += amount; OutputQueueSize -= part->Size; diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index cf68cd27fd3..e48d294420b 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -8,7 +8,7 @@ #include <util/generic/vector.h> #include <util/generic/map.h> #include <util/stream/walk.h> -#include <library/cpp/actors/wilson/wilson_event.h> +#include <library/cpp/actors/wilson/wilson_span.h> #include "interconnect_common.h" #include 
"interconnect_counters.h" @@ -55,10 +55,18 @@ namespace NActors { ~TEventOutputChannel() { } - std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) { + std::pair<ui32, TEventHolder*> Push(IEventHandle& ev, TInstant now) { TEventHolder& event = Pool.Allocate(Queue); - const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr); + const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1)); OutputQueueSize += bytes; + event.Span.emplace(static_cast<ui8>(15) /*verbosity*/, NWilson::ERelation::ChildOf, + NWilson::TTraceId(ev.TraceId), now, "InInterconnectQueue"); + if (*event.Span) { + auto& span = *event.Span; + span + .Attribute("OutputQueueItems", static_cast<i64>(Queue.size())) + .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize)); + } return std::make_pair(bytes, &event); } @@ -102,8 +110,6 @@ namespace NActors { }; EState State = EState::INITIAL; - static constexpr ui16 MinimumFreeSpace = sizeof(TChannelPart) + sizeof(TEventDescr); - protected: ui64 OutputQueueSize = 0; diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 9ede998d8e7..78e114a574f 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -496,6 +496,7 @@ namespace NActors { request.SetRequestModernFrame(true); request.SetRequestAuthOnly(Common->Settings.TlsAuthOnly); + request.SetRequestExtendedTraceFmt(true); SendExBlock(request, "ExRequest"); @@ -526,6 +527,7 @@ namespace NActors { Params.Encryption = success.GetStartEncryption(); Params.UseModernFrame = success.GetUseModernFrame(); Params.AuthOnly = Params.Encryption && success.GetAuthOnly(); + Params.UseExtendedTraceFmt = success.GetUseExtendedTraceFmt(); if (success.HasServerScopeId()) { ParsePeerScopeId(success.GetServerScopeId()); } @@ -681,6 +683,7 @@ namespace NActors { Params.UseModernFrame = 
request.GetRequestModernFrame(); Params.AuthOnly = Params.Encryption && request.GetRequestAuthOnly() && Common->Settings.TlsAuthOnly; + Params.UseExtendedTraceFmt = request.GetRequestExtendedTraceFmt(); if (request.HasClientScopeId()) { ParsePeerScopeId(request.GetClientScopeId()); @@ -706,6 +709,7 @@ namespace NActors { } success.SetUseModernFrame(Params.UseModernFrame); success.SetAuthOnly(Params.AuthOnly); + success.SetUseExtendedTraceFmt(Params.UseExtendedTraceFmt); SendExBlock(record, "ExReply"); // extract sender actor id (self virtual id) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 65bb956e584..cbb2d16e466 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -270,13 +270,43 @@ namespace NActors { Metrics->AddInputChannelsIncomingTraffic(channel, sizeof(part) + part.Size); - TEventDescr descr; + char buffer[Max(sizeof(TEventDescr1), sizeof(TEventDescr2))]; + auto& v1 = reinterpret_cast<TEventDescr1&>(buffer); + auto& v2 = reinterpret_cast<TEventDescr2&>(buffer); if (~part.Channel & TChannelPart::LastPartFlag) { Payload.ExtractFront(part.Size, eventData); - } else if (part.Size != sizeof(descr)) { + } else if (part.Size != sizeof(v1) && part.Size != sizeof(v2)) { LOG_CRIT_IC_SESSION("ICIS11", "incorrect last part of an event"); return DestroySession(TDisconnectReason::FormatError()); - } else if (Payload.ExtractFrontPlain(&descr, sizeof(descr))) { + } else if (Payload.ExtractFrontPlain(buffer, part.Size)) { + TEventData descr; + + switch (part.Size) { + case sizeof(TEventDescr1): + descr = { + v1.Type, + v1.Flags, + v1.Recipient, + v1.Sender, + v1.Cookie, + NWilson::TTraceId(), // do not accept traces with old format + v1.Checksum + }; + break; + + case sizeof(TEventDescr2): + descr = { + v2.Type, + v2.Flags, + v2.Recipient, + v2.Sender, + v2.Cookie, 
+ NWilson::TTraceId(v2.TraceId), + v2.Checksum + }; + break; + } + Metrics->IncInputChannelsIncomingEvents(channel); ProcessEvent(*eventData, descr); *eventData = TRope(); @@ -286,7 +316,7 @@ namespace NActors { } } - void TInputSessionTCP::ProcessEvent(TRope& data, TEventDescr& descr) { + void TInputSessionTCP::ProcessEvent(TRope& data, TEventData& descr) { if (!Params.UseModernFrame || descr.Checksum) { ui32 checksum = 0; for (const auto&& [data, size] : data) { @@ -305,7 +335,7 @@ namespace NActors { MakeIntrusive<TEventSerializedData>(std::move(data), bool(descr.Flags & IEventHandle::FlagExtendedFormat)), descr.Cookie, Params.PeerScopeId, - NWilson::TTraceId(descr.TraceId)); + std::move(descr.TraceId)); if (Common->EventFilter && !Common->EventFilter->CheckIncomingEvent(*ev, Common->LocalScopeId)) { LOG_CRIT_IC_SESSION("ICIC03", "Event dropped due to scope error LocalScopeId# %s PeerScopeId# %s Type# 0x%08" PRIx32, ScopeIdToString(Common->LocalScopeId).data(), ScopeIdToString(Params.PeerScopeId).data(), descr.Type); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 2ded7f9f537..1602f4b8b2f 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -12,8 +12,6 @@ namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); - DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes)); - template<typename T> T Coalesce(T&& x) { return x; @@ -128,7 +126,7 @@ namespace NActors { auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel); const bool wasWorking = oChannel.IsWorking(); - const auto [dataSize, event] = oChannel.Push(*ev); + const auto [dataSize, event] = oChannel.Push(*ev, TActivationContext::Now()); LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), 
LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize); TotalOutputQueueSize += dataSize; @@ -142,9 +140,6 @@ namespace NActors { ++NumEventsInReadyChannels; LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData()); - WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush, - QueueSizeInEvents = oChannel.GetQueueSize(), - QueueSizeInBytes = oChannel.GetBufferedAmountOfData()); // check for overloaded queues ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 5d4a381e1f5..9933bd489ed 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -249,7 +249,7 @@ namespace NActors { void ReceiveData(); void ProcessHeader(size_t headerLen); void ProcessPayload(ui64& numDataBytes); - void ProcessEvent(TRope& data, TEventDescr& descr); + void ProcessEvent(TRope& data, TEventData& descr); bool ReadMore(); void ReestablishConnection(TDisconnectReason reason); diff --git a/library/cpp/actors/interconnect/packet.cpp b/library/cpp/actors/interconnect/packet.cpp index e2c289ed592..9ba173e3302 100644 --- a/library/cpp/actors/interconnect/packet.cpp +++ b/library/cpp/actors/interconnect/packet.cpp @@ -13,7 +13,6 @@ ui32 TEventHolder::Fill(IEventHandle& ev) { Descr.Recipient = ev.Recipient; Descr.Sender = ev.Sender; Descr.Cookie = ev.Cookie; - ev.TraceId.Serialize(&Descr.TraceId); ForwardRecipient = ev.GetForwardOnNondeliveryRecipient(); EventActuallySerialized = 0; Descr.Checksum = 0; diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index 4ba50a2b5f4..3a6aadfb9f8 100644 --- a/library/cpp/actors/interconnect/packet.h +++ 
b/library/cpp/actors/interconnect/packet.h @@ -7,21 +7,18 @@ #include <library/cpp/containers/stack_vector/stack_vec.h> #include <library/cpp/actors/util/rope.h> #include <library/cpp/actors/prof/tag.h> +#include <library/cpp/actors/wilson/wilson_span.h> #include <library/cpp/digest/crc32c/crc32c.h> #include <library/cpp/lwtrace/shuttle.h> #include <util/generic/string.h> #include <util/generic/list.h> +#include "types.h" + #ifndef FORCE_EVENT_CHECKSUM #define FORCE_EVENT_CHECKSUM 0 #endif -using NActors::IEventBase; -using NActors::IEventHandle; -using NActors::TActorId; -using NActors::TConstIoVec; -using NActors::TEventSerializedData; - Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data, size_t len) { if constexpr (NSan::MSanIsOn()) { const char *begin = static_cast<const char*>(data); @@ -33,14 +30,6 @@ Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data, return Crc32cExtend(checksum, data, len); } -struct TSessionParams { - bool Encryption = {}; - bool UseModernFrame = {}; - bool AuthOnly = {}; - TString AuthCN; - NActors::TScopeId PeerScopeId; -}; - struct TTcpPacketHeader_v1 { ui32 HeaderCRC32; ui32 PayloadCRC32; @@ -87,21 +76,40 @@ union TTcpPacketBuf { } v2; }; +struct TEventData { + ui32 Type; + ui32 Flags; + TActorId Recipient; + TActorId Sender; + ui64 Cookie; + NWilson::TTraceId TraceId; + ui32 Checksum; +}; + #pragma pack(push, 1) -struct TEventDescr { +struct TEventDescr1 { + ui32 Type; + ui32 Flags; + TActorId Recipient; + TActorId Sender; + ui64 Cookie; + char TraceId[16]; // obsolete trace id format + ui32 Checksum; +}; + +struct TEventDescr2 { ui32 Type; ui32 Flags; TActorId Recipient; TActorId Sender; ui64 Cookie; - // wilson trace id is stored as a serialized entity to avoid using complex object with prohibited copy ctor NWilson::TTraceId::TSerializedTraceId TraceId; ui32 Checksum; }; #pragma pack(pop) struct TEventHolder : TNonCopyable { - TEventDescr Descr; + TEventData Descr; TActorId 
ForwardRecipient; THolder<IEventBase> Event; TIntrusivePtr<TEventSerializedData> Buffer; @@ -109,6 +117,7 @@ struct TEventHolder : TNonCopyable { ui32 EventSerializedSize; ui32 EventActuallySerialized; mutable NLWTrace::TOrbit Orbit; + std::optional<NWilson::TSpan> Span; ui32 Fill(IEventHandle& ev); @@ -123,7 +132,7 @@ struct TEventHolder : TNonCopyable { } void ForwardOnNondelivery(bool unsure) { - TEventDescr& d = Descr; + TEventData& d = Descr; const TActorId& r = d.Recipient; const TActorId& s = d.Sender; const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr; @@ -137,6 +146,7 @@ struct TEventHolder : TNonCopyable { Event.Reset(); Buffer.Reset(); Orbit.Reset(); + Span.reset(); } }; diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h index 2662c50c220..e0965d78079 100644 --- a/library/cpp/actors/interconnect/types.h +++ b/library/cpp/actors/interconnect/types.h @@ -1,5 +1,9 @@ #pragma once +#include <library/cpp/actors/core/defs.h> +#include <library/cpp/actors/core/actorid.h> +#include <library/cpp/actors/core/event.h> + #include <util/generic/string.h> namespace NActors { @@ -40,4 +44,26 @@ namespace NActors { static TVector<const char*> Reasons; }; + struct TProgramInfo { + ui64 PID = 0; + ui64 StartTime = 0; + ui64 Serial = 0; + }; + + struct TSessionParams { + bool Encryption = {}; + bool UseModernFrame = {}; + bool AuthOnly = {}; + bool UseExtendedTraceFmt = {}; + TString AuthCN; + NActors::TScopeId PeerScopeId; + }; + } // NActors + +using NActors::IEventBase; +using NActors::IEventHandle; +using NActors::TActorId; +using NActors::TConstIoVec; +using NActors::TEventSerializedData; +using NActors::TSessionParams; diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto index 2e3b0d0d15d..69721b1e065 100644 --- a/library/cpp/actors/protos/interconnect.proto +++ b/library/cpp/actors/protos/interconnect.proto @@ -70,8 +70,8 @@ message THandshakeRequest { 
optional bool DoCheckCookie = 17; optional bool RequestModernFrame = 18; - optional bool RequestAuthOnly = 19; + optional bool RequestExtendedTraceFmt = 20; } message THandshakeSuccess { @@ -92,8 +92,8 @@ message THandshakeSuccess { optional TScopeId ServerScopeId = 10; optional bool UseModernFrame = 11; - optional bool AuthOnly = 12; + optional bool UseExtendedTraceFmt = 13; } message THandshakeReply { diff --git a/library/cpp/actors/wilson/CMakeLists.txt b/library/cpp/actors/wilson/CMakeLists.txt index 03d8c542ff7..65b2e32d87e 100644 --- a/library/cpp/actors/wilson/CMakeLists.txt +++ b/library/cpp/actors/wilson/CMakeLists.txt @@ -7,9 +7,13 @@ -add_library(cpp-actors-wilson INTERFACE) -target_link_libraries(cpp-actors-wilson INTERFACE +add_library(cpp-actors-wilson) +target_link_libraries(cpp-actors-wilson PUBLIC contrib-libs-cxxsupp yutil - cpp-string_utils-base64 + cpp-actors-core + actors-wilson-protos +) +target_sources(cpp-actors-wilson PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/wilson_span.cpp ) diff --git a/library/cpp/actors/wilson/protos/CMakeLists.txt b/library/cpp/actors/wilson/protos/CMakeLists.txt new file mode 100644 index 00000000000..a7cc2e94ffd --- /dev/null +++ b/library/cpp/actors/wilson/protos/CMakeLists.txt @@ -0,0 +1,33 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_library(actors-wilson-protos) +target_link_libraries(actors-wilson-protos PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf +) +target_proto_messages(actors-wilson-protos PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/common.proto + ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/resource.proto + ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/trace.proto +) +target_proto_addincls(actors-wilson-protos + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(actors-wilson-protos + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) diff --git a/library/cpp/actors/wilson/protos/common.proto b/library/cpp/actors/wilson/protos/common.proto new file mode 100644 index 00000000000..8562ee6d1e3 --- /dev/null +++ b/library/cpp/actors/wilson/protos/common.proto @@ -0,0 +1,84 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.common.v1; + +// AnyValue is used to represent any type of attribute value. AnyValue may contain a +// primitive value such as a string or integer or it may contain an arbitrary nested +// object containing arrays, key-value lists and primitives. +message AnyValue { + // The value is one of the listed fields. 
It is valid for all values to be unspecified + // in which case this AnyValue is considered to be "empty". + oneof value { + string string_value = 1; + bool bool_value = 2; + int64 int_value = 3; + double double_value = 4; + ArrayValue array_value = 5; + KeyValueList kvlist_value = 6; + bytes bytes_value = 7; + } +} + +// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message +// since oneof in AnyValue does not allow repeated fields. +message ArrayValue { + // Array of values. The array may be empty (contain 0 elements). + repeated AnyValue values = 1; +} + +// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message +// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need +// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to +// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches +// are semantically equivalent. +message KeyValueList { + // A collection of key/value pairs. The list may be empty (may + // contain 0 elements). + // The keys MUST be unique (it is not allowed to have more than one + // value with the same key). + repeated KeyValue values = 1; +} + +// KeyValue is a key-value pair that is used to store Span attributes, Link +// attributes, etc. +message KeyValue { + string key = 1; + AnyValue value = 2; +} + +// InstrumentationLibrary is a message representing the instrumentation library information +// such as the fully qualified name and version. +// InstrumentationLibrary is wire-compatible with InstrumentationScope for binary +// Protobuf format. +// This message is deprecated and will be removed on June 15, 2022. +message InstrumentationLibrary { + option deprecated = true; + + // An empty instrumentation library name means the name is unknown. 
+ string name = 1; + string version = 2; +} + +// InstrumentationScope is a message representing the instrumentation scope information +// such as the fully qualified name and version. +message InstrumentationScope { + // An empty instrumentation scope name means the name is unknown. + string name = 1; + string version = 2; + repeated KeyValue attributes = 3; + uint32 dropped_attributes_count = 4; +} diff --git a/library/cpp/actors/wilson/protos/resource.proto b/library/cpp/actors/wilson/protos/resource.proto new file mode 100644 index 00000000000..752bf287ea8 --- /dev/null +++ b/library/cpp/actors/wilson/protos/resource.proto @@ -0,0 +1,31 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.resource.v1; + +import "library/cpp/actors/wilson/protos/common.proto"; + +// Resource information. +message Resource { + // Set of attributes that describe the resource. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + repeated opentelemetry.proto.common.v1.KeyValue attributes = 1; + + // dropped_attributes_count is the number of dropped attributes. If the value is 0, then + // no attributes were dropped. 
+ uint32 dropped_attributes_count = 2; +} diff --git a/library/cpp/actors/wilson/protos/trace.proto b/library/cpp/actors/wilson/protos/trace.proto new file mode 100644 index 00000000000..0b645cf8adb --- /dev/null +++ b/library/cpp/actors/wilson/protos/trace.proto @@ -0,0 +1,326 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.trace.v1; + +import "library/cpp/actors/wilson/protos/common.proto"; +import "library/cpp/actors/wilson/protos/resource.proto"; + +// TracesData represents the traces data that can be stored in a persistent storage, +// OR can be embedded by other protocols that transfer OTLP traces data but do +// not implement the OTLP protocol. +// +// The main difference between this message and collector protocol is that +// in this message there will not be any "control" or "metadata" specific to +// OTLP protocol. +// +// When new fields are added into this message, the OTLP request MUST be updated +// as well. +message TracesData { + // An array of ResourceSpans. + // For data coming from a single resource this array will typically contain + // one element. Intermediary nodes that receive data from multiple origins + // typically batch the data before forwarding further and in that case this + // array will contain multiple elements. + repeated ResourceSpans resource_spans = 1; +} + +// A collection of ScopeSpans from a Resource. 
+message ResourceSpans { + // The resource for the spans in this message. + // If this field is not set then no resource info is known. + opentelemetry.proto.resource.v1.Resource resource = 1; + + // A list of ScopeSpans that originate from a resource. + repeated ScopeSpans scope_spans = 2; + + // A list of InstrumentationLibrarySpans that originate from a resource. + // This field is deprecated and will be removed after grace period expires on June 15, 2022. + // + // During the grace period the following rules SHOULD be followed: + // + // For Binary Protobufs + // ==================== + // Binary Protobuf senders SHOULD NOT set instrumentation_library_spans. Instead + // scope_spans SHOULD be set. + // + // Binary Protobuf receivers SHOULD check if instrumentation_library_spans is set + // and scope_spans is not set then the value in instrumentation_library_spans + // SHOULD be used instead by converting InstrumentationLibrarySpans into ScopeSpans. + // If scope_spans is set then instrumentation_library_spans SHOULD be ignored. + // + // For JSON + // ======== + // JSON senders that set instrumentation_library_spans field MAY also set + // scope_spans to carry the same spans, essentially double-publishing the same data. + // Such double-publishing MAY be controlled by a user-settable option. + // If double-publishing is not used then the senders SHOULD set scope_spans and + // SHOULD NOT set instrumentation_library_spans. + // + // JSON receivers SHOULD check if instrumentation_library_spans is set and + // scope_spans is not set then the value in instrumentation_library_spans + // SHOULD be used instead by converting InstrumentationLibrarySpans into ScopeSpans. + // If scope_spans is set then instrumentation_library_spans field SHOULD be ignored. + repeated InstrumentationLibrarySpans instrumentation_library_spans = 1000 [deprecated = true]; + + // This schema_url applies to the data in the "resource" field. 
It does not apply + // to the data in the "scope_spans" field which have their own schema_url field. + string schema_url = 3; +} + +// A collection of Spans produced by an InstrumentationScope. +message ScopeSpans { + // The instrumentation scope information for the spans in this message. + // Semantically when InstrumentationScope isn't set, it is equivalent with + // an empty instrumentation scope name (unknown). + opentelemetry.proto.common.v1.InstrumentationScope scope = 1; + + // A list of Spans that originate from an instrumentation scope. + repeated Span spans = 2; + + // This schema_url applies to all spans and span events in the "spans" field. + string schema_url = 3; +} + +// A collection of Spans produced by an InstrumentationLibrary. +// InstrumentationLibrarySpans is wire-compatible with ScopeSpans for binary +// Protobuf format. +// This message is deprecated and will be removed on June 15, 2022. +message InstrumentationLibrarySpans { + option deprecated = true; + + // The instrumentation library information for the spans in this message. + // Semantically when InstrumentationLibrary isn't set, it is equivalent with + // an empty instrumentation library name (unknown). + opentelemetry.proto.common.v1.InstrumentationLibrary instrumentation_library = 1; + + // A list of Spans that originate from an instrumentation library. + repeated Span spans = 2; + + // This schema_url applies to all spans and span events in the "spans" field. + string schema_url = 3; +} + +// Span represents a single operation within a trace. Spans can be +// nested to form a trace tree. Spans may also be linked to other spans +// from the same or different trace and form graphs. Often, a trace +// contains a root span that describes the end-to-end latency, and one +// or more subspans for its sub-operations. A trace can also contain +// multiple root spans, or none at all. Spans do not need to be +// contiguous - there may be gaps or overlaps between spans in a trace. 
+// +// The next available field id is 17. +message Span { + // A unique identifier for a trace. All spans from the same trace share + // the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes + // is considered invalid. + // + // This field is semantically required. Receiver should generate new + // random trace_id if empty or invalid trace_id was received. + // + // This field is required. + bytes trace_id = 1; + + // A unique identifier for a span within a trace, assigned when the span + // is created. The ID is an 8-byte array. An ID with all zeroes is considered + // invalid. + // + // This field is semantically required. Receiver should generate new + // random span_id if empty or invalid span_id was received. + // + // This field is required. + bytes span_id = 2; + + // trace_state conveys information about request position in multiple distributed tracing graphs. + // It is a trace_state in w3c-trace-context format: https://www.w3.org/TR/trace-context/#tracestate-header + // See also https://github.com/w3c/distributed-tracing for more details about this field. + string trace_state = 3; + + // The `span_id` of this span's parent span. If this is a root span, then this + // field must be empty. The ID is an 8-byte array. + bytes parent_span_id = 4; + + // A description of the span's operation. + // + // For example, the name can be a qualified method name or a file name + // and a line number where the operation is called. A best practice is to use + // the same display name at the same call point in an application. + // This makes it easier to correlate spans in different traces. + // + // This field is semantically required to be set to non-empty string. + // Empty value is equivalent to an unknown span name. + // + // This field is required. + string name = 5; + + // SpanKind is the type of span. Can be used to specify additional relationships between spans + // in addition to a parent/child relationship. + enum SpanKind { + // Unspecified. 
Do NOT use as default. + // Implementations MAY assume SpanKind to be INTERNAL when receiving UNSPECIFIED. + SPAN_KIND_UNSPECIFIED = 0; + + // Indicates that the span represents an internal operation within an application, + // as opposed to an operation happening at the boundaries. Default value. + SPAN_KIND_INTERNAL = 1; + + // Indicates that the span covers server-side handling of an RPC or other + // remote network request. + SPAN_KIND_SERVER = 2; + + // Indicates that the span describes a request to some remote service. + SPAN_KIND_CLIENT = 3; + + // Indicates that the span describes a producer sending a message to a broker. + // Unlike CLIENT and SERVER, there is often no direct critical path latency relationship + // between producer and consumer spans. A PRODUCER span ends when the message was accepted + // by the broker while the logical processing of the message might span a much longer time. + SPAN_KIND_PRODUCER = 4; + + // Indicates that the span describes consumer receiving a message from a broker. + // Like the PRODUCER kind, there is often no direct critical path latency relationship + // between producer and consumer spans. + SPAN_KIND_CONSUMER = 5; + } + + // Distinguishes between spans generated in a particular context. For example, + // two spans with the same name may be distinguished using `CLIENT` (caller) + // and `SERVER` (callee) to identify queueing latency associated with the span. + SpanKind kind = 6; + + // start_time_unix_nano is the start time of the span. On the client side, this is the time + // kept by the local machine where the span execution starts. On the server side, this + // is the time when the server's application handler starts running. + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // + // This field is semantically required and it is expected that end_time >= start_time. + fixed64 start_time_unix_nano = 7; + + // end_time_unix_nano is the end time of the span. 
On the client side, this is the time + // kept by the local machine where the span execution ends. On the server side, this + // is the time when the server application handler stops running. + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // + // This field is semantically required and it is expected that end_time >= start_time. + fixed64 end_time_unix_nano = 8; + + // attributes is a collection of key/value pairs. Note, global attributes + // like server name can be set using the resource API. Examples of attributes: + // + // "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" + // "/http/server_latency": 300 + // "abc.com/myattribute": true + // "abc.com/score": 10.239 + // + // The OpenTelemetry API specification further restricts the allowed value types: + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/common.md#attributes + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + repeated opentelemetry.proto.common.v1.KeyValue attributes = 9; + + // dropped_attributes_count is the number of attributes that were discarded. Attributes + // can be discarded because their keys are too long or because there are too many + // attributes. If this value is 0, then no attributes were dropped. + uint32 dropped_attributes_count = 10; + + // Event is a time-stamped annotation of the span, consisting of user-supplied + // text description and key-value pairs. + message Event { + // time_unix_nano is the time the event occurred. + fixed64 time_unix_nano = 1; + + // name of the event. + // This field is semantically required to be set to non-empty string. + string name = 2; + + // attributes is a collection of attribute key/value pairs on the event. 
+ // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + repeated opentelemetry.proto.common.v1.KeyValue attributes = 3; + + // dropped_attributes_count is the number of dropped attributes. If the value is 0, + // then no attributes were dropped. + uint32 dropped_attributes_count = 4; + } + + // events is a collection of Event items. + repeated Event events = 11; + + // dropped_events_count is the number of dropped events. If the value is 0, then no + // events were dropped. + uint32 dropped_events_count = 12; + + // A pointer from the current span to another span in the same trace or in a + // different trace. For example, this can be used in batching operations, + // where a single batch handler processes multiple requests from different + // traces or when the handler receives a request from a different project. + message Link { + // A unique identifier of a trace that this linked span is part of. The ID is a + // 16-byte array. + bytes trace_id = 1; + + // A unique identifier for the linked span. The ID is an 8-byte array. + bytes span_id = 2; + + // The trace_state associated with the link. + string trace_state = 3; + + // attributes is a collection of attribute key/value pairs on the link. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + repeated opentelemetry.proto.common.v1.KeyValue attributes = 4; + + // dropped_attributes_count is the number of dropped attributes. If the value is 0, + // then no attributes were dropped. + uint32 dropped_attributes_count = 5; + } + + // links is a collection of Links, which are references from this span to a span + // in the same or different trace. + repeated Link links = 13; + + // dropped_links_count is the number of dropped links after the maximum size was + // enforced. If this value is 0, then no links were dropped. + uint32 dropped_links_count = 14; + + // An optional final status for this span. 
Semantically when Status isn't set, it means + // span's status code is unset, i.e. assume STATUS_CODE_UNSET (code = 0). + Status status = 15; +} + +// The Status type defines a logical error model that is suitable for different +// programming environments, including REST APIs and RPC APIs. +message Status { + reserved 1; + + // A developer-facing human readable error message. + string message = 2; + + // For the semantics of status codes see + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#set-status + enum StatusCode { + // The default status. + STATUS_CODE_UNSET = 0; + // The Span has been validated by an Application developers or Operator to have + // completed successfully. + STATUS_CODE_OK = 1; + // The Span contains an error. + STATUS_CODE_ERROR = 2; + }; + + // The status code. + StatusCode code = 3; +} diff --git a/library/cpp/actors/wilson/wilson_event.h b/library/cpp/actors/wilson/wilson_event.h index 7d89c33b518..4b6a7612c05 100644 --- a/library/cpp/actors/wilson/wilson_event.h +++ b/library/cpp/actors/wilson/wilson_event.h @@ -3,179 +3,19 @@ #include "wilson_trace.h" #include <library/cpp/string_utils/base64/base64.h> - +#include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/log.h> namespace NWilson { -#if !defined(_win_) -// works only for those compilers, who trait C++ as ISO IEC 14882, not their own standard - -#define __UNROLL_PARAMS_8(N, F, X, ...) \ - F(X, N - 8) \ - __UNROLL_PARAMS_7(N, F, ##__VA_ARGS__) -#define __UNROLL_PARAMS_7(N, F, X, ...) \ - F(X, N - 7) \ - __UNROLL_PARAMS_6(N, F, ##__VA_ARGS__) -#define __UNROLL_PARAMS_6(N, F, X, ...) \ - F(X, N - 6) \ - __UNROLL_PARAMS_5(N, F, ##__VA_ARGS__) -#define __UNROLL_PARAMS_5(N, F, X, ...) \ - F(X, N - 5) \ - __UNROLL_PARAMS_4(N, F, ##__VA_ARGS__) -#define __UNROLL_PARAMS_4(N, F, X, ...) \ - F(X, N - 4) \ - __UNROLL_PARAMS_3(N, F, ##__VA_ARGS__) -#define __UNROLL_PARAMS_3(N, F, X, ...) 
\ - F(X, N - 3) \ - __UNROLL_PARAMS_2(N, F, ##__VA_ARGS__) -#define __UNROLL_PARAMS_2(N, F, X, ...) \ - F(X, N - 2) \ - __UNROLL_PARAMS_1(N, F, ##__VA_ARGS__) -#define __UNROLL_PARAMS_1(N, F, X) F(X, N - 1) -#define __UNROLL_PARAMS_0(N, F) -#define __EX(...) __VA_ARGS__ -#define __NUM_PARAMS(...) __NUM_PARAMS_SELECT_N(__VA_ARGS__, __NUM_PARAMS_SEQ) -#define __NUM_PARAMS_SELECT_N(...) __EX(__NUM_PARAMS_SELECT(__VA_ARGS__)) -#define __NUM_PARAMS_SELECT(X, _1, _2, _3, _4, _5, _6, _7, _8, N, ...) N -#define __NUM_PARAMS_SEQ 8, 7, 6, 5, 4, 3, 2, 1, 0, ERROR -#define __CAT(X, Y) X##Y -#define __UNROLL_PARAMS_N(N, F, ...) __EX(__CAT(__UNROLL_PARAMS_, N)(N, F, ##__VA_ARGS__)) -#define __UNROLL_PARAMS(F, ...) __UNROLL_PARAMS_N(__NUM_PARAMS(X, ##__VA_ARGS__), F, ##__VA_ARGS__) -#define __EX2(F, X, INDEX) __INVOKE(F, __EX X, INDEX) -#define __INVOKE(F, ...) F(__VA_ARGS__) - -#define __DECLARE_PARAM(X, INDEX) __EX2(__DECLARE_PARAM_X, X, INDEX) -#define __DECLARE_PARAM_X(TYPE, NAME, INDEX) \ - static const struct T##NAME##Param \ - : ::NWilson::TParamBinder<INDEX, TYPE> { \ - T##NAME##Param() { \ - } \ - using ::NWilson::TParamBinder<INDEX, TYPE>::operator=; \ - } NAME; - -#define __TUPLE_PARAM(X, INDEX) __EX2(__TUPLE_PARAM_X, X, INDEX) -#define __TUPLE_PARAM_X(TYPE, NAME, INDEX) TYPE, - -#define __OUTPUT_PARAM(X, INDEX) __EX2(__OUTPUT_PARAM_X, X, INDEX) -#define __OUTPUT_PARAM_X(TYPE, NAME, INDEX) str << (INDEX ? ", " : "") << #NAME << "# " << std::get<INDEX>(ParamPack); - -#define __FILL_PARAM(P, INDEX) \ - do { \ - const auto& boundParam = (NParams::P); \ - boundParam.Apply(event.ParamPack); \ - } while (false); - -#define DECLARE_WILSON_EVENT(EVENT_NAME, ...) 
\ - namespace N##EVENT_NAME##Params { \ - __UNROLL_PARAMS(__DECLARE_PARAM, ##__VA_ARGS__) \ - \ - using TParamPack = std::tuple< \ - __UNROLL_PARAMS(__TUPLE_PARAM, ##__VA_ARGS__) char>; \ - } \ - struct T##EVENT_NAME { \ - using TParamPack = N##EVENT_NAME##Params::TParamPack; \ - TParamPack ParamPack; \ - \ - void Output(IOutputStream& str) { \ - str << #EVENT_NAME << "{"; \ - __UNROLL_PARAMS(__OUTPUT_PARAM, ##__VA_ARGS__) \ - str << "}"; \ - } \ - }; - - template <size_t INDEX, typename T> - class TBoundParam { - mutable T Value; - - public: - TBoundParam(T&& value) - : Value(std::move(value)) - { - } - - template <typename TParamPack> - void Apply(TParamPack& pack) const { - std::get<INDEX>(pack) = std::move(Value); - } - }; - - template <size_t INDEX, typename T> - struct TParamBinder { - template <typename TValue> - TBoundParam<INDEX, T> operator=(const TValue& value) const { - return TBoundParam<INDEX, T>(TValue(value)); - } - - template <typename TValue> - TBoundParam<INDEX, T> operator=(TValue&& value) const { - return TBoundParam<INDEX, T>(std::move(value)); - } - }; - -// generate wilson event having parent TRACE_ID and span TRACE_ID to become parent of logged event -#define WILSON_TRACE(CTX, TRACE_ID, EVENT_NAME, ...) 
\ - if (::NWilson::TraceEnabled(CTX)) { \ - ::NWilson::TTraceId* __traceId = (TRACE_ID); \ - if (__traceId && *__traceId) { \ - TInstant now = Now(); \ - T##EVENT_NAME event; \ - namespace NParams = N##EVENT_NAME##Params; \ - __UNROLL_PARAMS(__FILL_PARAM, ##__VA_ARGS__) \ - ::NWilson::TraceEvent((CTX), __traceId, event, now); \ - } \ - } - inline ui32 GetNodeId(const NActors::TActorSystem& actorSystem) { - return actorSystem.NodeId; + // stub for NBS + template<typename TActorSystem> + inline bool TraceEnabled(const TActorSystem&) { + return false; } - inline ui32 GetNodeId(const NActors::TActivationContext& ac) { - return GetNodeId(*ac.ExecutorThread.ActorSystem); - } - - constexpr ui32 WilsonComponentId = 430; // kikimrservices: wilson - - template <typename TActorSystem> - bool TraceEnabled(const TActorSystem& ctx) { - const auto* loggerSettings = ctx.LoggerSettings(); - return loggerSettings && loggerSettings->Satisfies(NActors::NLog::PRI_DEBUG, WilsonComponentId); - } - - template <typename TActorSystem, typename TEvent> - void TraceEvent(const TActorSystem& actorSystem, TTraceId* traceId, TEvent&& event, TInstant timestamp) { - // ensure that we are not using obsolete TraceId - traceId->CheckConsistency(); - - // store parent id (for logging) and generate child trace id - TTraceId parentTraceId(std::move(*traceId)); - *traceId = parentTraceId.Span(); - - // create encoded string buffer containing timestamp - const ui64 timestampValue = timestamp.GetValue(); - const size_t base64size = Base64EncodeBufSize(sizeof(timestampValue)); - char base64[base64size]; - char* end = Base64Encode(base64, reinterpret_cast<const ui8*>(&timestampValue), sizeof(timestampValue)); - - // cut trailing padding character to save some space - Y_VERIFY(end > base64 && end[-1] == '='); - --end; - - // generate log record - TString finalMessage; - TStringOutput s(finalMessage); - s << GetNodeId(actorSystem) << " " << TStringBuf(base64, end) << " "; - traceId->Output(s, parentTraceId); - s << 
" "; - event.Output(s); - - // output wilson event FIXME: special facility for wilson events w/binary serialization - NActors::MemLogAdapter(actorSystem, NActors::NLog::PRI_DEBUG, WilsonComponentId, std::move(finalMessage)); - } - -#else - -#define DECLARE_WILSON_EVENT(...) -#define WILSON_TRACE(...) -#endif + template<typename TActorSystem, typename TEvent> + inline void TraceEvent(const TActorSystem&, TTraceId*, TEvent&&, TInstant) + {} } // NWilson diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp new file mode 100644 index 00000000000..6b6ea03ccf6 --- /dev/null +++ b/library/cpp/actors/wilson/wilson_span.cpp @@ -0,0 +1,64 @@ +#include "wilson_span.h" +#include <library/cpp/actors/core/log.h> +#include <google/protobuf/text_format.h> + +namespace NWilson { + + using namespace NActors; + + void SerializeValue(TAttributeValue value, NCommonProto::AnyValue *pb) { + switch (value.index()) { + case 0: + pb->set_string_value(std::get<0>(std::move(value))); + break; + + case 1: + pb->set_bool_value(std::get<1>(value)); + break; + + case 2: + pb->set_int_value(std::get<2>(value)); + break; + + case 3: + pb->set_double_value(std::get<3>(std::move(value))); + break; + + case 4: { + auto *array = pb->mutable_array_value(); + for (auto&& item : std::get<4>(std::move(value))) { + SerializeValue(std::move(item), array->add_values()); + } + break; + } + + case 5: { + auto *kv = pb->mutable_kvlist_value(); + for (auto&& [key, value] : std::get<5>(std::move(value))) { + SerializeKeyValue(std::move(key), std::move(value), kv->add_values()); + } + break; + } + + case 6: + pb->set_bytes_value(std::get<6>(std::move(value))); + break; + } + } + + void SerializeKeyValue(TString key, TAttributeValue value, NCommonProto::KeyValue *pb) { + pb->set_key(std::move(key)); + SerializeValue(std::move(value), pb->mutable_value()); + } + + void TSpan::Send() { + if (TlsActivationContext) { + NProtoBuf::TextFormat::Printer p; + 
p.SetSingleLineMode(true); + TString s; + p.PrintToString(Data->Span, &s); + LOG_DEBUG_S(*TlsActivationContext, 430 /* WILSON */, s); + } + } + +} // NWilson diff --git a/library/cpp/actors/wilson/wilson_span.h b/library/cpp/actors/wilson/wilson_span.h new file mode 100644 index 00000000000..c2de2f0b68c --- /dev/null +++ b/library/cpp/actors/wilson/wilson_span.h @@ -0,0 +1,160 @@ +#pragma once + +#include <library/cpp/actors/wilson/protos/trace.pb.h> +#include <util/generic/hash.h> +#include <util/datetime/cputimer.h> + +#include "wilson_trace.h" + +namespace NWilson { + + enum class ERelation { + FollowsFrom, + ChildOf, + }; + + namespace NTraceProto = opentelemetry::proto::trace::v1; + namespace NCommonProto = opentelemetry::proto::common::v1; + + struct TArrayValue; + struct TKeyValueList; + struct TBytes; + + using TAttributeValue = std::variant< + TString, + bool, + i64, + double, + TArrayValue, + TKeyValueList, + TBytes + >; + + struct TArrayValue : std::vector<TAttributeValue> {}; + struct TKeyValueList : THashMap<TString, TAttributeValue> {}; + struct TBytes : TString {}; + + void SerializeKeyValue(TString key, TAttributeValue value, NCommonProto::KeyValue *pb); + + class TSpan { + struct TData { + const TInstant StartTime; + const ui64 StartCycles; + const TTraceId TraceId; + NTraceProto::Span Span; + + TData(TInstant startTime, ui64 startCycles, TTraceId traceId) + : StartTime(startTime) + , StartCycles(startCycles) + , TraceId(std::move(traceId)) + {} + }; + + std::unique_ptr<TData> Data; + + public: + TSpan() = default; + TSpan(const TSpan&) = delete; + TSpan(TSpan&&) = default; + + TSpan(ui8 verbosity, ERelation /*relation*/, TTraceId parentId, TInstant now, std::optional<TString> name) + : Data(parentId ? 
std::make_unique<TData>(now, GetCycleCount(), parentId.Span(verbosity)) : nullptr) + { + if (*this) { + if (!parentId.IsRoot()) { + Data->Span.set_parent_span_id(parentId.GetSpanIdPtr(), parentId.GetSpanIdSize()); + } + Data->Span.set_start_time_unix_nano(now.NanoSeconds()); + + if (name) { + Name(std::move(*name)); + } + } + } + + TSpan& operator =(const TSpan&) = delete; + TSpan& operator =(TSpan&&) = default; + + operator bool() const { + return static_cast<bool>(Data); + } + + TSpan& Name(TString name) { + if (*this) { + Data->Span.set_name(std::move(name)); + } + return *this; + } + + TSpan& Attribute(TString name, TAttributeValue value) { + if (*this) { + SerializeKeyValue(std::move(name), std::move(value), Data->Span.add_attributes()); + } + return *this; + } + + TSpan& Event(TString name, TKeyValueList attributes) { + if (*this) { + auto *event = Data->Span.add_events(); + event->set_time_unix_nano(TimeUnixNano()); + event->set_name(std::move(name)); + for (auto&& [key, value] : attributes) { + SerializeKeyValue(std::move(key), std::move(value), event->add_attributes()); + } + } + return *this; + } + + TSpan& Link(const TTraceId& traceId, TKeyValueList attributes) { + if (*this) { + auto *link = Data->Span.add_links(); + link->set_trace_id(traceId.GetTraceIdPtr(), traceId.GetTraceIdSize()); + link->set_span_id(traceId.GetSpanIdPtr(), traceId.GetSpanIdSize()); + for (auto&& [key, value] : attributes) { + SerializeKeyValue(std::move(key), std::move(value), link->add_attributes()); + } + } + return *this; + } + + void EndOk() { + if (*this) { + auto *status = Data->Span.mutable_status(); + status->set_code(NTraceProto::Status::STATUS_CODE_OK); + } + End(); + } + + void EndError(TString error) { + if (*this) { + auto *status = Data->Span.mutable_status(); + status->set_code(NTraceProto::Status::STATUS_CODE_ERROR); + status->set_message(std::move(error)); + } + End(); + } + + void End() { + if (*this) { + Data->Span.set_end_time_unix_nano(TimeUnixNano()); + 
Data->Span.set_trace_id(Data->TraceId.GetTraceIdPtr(), Data->TraceId.GetTraceIdSize()); + Data->Span.set_span_id(Data->TraceId.GetSpanIdPtr(), Data->TraceId.GetSpanIdSize()); + Send(); + Data.reset(); // tracing finished + } + } + + operator TTraceId() const { + return Data ? TTraceId(Data->TraceId) : TTraceId(); + } + + private: + void Send(); + + ui64 TimeUnixNano() const { + const TInstant now = Data->StartTime + CyclesToDuration(GetCycleCount() - Data->StartCycles); + return now.NanoSeconds(); + } + }; + +} // NWilson diff --git a/library/cpp/actors/wilson/wilson_trace.h b/library/cpp/actors/wilson/wilson_trace.h index cfbf93059b0..9937c1f8075 100644 --- a/library/cpp/actors/wilson/wilson_trace.h +++ b/library/cpp/actors/wilson/wilson_trace.h @@ -4,158 +4,160 @@ #include <util/stream/output.h> #include <util/random/random.h> +#include <util/random/fast.h> #include <util/string/printf.h> +#include <array> + namespace NWilson { class TTraceId { - ui64 TraceId; // Random id of topmost client request - ui64 SpanId; // Span id of part of request currently being executed + using TTrace = std::array<ui64, 2>; + + TTrace TraceId; // Random id of topmost client request + union { + struct { + ui64 SpanId : 48; // Span id of part of request currently being executed + ui64 Verbosity : 4; + ui64 TimeToLive : 12; + }; + ui64 Raw; + }; private: - TTraceId(ui64 traceId, ui64 spanId) + TTraceId(TTrace traceId, ui64 spanId, ui8 verbosity, ui32 timeToLive) : TraceId(traceId) - , SpanId(spanId) { + SpanId = spanId; + Verbosity = verbosity; + TimeToLive = timeToLive; } - static ui64 GenerateTraceId() { - ui64 traceId = 0; - while (!traceId) { - traceId = RandomNumber<ui64>(); + static TTrace GenerateTraceId() { + for (;;) { + TTrace res; + ui32 *p = reinterpret_cast<ui32*>(res.data()); + + TReallyFastRng32 rng(RandomNumber<ui64>()); + p[0] = rng(); + p[1] = rng(); + p[2] = rng(); + p[3] = rng(); + + if (res[0] || res[1]) { + return res; + } } - return traceId; } static ui64 
GenerateSpanId() { - return RandomNumber<ui64>(); + for (;;) { + if (const ui64 res = RandomNumber<ui64>(); res) { // SpanId can't be zero + return res; + } + } } public: - using TSerializedTraceId = char[2 * sizeof(ui64)]; + using TSerializedTraceId = char[sizeof(TTrace) + sizeof(ui64)]; public: - TTraceId() - : TraceId(0) - , SpanId(0) - { - } + TTraceId(ui64) // NBS stub + : TTraceId() + {} - explicit TTraceId(ui64 traceId) - : TraceId(traceId) - , SpanId(0) - { + TTraceId() { + TraceId.fill(0); + Raw = 0; } - TTraceId(const TSerializedTraceId& in) - : TraceId(reinterpret_cast<const ui64*>(in)[0]) - , SpanId(reinterpret_cast<const ui64*>(in)[1]) + explicit TTraceId(TTrace traceId) + : TraceId(traceId) { + Raw = 0; } // allow move semantic TTraceId(TTraceId&& other) : TraceId(other.TraceId) - , SpanId(other.SpanId) + , Raw(other.Raw) { - other.TraceId = 0; - other.SpanId = 1; // explicitly mark invalid + other.TraceId.fill(0); + } + + // explicit copy + explicit TTraceId(const TTraceId& other) + : TraceId(other.TraceId) + , Raw(other.Raw) + {} + + TTraceId(const TSerializedTraceId& in) { + auto p = reinterpret_cast<const ui64*>(in); + TraceId = {p[0], p[1]}; + Raw = p[2]; + } + + void Serialize(TSerializedTraceId* out) const { + auto p = reinterpret_cast<ui64*>(*out); + p[0] = TraceId[0]; + p[1] = TraceId[1]; + p[2] = Raw; } TTraceId& operator=(TTraceId&& other) { TraceId = other.TraceId; - SpanId = other.SpanId; - other.TraceId = 0; - other.SpanId = 1; // explicitly mark invalid + other.TraceId.fill(0); + Raw = other.Raw; return *this; } // do not allow implicit copy of trace id - TTraceId(const TTraceId& other) = delete; TTraceId& operator=(const TTraceId& other) = delete; - static TTraceId NewTraceId() { - return TTraceId(GenerateTraceId(), 0); - } - - // create separate branch from this point - TTraceId SeparateBranch() const { - return Clone(); + static TTraceId NewTraceId(ui8 verbosity, ui32 timeToLive) { + return TTraceId(GenerateTraceId(), 0, verbosity, 
timeToLive); } - TTraceId Clone() const { - return TTraceId(TraceId, SpanId); + static TTraceId NewTraceId() { // NBS stub + return TTraceId(); } - TTraceId Span() const { - return *this ? TTraceId(TraceId, GenerateSpanId()) : TTraceId(); + TTraceId Span(ui8 verbosity) const { + return *this && TimeToLive && verbosity <= Verbosity + ? TTraceId(TraceId, GenerateSpanId(), Verbosity, TimeToLive - 1) + : TTraceId(); } - ui64 GetTraceId() const { - return TraceId; + TTraceId Span() const { // compatibility stub + return {}; } // Check if request tracing is enabled explicit operator bool() const { - return TraceId != 0; - } - - // Output trace id into a string stream - void Output(IOutputStream& s, const TTraceId& parentTraceId) const { - union { - ui8 buffer[3 * sizeof(ui64)]; - struct { - ui64 traceId; - ui64 spanId; - ui64 parentSpanId; - } x; - }; - - x.traceId = TraceId; - x.spanId = SpanId; - x.parentSpanId = parentTraceId.SpanId; - - const size_t base64size = Base64EncodeBufSize(sizeof(x)); - char base64[base64size]; - char* end = Base64Encode(base64, buffer, sizeof(x)); - s << TStringBuf(base64, end); + return TraceId[0] || TraceId[1]; } - // output just span id into stream - void OutputSpanId(IOutputStream& s) const { - const size_t base64size = Base64EncodeBufSize(sizeof(SpanId)); - char base64[base64size]; - char* end = Base64Encode(base64, reinterpret_cast<const ui8*>(&SpanId), sizeof(SpanId)); - - // cut trailing padding character - Y_VERIFY(end > base64 && end[-1] == '='); - --end; - - s << TStringBuf(base64, end); - } - - void CheckConsistency() { - // if TraceId is zero, then SpanId must be zero too - Y_VERIFY_DEBUG(*this || !SpanId); + bool IsRoot() const { + return !SpanId; } friend bool operator==(const TTraceId& x, const TTraceId& y) { - return x.TraceId == y.TraceId && x.SpanId == y.SpanId; + return x.TraceId == y.TraceId && x.Raw == y.Raw; } - TString ToString() const { - return Sprintf("%" PRIu64 ":%" PRIu64, TraceId, SpanId); + ui8 GetVerbosity() 
const { + return Verbosity; } - bool IsFromSameTree(const TTraceId& other) const { - return TraceId == other.TraceId; - } + const void *GetTraceIdPtr() const { return TraceId.data(); } + static constexpr size_t GetTraceIdSize() { return sizeof(TTrace); } + const void *GetSpanIdPtr() const { return &Raw; } + static constexpr size_t GetSpanIdSize() { return sizeof(ui64); } - void Serialize(TSerializedTraceId* out) const { - ui64* p = reinterpret_cast<ui64*>(*out); - p[0] = TraceId; - p[1] = SpanId; - } + // for compatibility with NBS + TTraceId Clone() const { return NWilson::TTraceId(*this); } + ui64 GetTraceId() const { return 0; } + void OutputSpanId(IOutputStream&) const {} }; } |