aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexander.rutkovsky@gmail.com>2022-06-16 14:45:38 +0300
committerAlexander Rutkovsky <alexander.rutkovsky@gmail.com>2022-06-16 14:45:38 +0300
commit20d96d3531fa27af7cf21e8de55d71255b054cfd (patch)
treeab97943b524e5f222b839b4c767321538244eb36 /library/cpp
parentc0fe73f947f62476b336002f7fa85301f8a80dee (diff)
downloadydb-20d96d3531fa27af7cf21e8de55d71255b054cfd.tar.gz
Refactor Wilson KIKIMR-15105
ref:55ce6a1b08bba785ea62b3bdfea902ef7263cf57
Diffstat (limited to 'library/cpp')
-rw-r--r--library/cpp/actors/core/events_undelivered.cpp6
-rw-r--r--library/cpp/actors/interconnect/CMakeLists.darwin.txt1
-rw-r--r--library/cpp/actors/interconnect/CMakeLists.linux.txt1
-rw-r--r--library/cpp/actors/interconnect/events_local.h7
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp46
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h16
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp40
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp7
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h2
-rw-r--r--library/cpp/actors/interconnect/packet.cpp1
-rw-r--r--library/cpp/actors/interconnect/packet.h46
-rw-r--r--library/cpp/actors/interconnect/types.h26
-rw-r--r--library/cpp/actors/protos/interconnect.proto4
-rw-r--r--library/cpp/actors/wilson/CMakeLists.txt10
-rw-r--r--library/cpp/actors/wilson/protos/CMakeLists.txt33
-rw-r--r--library/cpp/actors/wilson/protos/common.proto84
-rw-r--r--library/cpp/actors/wilson/protos/resource.proto31
-rw-r--r--library/cpp/actors/wilson/protos/trace.proto326
-rw-r--r--library/cpp/actors/wilson/wilson_event.h176
-rw-r--r--library/cpp/actors/wilson/wilson_span.cpp64
-rw-r--r--library/cpp/actors/wilson/wilson_span.h160
-rw-r--r--library/cpp/actors/wilson/wilson_trace.h186
23 files changed, 955 insertions, 322 deletions
diff --git a/library/cpp/actors/core/events_undelivered.cpp b/library/cpp/actors/core/events_undelivered.cpp
index 23deaffd106..dfd79bf96e9 100644
--- a/library/cpp/actors/core/events_undelivered.cpp
+++ b/library/cpp/actors/core/events_undelivered.cpp
@@ -44,15 +44,15 @@ namespace NActors {
const TActorId recp = OnNondeliveryHolder ? OnNondeliveryHolder->Recipient : TActorId();
if (Event)
- return new IEventHandle(recp, Sender, Event.Release(), updatedFlags, Cookie, &Recipient, TraceId.Clone());
+ return new IEventHandle(recp, Sender, Event.Release(), updatedFlags, Cookie, &Recipient, std::move(TraceId));
else
- return new IEventHandle(Type, updatedFlags, recp, Sender, Buffer, Cookie, &Recipient, TraceId.Clone());
+ return new IEventHandle(Type, updatedFlags, recp, Sender, Buffer, Cookie, &Recipient, std::move(TraceId));
}
if (Flags & FlagTrackDelivery) {
const ui32 updatedFlags = Flags & ~(FlagTrackDelivery | FlagSubscribeOnSession | FlagGenerateUnsureUndelivered);
return new IEventHandle(Sender, Recipient, new TEvents::TEvUndelivered(Type, reason, unsure), updatedFlags,
- Cookie, nullptr, TraceId.Clone());
+ Cookie, nullptr, std::move(TraceId));
}
return nullptr;
diff --git a/library/cpp/actors/interconnect/CMakeLists.darwin.txt b/library/cpp/actors/interconnect/CMakeLists.darwin.txt
index 16d15469206..9bd0c83fcea 100644
--- a/library/cpp/actors/interconnect/CMakeLists.darwin.txt
+++ b/library/cpp/actors/interconnect/CMakeLists.darwin.txt
@@ -21,6 +21,7 @@ target_link_libraries(cpp-actors-interconnect PUBLIC
cpp-actors-prof
cpp-actors-protos
cpp-actors-util
+ cpp-actors-wilson
cpp-digest-crc32c
library-cpp-json
library-cpp-lwtrace
diff --git a/library/cpp/actors/interconnect/CMakeLists.linux.txt b/library/cpp/actors/interconnect/CMakeLists.linux.txt
index 464477ce1dc..c0e1b39c45d 100644
--- a/library/cpp/actors/interconnect/CMakeLists.linux.txt
+++ b/library/cpp/actors/interconnect/CMakeLists.linux.txt
@@ -21,6 +21,7 @@ target_link_libraries(cpp-actors-interconnect PUBLIC
cpp-actors-prof
cpp-actors-protos
cpp-actors-util
+ cpp-actors-wilson
cpp-digest-crc32c
library-cpp-json
library-cpp-lwtrace
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h
index 8a46ffd535f..474b3dba8d5 100644
--- a/library/cpp/actors/interconnect/events_local.h
+++ b/library/cpp/actors/interconnect/events_local.h
@@ -7,16 +7,9 @@
#include <util/network/address.h>
#include "interconnect_stream.h"
-#include "packet.h"
#include "types.h"
namespace NActors {
- struct TProgramInfo {
- ui64 PID = 0;
- ui64 StartTime = 0;
- ui64 Serial = 0;
- };
-
enum class ENetwork : ui32 {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// local messages
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index a66ba2a154d..32f015af54c 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -11,20 +11,18 @@
LWTRACE_USING(ACTORLIB_PROVIDER);
namespace NActors {
- DECLARE_WILSON_EVENT(EventSentToSocket);
- DECLARE_WILSON_EVENT(EventReceivedFromSocket);
-
bool TEventOutputChannel::FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
- const size_t amount = sizeof(TChannelPart) + sizeof(TEventDescr);
+ const size_t descrSize = Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1);
+ const size_t amount = sizeof(TChannelPart) + descrSize;
if (task.GetVirtualFreeAmount() < amount) {
return false;
}
- NWilson::TTraceId traceId(event.Descr.TraceId);
-// if (ctx) {
-// WILSON_TRACE(*ctx, &traceId, EventSentToSocket);
-// }
- traceId.Serialize(&event.Descr.TraceId);
+ auto& span = *event.Span;
+ span.EndOk();
+ const NWilson::TTraceId traceId(span);
+ event.Span.reset();
+
LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize());
task.Orbit.Take(event.Orbit);
@@ -33,8 +31,34 @@ namespace NActors {
TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea());
part->Channel = ChannelId | TChannelPart::LastPartFlag;
- part->Size = sizeof(TEventDescr);
- memcpy(part + 1, &event.Descr, sizeof(TEventDescr));
+ part->Size = descrSize;
+
+ void *descr = part + 1;
+ if (Params.UseExtendedTraceFmt) {
+ auto *p = static_cast<TEventDescr2*>(descr);
+ *p = {
+ event.Descr.Type,
+ event.Descr.Flags,
+ event.Descr.Recipient,
+ event.Descr.Sender,
+ event.Descr.Cookie,
+ {},
+ event.Descr.Checksum
+ };
+ traceId.Serialize(&p->TraceId);
+ } else {
+ auto *p = static_cast<TEventDescr1*>(descr);
+ *p = {
+ event.Descr.Type,
+ event.Descr.Flags,
+ event.Descr.Recipient,
+ event.Descr.Sender,
+ event.Descr.Cookie,
+ {},
+ event.Descr.Checksum
+ };
+ }
+
task.AppendBuf(part, amount);
*weightConsumed += amount;
OutputQueueSize -= part->Size;
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index cf68cd27fd3..e48d294420b 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -8,7 +8,7 @@
#include <util/generic/vector.h>
#include <util/generic/map.h>
#include <util/stream/walk.h>
-#include <library/cpp/actors/wilson/wilson_event.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include "interconnect_common.h"
#include "interconnect_counters.h"
@@ -55,10 +55,18 @@ namespace NActors {
~TEventOutputChannel() {
}
- std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) {
+ std::pair<ui32, TEventHolder*> Push(IEventHandle& ev, TInstant now) {
TEventHolder& event = Pool.Allocate(Queue);
- const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr);
+ const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1));
OutputQueueSize += bytes;
+ event.Span.emplace(static_cast<ui8>(15) /*verbosity*/, NWilson::ERelation::ChildOf,
+ NWilson::TTraceId(ev.TraceId), now, "InInterconnectQueue");
+ if (*event.Span) {
+ auto& span = *event.Span;
+ span
+ .Attribute("OutputQueueItems", static_cast<i64>(Queue.size()))
+ .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize));
+ }
return std::make_pair(bytes, &event);
}
@@ -102,8 +110,6 @@ namespace NActors {
};
EState State = EState::INITIAL;
- static constexpr ui16 MinimumFreeSpace = sizeof(TChannelPart) + sizeof(TEventDescr);
-
protected:
ui64 OutputQueueSize = 0;
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 9ede998d8e7..78e114a574f 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -496,6 +496,7 @@ namespace NActors {
request.SetRequestModernFrame(true);
request.SetRequestAuthOnly(Common->Settings.TlsAuthOnly);
+ request.SetRequestExtendedTraceFmt(true);
SendExBlock(request, "ExRequest");
@@ -526,6 +527,7 @@ namespace NActors {
Params.Encryption = success.GetStartEncryption();
Params.UseModernFrame = success.GetUseModernFrame();
Params.AuthOnly = Params.Encryption && success.GetAuthOnly();
+ Params.UseExtendedTraceFmt = success.GetUseExtendedTraceFmt();
if (success.HasServerScopeId()) {
ParsePeerScopeId(success.GetServerScopeId());
}
@@ -681,6 +683,7 @@ namespace NActors {
Params.UseModernFrame = request.GetRequestModernFrame();
Params.AuthOnly = Params.Encryption && request.GetRequestAuthOnly() && Common->Settings.TlsAuthOnly;
+ Params.UseExtendedTraceFmt = request.GetRequestExtendedTraceFmt();
if (request.HasClientScopeId()) {
ParsePeerScopeId(request.GetClientScopeId());
@@ -706,6 +709,7 @@ namespace NActors {
}
success.SetUseModernFrame(Params.UseModernFrame);
success.SetAuthOnly(Params.AuthOnly);
+ success.SetUseExtendedTraceFmt(Params.UseExtendedTraceFmt);
SendExBlock(record, "ExReply");
// extract sender actor id (self virtual id)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 65bb956e584..cbb2d16e466 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -270,13 +270,43 @@ namespace NActors {
Metrics->AddInputChannelsIncomingTraffic(channel, sizeof(part) + part.Size);
- TEventDescr descr;
+ char buffer[Max(sizeof(TEventDescr1), sizeof(TEventDescr2))];
+ auto& v1 = reinterpret_cast<TEventDescr1&>(buffer);
+ auto& v2 = reinterpret_cast<TEventDescr2&>(buffer);
if (~part.Channel & TChannelPart::LastPartFlag) {
Payload.ExtractFront(part.Size, eventData);
- } else if (part.Size != sizeof(descr)) {
+ } else if (part.Size != sizeof(v1) && part.Size != sizeof(v2)) {
LOG_CRIT_IC_SESSION("ICIS11", "incorrect last part of an event");
return DestroySession(TDisconnectReason::FormatError());
- } else if (Payload.ExtractFrontPlain(&descr, sizeof(descr))) {
+ } else if (Payload.ExtractFrontPlain(buffer, part.Size)) {
+ TEventData descr;
+
+ switch (part.Size) {
+ case sizeof(TEventDescr1):
+ descr = {
+ v1.Type,
+ v1.Flags,
+ v1.Recipient,
+ v1.Sender,
+ v1.Cookie,
+ NWilson::TTraceId(), // do not accept traces with old format
+ v1.Checksum
+ };
+ break;
+
+ case sizeof(TEventDescr2):
+ descr = {
+ v2.Type,
+ v2.Flags,
+ v2.Recipient,
+ v2.Sender,
+ v2.Cookie,
+ NWilson::TTraceId(v2.TraceId),
+ v2.Checksum
+ };
+ break;
+ }
+
Metrics->IncInputChannelsIncomingEvents(channel);
ProcessEvent(*eventData, descr);
*eventData = TRope();
@@ -286,7 +316,7 @@ namespace NActors {
}
}
- void TInputSessionTCP::ProcessEvent(TRope& data, TEventDescr& descr) {
+ void TInputSessionTCP::ProcessEvent(TRope& data, TEventData& descr) {
if (!Params.UseModernFrame || descr.Checksum) {
ui32 checksum = 0;
for (const auto&& [data, size] : data) {
@@ -305,7 +335,7 @@ namespace NActors {
MakeIntrusive<TEventSerializedData>(std::move(data), bool(descr.Flags & IEventHandle::FlagExtendedFormat)),
descr.Cookie,
Params.PeerScopeId,
- NWilson::TTraceId(descr.TraceId));
+ std::move(descr.TraceId));
if (Common->EventFilter && !Common->EventFilter->CheckIncomingEvent(*ev, Common->LocalScopeId)) {
LOG_CRIT_IC_SESSION("ICIC03", "Event dropped due to scope error LocalScopeId# %s PeerScopeId# %s Type# 0x%08" PRIx32,
ScopeIdToString(Common->LocalScopeId).data(), ScopeIdToString(Params.PeerScopeId).data(), descr.Type);
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 2ded7f9f537..1602f4b8b2f 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -12,8 +12,6 @@
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
- DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes));
-
template<typename T>
T Coalesce(T&& x) {
return x;
@@ -128,7 +126,7 @@ namespace NActors {
auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
const bool wasWorking = oChannel.IsWorking();
- const auto [dataSize, event] = oChannel.Push(*ev);
+ const auto [dataSize, event] = oChannel.Push(*ev, TActivationContext::Now());
LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize);
TotalOutputQueueSize += dataSize;
@@ -142,9 +140,6 @@ namespace NActors {
++NumEventsInReadyChannels;
LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData());
- WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush,
- QueueSizeInEvents = oChannel.GetQueueSize(),
- QueueSizeInBytes = oChannel.GetBufferedAmountOfData());
// check for overloaded queues
ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20);
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 5d4a381e1f5..9933bd489ed 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -249,7 +249,7 @@ namespace NActors {
void ReceiveData();
void ProcessHeader(size_t headerLen);
void ProcessPayload(ui64& numDataBytes);
- void ProcessEvent(TRope& data, TEventDescr& descr);
+ void ProcessEvent(TRope& data, TEventData& descr);
bool ReadMore();
void ReestablishConnection(TDisconnectReason reason);
diff --git a/library/cpp/actors/interconnect/packet.cpp b/library/cpp/actors/interconnect/packet.cpp
index e2c289ed592..9ba173e3302 100644
--- a/library/cpp/actors/interconnect/packet.cpp
+++ b/library/cpp/actors/interconnect/packet.cpp
@@ -13,7 +13,6 @@ ui32 TEventHolder::Fill(IEventHandle& ev) {
Descr.Recipient = ev.Recipient;
Descr.Sender = ev.Sender;
Descr.Cookie = ev.Cookie;
- ev.TraceId.Serialize(&Descr.TraceId);
ForwardRecipient = ev.GetForwardOnNondeliveryRecipient();
EventActuallySerialized = 0;
Descr.Checksum = 0;
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index 4ba50a2b5f4..3a6aadfb9f8 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -7,21 +7,18 @@
#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <library/cpp/actors/util/rope.h>
#include <library/cpp/actors/prof/tag.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <library/cpp/digest/crc32c/crc32c.h>
#include <library/cpp/lwtrace/shuttle.h>
#include <util/generic/string.h>
#include <util/generic/list.h>
+#include "types.h"
+
#ifndef FORCE_EVENT_CHECKSUM
#define FORCE_EVENT_CHECKSUM 0
#endif
-using NActors::IEventBase;
-using NActors::IEventHandle;
-using NActors::TActorId;
-using NActors::TConstIoVec;
-using NActors::TEventSerializedData;
-
Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data, size_t len) {
if constexpr (NSan::MSanIsOn()) {
const char *begin = static_cast<const char*>(data);
@@ -33,14 +30,6 @@ Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data,
return Crc32cExtend(checksum, data, len);
}
-struct TSessionParams {
- bool Encryption = {};
- bool UseModernFrame = {};
- bool AuthOnly = {};
- TString AuthCN;
- NActors::TScopeId PeerScopeId;
-};
-
struct TTcpPacketHeader_v1 {
ui32 HeaderCRC32;
ui32 PayloadCRC32;
@@ -87,21 +76,40 @@ union TTcpPacketBuf {
} v2;
};
+struct TEventData {
+ ui32 Type;
+ ui32 Flags;
+ TActorId Recipient;
+ TActorId Sender;
+ ui64 Cookie;
+ NWilson::TTraceId TraceId;
+ ui32 Checksum;
+};
+
#pragma pack(push, 1)
-struct TEventDescr {
+struct TEventDescr1 {
+ ui32 Type;
+ ui32 Flags;
+ TActorId Recipient;
+ TActorId Sender;
+ ui64 Cookie;
+ char TraceId[16]; // obsolete trace id format
+ ui32 Checksum;
+};
+
+struct TEventDescr2 {
ui32 Type;
ui32 Flags;
TActorId Recipient;
TActorId Sender;
ui64 Cookie;
- // wilson trace id is stored as a serialized entity to avoid using complex object with prohibited copy ctor
NWilson::TTraceId::TSerializedTraceId TraceId;
ui32 Checksum;
};
#pragma pack(pop)
struct TEventHolder : TNonCopyable {
- TEventDescr Descr;
+ TEventData Descr;
TActorId ForwardRecipient;
THolder<IEventBase> Event;
TIntrusivePtr<TEventSerializedData> Buffer;
@@ -109,6 +117,7 @@ struct TEventHolder : TNonCopyable {
ui32 EventSerializedSize;
ui32 EventActuallySerialized;
mutable NLWTrace::TOrbit Orbit;
+ std::optional<NWilson::TSpan> Span;
ui32 Fill(IEventHandle& ev);
@@ -123,7 +132,7 @@ struct TEventHolder : TNonCopyable {
}
void ForwardOnNondelivery(bool unsure) {
- TEventDescr& d = Descr;
+ TEventData& d = Descr;
const TActorId& r = d.Recipient;
const TActorId& s = d.Sender;
const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr;
@@ -137,6 +146,7 @@ struct TEventHolder : TNonCopyable {
Event.Reset();
Buffer.Reset();
Orbit.Reset();
+ Span.reset();
}
};
diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h
index 2662c50c220..e0965d78079 100644
--- a/library/cpp/actors/interconnect/types.h
+++ b/library/cpp/actors/interconnect/types.h
@@ -1,5 +1,9 @@
#pragma once
+#include <library/cpp/actors/core/defs.h>
+#include <library/cpp/actors/core/actorid.h>
+#include <library/cpp/actors/core/event.h>
+
#include <util/generic/string.h>
namespace NActors {
@@ -40,4 +44,26 @@ namespace NActors {
static TVector<const char*> Reasons;
};
+ struct TProgramInfo {
+ ui64 PID = 0;
+ ui64 StartTime = 0;
+ ui64 Serial = 0;
+ };
+
+ struct TSessionParams {
+ bool Encryption = {};
+ bool UseModernFrame = {};
+ bool AuthOnly = {};
+ bool UseExtendedTraceFmt = {};
+ TString AuthCN;
+ NActors::TScopeId PeerScopeId;
+ };
+
} // NActors
+
+using NActors::IEventBase;
+using NActors::IEventHandle;
+using NActors::TActorId;
+using NActors::TConstIoVec;
+using NActors::TEventSerializedData;
+using NActors::TSessionParams;
diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto
index 2e3b0d0d15d..69721b1e065 100644
--- a/library/cpp/actors/protos/interconnect.proto
+++ b/library/cpp/actors/protos/interconnect.proto
@@ -70,8 +70,8 @@ message THandshakeRequest {
optional bool DoCheckCookie = 17;
optional bool RequestModernFrame = 18;
-
optional bool RequestAuthOnly = 19;
+ optional bool RequestExtendedTraceFmt = 20;
}
message THandshakeSuccess {
@@ -92,8 +92,8 @@ message THandshakeSuccess {
optional TScopeId ServerScopeId = 10;
optional bool UseModernFrame = 11;
-
optional bool AuthOnly = 12;
+ optional bool UseExtendedTraceFmt = 13;
}
message THandshakeReply {
diff --git a/library/cpp/actors/wilson/CMakeLists.txt b/library/cpp/actors/wilson/CMakeLists.txt
index 03d8c542ff7..65b2e32d87e 100644
--- a/library/cpp/actors/wilson/CMakeLists.txt
+++ b/library/cpp/actors/wilson/CMakeLists.txt
@@ -7,9 +7,13 @@
-add_library(cpp-actors-wilson INTERFACE)
-target_link_libraries(cpp-actors-wilson INTERFACE
+add_library(cpp-actors-wilson)
+target_link_libraries(cpp-actors-wilson PUBLIC
contrib-libs-cxxsupp
yutil
- cpp-string_utils-base64
+ cpp-actors-core
+ actors-wilson-protos
+)
+target_sources(cpp-actors-wilson PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/wilson_span.cpp
)
diff --git a/library/cpp/actors/wilson/protos/CMakeLists.txt b/library/cpp/actors/wilson/protos/CMakeLists.txt
new file mode 100644
index 00000000000..a7cc2e94ffd
--- /dev/null
+++ b/library/cpp/actors/wilson/protos/CMakeLists.txt
@@ -0,0 +1,33 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(actors-wilson-protos)
+target_link_libraries(actors-wilson-protos PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-protobuf
+)
+target_proto_messages(actors-wilson-protos PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/common.proto
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/resource.proto
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/trace.proto
+)
+target_proto_addincls(actors-wilson-protos
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(actors-wilson-protos
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
diff --git a/library/cpp/actors/wilson/protos/common.proto b/library/cpp/actors/wilson/protos/common.proto
new file mode 100644
index 00000000000..8562ee6d1e3
--- /dev/null
+++ b/library/cpp/actors/wilson/protos/common.proto
@@ -0,0 +1,84 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.common.v1;
+
+// AnyValue is used to represent any type of attribute value. AnyValue may contain a
+// primitive value such as a string or integer or it may contain an arbitrary nested
+// object containing arrays, key-value lists and primitives.
+message AnyValue {
+ // The value is one of the listed fields. It is valid for all values to be unspecified
+ // in which case this AnyValue is considered to be "empty".
+ oneof value {
+ string string_value = 1;
+ bool bool_value = 2;
+ int64 int_value = 3;
+ double double_value = 4;
+ ArrayValue array_value = 5;
+ KeyValueList kvlist_value = 6;
+ bytes bytes_value = 7;
+ }
+}
+
+// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message
+// since oneof in AnyValue does not allow repeated fields.
+message ArrayValue {
+ // Array of values. The array may be empty (contain 0 elements).
+ repeated AnyValue values = 1;
+}
+
+// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message
+// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need
+// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to
+// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches
+// are semantically equivalent.
+message KeyValueList {
+ // A collection of key/value pairs of key-value pairs. The list may be empty (may
+ // contain 0 elements).
+ // The keys MUST be unique (it is not allowed to have more than one
+ // value with the same key).
+ repeated KeyValue values = 1;
+}
+
+// KeyValue is a key-value pair that is used to store Span attributes, Link
+// attributes, etc.
+message KeyValue {
+ string key = 1;
+ AnyValue value = 2;
+}
+
+// InstrumentationLibrary is a message representing the instrumentation library information
+// such as the fully qualified name and version.
+// InstrumentationLibrary is wire-compatible with InstrumentationScope for binary
+// Protobuf format.
+// This message is deprecated and will be removed on June 15, 2022.
+message InstrumentationLibrary {
+ option deprecated = true;
+
+ // An empty instrumentation library name means the name is unknown.
+ string name = 1;
+ string version = 2;
+}
+
+// InstrumentationScope is a message representing the instrumentation scope information
+// such as the fully qualified name and version.
+message InstrumentationScope {
+ // An empty instrumentation scope name means the name is unknown.
+ string name = 1;
+ string version = 2;
+ repeated KeyValue attributes = 3;
+ uint32 dropped_attributes_count = 4;
+}
diff --git a/library/cpp/actors/wilson/protos/resource.proto b/library/cpp/actors/wilson/protos/resource.proto
new file mode 100644
index 00000000000..752bf287ea8
--- /dev/null
+++ b/library/cpp/actors/wilson/protos/resource.proto
@@ -0,0 +1,31 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.resource.v1;
+
+import "library/cpp/actors/wilson/protos/common.proto";
+
+// Resource information.
+message Resource {
+ // Set of attributes that describe the resource.
+ // Attribute keys MUST be unique (it is not allowed to have more than one
+ // attribute with the same key).
+ repeated opentelemetry.proto.common.v1.KeyValue attributes = 1;
+
+ // dropped_attributes_count is the number of dropped attributes. If the value is 0, then
+ // no attributes were dropped.
+ uint32 dropped_attributes_count = 2;
+}
diff --git a/library/cpp/actors/wilson/protos/trace.proto b/library/cpp/actors/wilson/protos/trace.proto
new file mode 100644
index 00000000000..0b645cf8adb
--- /dev/null
+++ b/library/cpp/actors/wilson/protos/trace.proto
@@ -0,0 +1,326 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.trace.v1;
+
+import "library/cpp/actors/wilson/protos/common.proto";
+import "library/cpp/actors/wilson/protos/resource.proto";
+
+// TracesData represents the traces data that can be stored in a persistent storage,
+// OR can be embedded by other protocols that transfer OTLP traces data but do
+// not implement the OTLP protocol.
+//
+// The main difference between this message and collector protocol is that
+// in this message there will not be any "control" or "metadata" specific to
+// OTLP protocol.
+//
+// When new fields are added into this message, the OTLP request MUST be updated
+// as well.
+message TracesData {
+ // An array of ResourceSpans.
+ // For data coming from a single resource this array will typically contain
+ // one element. Intermediary nodes that receive data from multiple origins
+ // typically batch the data before forwarding further and in that case this
+ // array will contain multiple elements.
+ repeated ResourceSpans resource_spans = 1;
+}
+
+// A collection of ScopeSpans from a Resource.
+message ResourceSpans {
+ // The resource for the spans in this message.
+ // If this field is not set then no resource info is known.
+ opentelemetry.proto.resource.v1.Resource resource = 1;
+
+ // A list of ScopeSpans that originate from a resource.
+ repeated ScopeSpans scope_spans = 2;
+
+ // A list of InstrumentationLibrarySpans that originate from a resource.
+ // This field is deprecated and will be removed after grace period expires on June 15, 2022.
+ //
+ // During the grace period the following rules SHOULD be followed:
+ //
+ // For Binary Protobufs
+ // ====================
+ // Binary Protobuf senders SHOULD NOT set instrumentation_library_spans. Instead
+ // scope_spans SHOULD be set.
+ //
+ // Binary Protobuf receivers SHOULD check if instrumentation_library_spans is set
+ // and scope_spans is not set then the value in instrumentation_library_spans
+ // SHOULD be used instead by converting InstrumentationLibrarySpans into ScopeSpans.
+ // If scope_spans is set then instrumentation_library_spans SHOULD be ignored.
+ //
+ // For JSON
+ // ========
+ // JSON senders that set instrumentation_library_spans field MAY also set
+ // scope_spans to carry the same spans, essentially double-publishing the same data.
+ // Such double-publishing MAY be controlled by a user-settable option.
+ // If double-publishing is not used then the senders SHOULD set scope_spans and
+ // SHOULD NOT set instrumentation_library_spans.
+ //
+ // JSON receivers SHOULD check if instrumentation_library_spans is set and
+ // scope_spans is not set then the value in instrumentation_library_spans
+ // SHOULD be used instead by converting InstrumentationLibrarySpans into ScopeSpans.
+ // If scope_spans is set then instrumentation_library_spans field SHOULD be ignored.
+ repeated InstrumentationLibrarySpans instrumentation_library_spans = 1000 [deprecated = true];
+
+ // This schema_url applies to the data in the "resource" field. It does not apply
+ // to the data in the "scope_spans" field which have their own schema_url field.
+ string schema_url = 3;
+}
+
+// A collection of Spans produced by an InstrumentationScope.
+message ScopeSpans {
+ // The instrumentation scope information for the spans in this message.
+ // Semantically when InstrumentationScope isn't set, it is equivalent with
+ // an empty instrumentation scope name (unknown).
+ opentelemetry.proto.common.v1.InstrumentationScope scope = 1;
+
+ // A list of Spans that originate from an instrumentation scope.
+ repeated Span spans = 2;
+
+ // This schema_url applies to all spans and span events in the "spans" field.
+ string schema_url = 3;
+}
+
+// A collection of Spans produced by an InstrumentationLibrary.
+// InstrumentationLibrarySpans is wire-compatible with ScopeSpans for binary
+// Protobuf format.
+// This message is deprecated and will be removed on June 15, 2022.
+message InstrumentationLibrarySpans {
+ option deprecated = true;
+
+ // The instrumentation library information for the spans in this message.
+ // Semantically when InstrumentationLibrary isn't set, it is equivalent with
+ // an empty instrumentation library name (unknown).
+ opentelemetry.proto.common.v1.InstrumentationLibrary instrumentation_library = 1;
+
+ // A list of Spans that originate from an instrumentation library.
+ repeated Span spans = 2;
+
+ // This schema_url applies to all spans and span events in the "spans" field.
+ string schema_url = 3;
+}
+
+// Span represents a single operation within a trace. Spans can be
+// nested to form a trace tree. Spans may also be linked to other spans
+// from the same or different trace and form graphs. Often, a trace
+// contains a root span that describes the end-to-end latency, and one
+// or more subspans for its sub-operations. A trace can also contain
+// multiple root spans, or none at all. Spans do not need to be
+// contiguous - there may be gaps or overlaps between spans in a trace.
+//
+// The next available field id is 17.
+message Span {
+ // A unique identifier for a trace. All spans from the same trace share
+ // the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes
+ // is considered invalid.
+ //
+ // This field is semantically required. Receiver should generate new
+ // random trace_id if empty or invalid trace_id was received.
+ //
+ // This field is required.
+ bytes trace_id = 1;
+
+ // A unique identifier for a span within a trace, assigned when the span
+ // is created. The ID is an 8-byte array. An ID with all zeroes is considered
+ // invalid.
+ //
+ // This field is semantically required. Receiver should generate new
+ // random span_id if empty or invalid span_id was received.
+ //
+ // This field is required.
+ bytes span_id = 2;
+
+ // trace_state conveys information about request position in multiple distributed tracing graphs.
+ // It is a trace_state in w3c-trace-context format: https://www.w3.org/TR/trace-context/#tracestate-header
+ // See also https://github.com/w3c/distributed-tracing for more details about this field.
+ string trace_state = 3;
+
+ // The `span_id` of this span's parent span. If this is a root span, then this
+ // field must be empty. The ID is an 8-byte array.
+ bytes parent_span_id = 4;
+
+ // A description of the span's operation.
+ //
+ // For example, the name can be a qualified method name or a file name
+ // and a line number where the operation is called. A best practice is to use
+ // the same display name at the same call point in an application.
+ // This makes it easier to correlate spans in different traces.
+ //
+ // This field is semantically required to be set to non-empty string.
+ // Empty value is equivalent to an unknown span name.
+ //
+ // This field is required.
+ string name = 5;
+
+ // SpanKind is the type of span. Can be used to specify additional relationships between spans
+ // in addition to a parent/child relationship.
+ enum SpanKind {
+ // Unspecified. Do NOT use as default.
+ // Implementations MAY assume SpanKind to be INTERNAL when receiving UNSPECIFIED.
+ SPAN_KIND_UNSPECIFIED = 0;
+
+ // Indicates that the span represents an internal operation within an application,
+ // as opposed to an operation happening at the boundaries. Default value.
+ SPAN_KIND_INTERNAL = 1;
+
+ // Indicates that the span covers server-side handling of an RPC or other
+ // remote network request.
+ SPAN_KIND_SERVER = 2;
+
+ // Indicates that the span describes a request to some remote service.
+ SPAN_KIND_CLIENT = 3;
+
+ // Indicates that the span describes a producer sending a message to a broker.
+ // Unlike CLIENT and SERVER, there is often no direct critical path latency relationship
+ // between producer and consumer spans. A PRODUCER span ends when the message was accepted
+ // by the broker while the logical processing of the message might span a much longer time.
+ SPAN_KIND_PRODUCER = 4;
+
+ // Indicates that the span describes consumer receiving a message from a broker.
+ // Like the PRODUCER kind, there is often no direct critical path latency relationship
+ // between producer and consumer spans.
+ SPAN_KIND_CONSUMER = 5;
+ }
+
+ // Distinguishes between spans generated in a particular context. For example,
+ // two spans with the same name may be distinguished using `CLIENT` (caller)
+ // and `SERVER` (callee) to identify queueing latency associated with the span.
+ SpanKind kind = 6;
+
+ // start_time_unix_nano is the start time of the span. On the client side, this is the time
+ // kept by the local machine where the span execution starts. On the server side, this
+ // is the time when the server's application handler starts running.
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
+ //
+ // This field is semantically required and it is expected that end_time >= start_time.
+ fixed64 start_time_unix_nano = 7;
+
+ // end_time_unix_nano is the end time of the span. On the client side, this is the time
+ // kept by the local machine where the span execution ends. On the server side, this
+ // is the time when the server application handler stops running.
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
+ //
+ // This field is semantically required and it is expected that end_time >= start_time.
+ fixed64 end_time_unix_nano = 8;
+
+ // attributes is a collection of key/value pairs. Note, global attributes
+ // like server name can be set using the resource API. Examples of attributes:
+ //
+ // "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"
+ // "/http/server_latency": 300
+ // "abc.com/myattribute": true
+ // "abc.com/score": 10.239
+ //
+ // The OpenTelemetry API specification further restricts the allowed value types:
+ // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/common.md#attributes
+ // Attribute keys MUST be unique (it is not allowed to have more than one
+ // attribute with the same key).
+ repeated opentelemetry.proto.common.v1.KeyValue attributes = 9;
+
+ // dropped_attributes_count is the number of attributes that were discarded. Attributes
+ // can be discarded because their keys are too long or because there are too many
+ // attributes. If this value is 0, then no attributes were dropped.
+ uint32 dropped_attributes_count = 10;
+
+ // Event is a time-stamped annotation of the span, consisting of user-supplied
+ // text description and key-value pairs.
+ message Event {
+ // time_unix_nano is the time the event occurred.
+ fixed64 time_unix_nano = 1;
+
+ // name of the event.
+ // This field is semantically required to be set to non-empty string.
+ string name = 2;
+
+ // attributes is a collection of attribute key/value pairs on the event.
+ // Attribute keys MUST be unique (it is not allowed to have more than one
+ // attribute with the same key).
+ repeated opentelemetry.proto.common.v1.KeyValue attributes = 3;
+
+ // dropped_attributes_count is the number of dropped attributes. If the value is 0,
+ // then no attributes were dropped.
+ uint32 dropped_attributes_count = 4;
+ }
+
+ // events is a collection of Event items.
+ repeated Event events = 11;
+
+ // dropped_events_count is the number of dropped events. If the value is 0, then no
+ // events were dropped.
+ uint32 dropped_events_count = 12;
+
+ // A pointer from the current span to another span in the same trace or in a
+ // different trace. For example, this can be used in batching operations,
+ // where a single batch handler processes multiple requests from different
+ // traces or when the handler receives a request from a different project.
+ message Link {
+ // A unique identifier of a trace that this linked span is part of. The ID is a
+ // 16-byte array.
+ bytes trace_id = 1;
+
+ // A unique identifier for the linked span. The ID is an 8-byte array.
+ bytes span_id = 2;
+
+ // The trace_state associated with the link.
+ string trace_state = 3;
+
+ // attributes is a collection of attribute key/value pairs on the link.
+ // Attribute keys MUST be unique (it is not allowed to have more than one
+ // attribute with the same key).
+ repeated opentelemetry.proto.common.v1.KeyValue attributes = 4;
+
+ // dropped_attributes_count is the number of dropped attributes. If the value is 0,
+ // then no attributes were dropped.
+ uint32 dropped_attributes_count = 5;
+ }
+
+ // links is a collection of Links, which are references from this span to a span
+ // in the same or different trace.
+ repeated Link links = 13;
+
+ // dropped_links_count is the number of dropped links after the maximum size was
+ // enforced. If this value is 0, then no links were dropped.
+ uint32 dropped_links_count = 14;
+
+ // An optional final status for this span. Semantically when Status isn't set, it means
+ // span's status code is unset, i.e. assume STATUS_CODE_UNSET (code = 0).
+ Status status = 15;
+}
+
+// The Status type defines a logical error model that is suitable for different
+// programming environments, including REST APIs and RPC APIs.
+message Status {
+ reserved 1;
+
+ // A developer-facing human readable error message.
+ string message = 2;
+
+ // For the semantics of status codes see
+ // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#set-status
+ enum StatusCode {
+ // The default status.
+ STATUS_CODE_UNSET = 0;
+ // The Span has been validated by an Application developers or Operator to have
+ // completed successfully.
+ STATUS_CODE_OK = 1;
+ // The Span contains an error.
+ STATUS_CODE_ERROR = 2;
+ };
+
+ // The status code.
+ StatusCode code = 3;
+}
diff --git a/library/cpp/actors/wilson/wilson_event.h b/library/cpp/actors/wilson/wilson_event.h
index 7d89c33b518..4b6a7612c05 100644
--- a/library/cpp/actors/wilson/wilson_event.h
+++ b/library/cpp/actors/wilson/wilson_event.h
@@ -3,179 +3,19 @@
#include "wilson_trace.h"
#include <library/cpp/string_utils/base64/base64.h>
-
+#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/log.h>
namespace NWilson {
-#if !defined(_win_)
-// works only for those compilers, who trait C++ as ISO IEC 14882, not their own standard
-
-#define __UNROLL_PARAMS_8(N, F, X, ...) \
- F(X, N - 8) \
- __UNROLL_PARAMS_7(N, F, ##__VA_ARGS__)
-#define __UNROLL_PARAMS_7(N, F, X, ...) \
- F(X, N - 7) \
- __UNROLL_PARAMS_6(N, F, ##__VA_ARGS__)
-#define __UNROLL_PARAMS_6(N, F, X, ...) \
- F(X, N - 6) \
- __UNROLL_PARAMS_5(N, F, ##__VA_ARGS__)
-#define __UNROLL_PARAMS_5(N, F, X, ...) \
- F(X, N - 5) \
- __UNROLL_PARAMS_4(N, F, ##__VA_ARGS__)
-#define __UNROLL_PARAMS_4(N, F, X, ...) \
- F(X, N - 4) \
- __UNROLL_PARAMS_3(N, F, ##__VA_ARGS__)
-#define __UNROLL_PARAMS_3(N, F, X, ...) \
- F(X, N - 3) \
- __UNROLL_PARAMS_2(N, F, ##__VA_ARGS__)
-#define __UNROLL_PARAMS_2(N, F, X, ...) \
- F(X, N - 2) \
- __UNROLL_PARAMS_1(N, F, ##__VA_ARGS__)
-#define __UNROLL_PARAMS_1(N, F, X) F(X, N - 1)
-#define __UNROLL_PARAMS_0(N, F)
-#define __EX(...) __VA_ARGS__
-#define __NUM_PARAMS(...) __NUM_PARAMS_SELECT_N(__VA_ARGS__, __NUM_PARAMS_SEQ)
-#define __NUM_PARAMS_SELECT_N(...) __EX(__NUM_PARAMS_SELECT(__VA_ARGS__))
-#define __NUM_PARAMS_SELECT(X, _1, _2, _3, _4, _5, _6, _7, _8, N, ...) N
-#define __NUM_PARAMS_SEQ 8, 7, 6, 5, 4, 3, 2, 1, 0, ERROR
-#define __CAT(X, Y) X##Y
-#define __UNROLL_PARAMS_N(N, F, ...) __EX(__CAT(__UNROLL_PARAMS_, N)(N, F, ##__VA_ARGS__))
-#define __UNROLL_PARAMS(F, ...) __UNROLL_PARAMS_N(__NUM_PARAMS(X, ##__VA_ARGS__), F, ##__VA_ARGS__)
-#define __EX2(F, X, INDEX) __INVOKE(F, __EX X, INDEX)
-#define __INVOKE(F, ...) F(__VA_ARGS__)
-
-#define __DECLARE_PARAM(X, INDEX) __EX2(__DECLARE_PARAM_X, X, INDEX)
-#define __DECLARE_PARAM_X(TYPE, NAME, INDEX) \
- static const struct T##NAME##Param \
- : ::NWilson::TParamBinder<INDEX, TYPE> { \
- T##NAME##Param() { \
- } \
- using ::NWilson::TParamBinder<INDEX, TYPE>::operator=; \
- } NAME;
-
-#define __TUPLE_PARAM(X, INDEX) __EX2(__TUPLE_PARAM_X, X, INDEX)
-#define __TUPLE_PARAM_X(TYPE, NAME, INDEX) TYPE,
-
-#define __OUTPUT_PARAM(X, INDEX) __EX2(__OUTPUT_PARAM_X, X, INDEX)
-#define __OUTPUT_PARAM_X(TYPE, NAME, INDEX) str << (INDEX ? ", " : "") << #NAME << "# " << std::get<INDEX>(ParamPack);
-
-#define __FILL_PARAM(P, INDEX) \
- do { \
- const auto& boundParam = (NParams::P); \
- boundParam.Apply(event.ParamPack); \
- } while (false);
-
-#define DECLARE_WILSON_EVENT(EVENT_NAME, ...) \
- namespace N##EVENT_NAME##Params { \
- __UNROLL_PARAMS(__DECLARE_PARAM, ##__VA_ARGS__) \
- \
- using TParamPack = std::tuple< \
- __UNROLL_PARAMS(__TUPLE_PARAM, ##__VA_ARGS__) char>; \
- } \
- struct T##EVENT_NAME { \
- using TParamPack = N##EVENT_NAME##Params::TParamPack; \
- TParamPack ParamPack; \
- \
- void Output(IOutputStream& str) { \
- str << #EVENT_NAME << "{"; \
- __UNROLL_PARAMS(__OUTPUT_PARAM, ##__VA_ARGS__) \
- str << "}"; \
- } \
- };
-
- template <size_t INDEX, typename T>
- class TBoundParam {
- mutable T Value;
-
- public:
- TBoundParam(T&& value)
- : Value(std::move(value))
- {
- }
-
- template <typename TParamPack>
- void Apply(TParamPack& pack) const {
- std::get<INDEX>(pack) = std::move(Value);
- }
- };
-
- template <size_t INDEX, typename T>
- struct TParamBinder {
- template <typename TValue>
- TBoundParam<INDEX, T> operator=(const TValue& value) const {
- return TBoundParam<INDEX, T>(TValue(value));
- }
-
- template <typename TValue>
- TBoundParam<INDEX, T> operator=(TValue&& value) const {
- return TBoundParam<INDEX, T>(std::move(value));
- }
- };
-
-// generate wilson event having parent TRACE_ID and span TRACE_ID to become parent of logged event
-#define WILSON_TRACE(CTX, TRACE_ID, EVENT_NAME, ...) \
- if (::NWilson::TraceEnabled(CTX)) { \
- ::NWilson::TTraceId* __traceId = (TRACE_ID); \
- if (__traceId && *__traceId) { \
- TInstant now = Now(); \
- T##EVENT_NAME event; \
- namespace NParams = N##EVENT_NAME##Params; \
- __UNROLL_PARAMS(__FILL_PARAM, ##__VA_ARGS__) \
- ::NWilson::TraceEvent((CTX), __traceId, event, now); \
- } \
- }
- inline ui32 GetNodeId(const NActors::TActorSystem& actorSystem) {
- return actorSystem.NodeId;
+ // stub for NBS
+ template<typename TActorSystem>
+ inline bool TraceEnabled(const TActorSystem&) {
+ return false;
}
- inline ui32 GetNodeId(const NActors::TActivationContext& ac) {
- return GetNodeId(*ac.ExecutorThread.ActorSystem);
- }
-
- constexpr ui32 WilsonComponentId = 430; // kikimrservices: wilson
-
- template <typename TActorSystem>
- bool TraceEnabled(const TActorSystem& ctx) {
- const auto* loggerSettings = ctx.LoggerSettings();
- return loggerSettings && loggerSettings->Satisfies(NActors::NLog::PRI_DEBUG, WilsonComponentId);
- }
-
- template <typename TActorSystem, typename TEvent>
- void TraceEvent(const TActorSystem& actorSystem, TTraceId* traceId, TEvent&& event, TInstant timestamp) {
- // ensure that we are not using obsolete TraceId
- traceId->CheckConsistency();
-
- // store parent id (for logging) and generate child trace id
- TTraceId parentTraceId(std::move(*traceId));
- *traceId = parentTraceId.Span();
-
- // create encoded string buffer containing timestamp
- const ui64 timestampValue = timestamp.GetValue();
- const size_t base64size = Base64EncodeBufSize(sizeof(timestampValue));
- char base64[base64size];
- char* end = Base64Encode(base64, reinterpret_cast<const ui8*>(&timestampValue), sizeof(timestampValue));
-
- // cut trailing padding character to save some space
- Y_VERIFY(end > base64 && end[-1] == '=');
- --end;
-
- // generate log record
- TString finalMessage;
- TStringOutput s(finalMessage);
- s << GetNodeId(actorSystem) << " " << TStringBuf(base64, end) << " ";
- traceId->Output(s, parentTraceId);
- s << " ";
- event.Output(s);
-
- // output wilson event FIXME: special facility for wilson events w/binary serialization
- NActors::MemLogAdapter(actorSystem, NActors::NLog::PRI_DEBUG, WilsonComponentId, std::move(finalMessage));
- }
-
-#else
-
-#define DECLARE_WILSON_EVENT(...)
-#define WILSON_TRACE(...)
-#endif
+ template<typename TActorSystem, typename TEvent>
+ inline void TraceEvent(const TActorSystem&, TTraceId*, TEvent&&, TInstant)
+ {}
} // NWilson
diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp
new file mode 100644
index 00000000000..6b6ea03ccf6
--- /dev/null
+++ b/library/cpp/actors/wilson/wilson_span.cpp
@@ -0,0 +1,64 @@
+#include "wilson_span.h"
+#include <library/cpp/actors/core/log.h>
+#include <google/protobuf/text_format.h>
+
+namespace NWilson {
+
+ using namespace NActors;
+
+ void SerializeValue(TAttributeValue value, NCommonProto::AnyValue *pb) {
+ switch (value.index()) {
+ case 0:
+ pb->set_string_value(std::get<0>(std::move(value)));
+ break;
+
+ case 1:
+ pb->set_bool_value(std::get<1>(value));
+ break;
+
+ case 2:
+ pb->set_int_value(std::get<2>(value));
+ break;
+
+ case 3:
+ pb->set_double_value(std::get<3>(std::move(value)));
+ break;
+
+ case 4: {
+ auto *array = pb->mutable_array_value();
+ for (auto&& item : std::get<4>(std::move(value))) {
+ SerializeValue(std::move(item), array->add_values());
+ }
+ break;
+ }
+
+ case 5: {
+ auto *kv = pb->mutable_kvlist_value();
+ for (auto&& [key, value] : std::get<5>(std::move(value))) {
+ SerializeKeyValue(std::move(key), std::move(value), kv->add_values());
+ }
+ break;
+ }
+
+ case 6:
+ pb->set_bytes_value(std::get<6>(std::move(value)));
+ break;
+ }
+ }
+
+ void SerializeKeyValue(TString key, TAttributeValue value, NCommonProto::KeyValue *pb) {
+ pb->set_key(std::move(key));
+ SerializeValue(std::move(value), pb->mutable_value());
+ }
+
+ void TSpan::Send() {
+ if (TlsActivationContext) {
+ NProtoBuf::TextFormat::Printer p;
+ p.SetSingleLineMode(true);
+ TString s;
+ p.PrintToString(Data->Span, &s);
+ LOG_DEBUG_S(*TlsActivationContext, 430 /* WILSON */, s);
+ }
+ }
+
+} // NWilson
diff --git a/library/cpp/actors/wilson/wilson_span.h b/library/cpp/actors/wilson/wilson_span.h
new file mode 100644
index 00000000000..c2de2f0b68c
--- /dev/null
+++ b/library/cpp/actors/wilson/wilson_span.h
@@ -0,0 +1,160 @@
+#pragma once
+
+#include <library/cpp/actors/wilson/protos/trace.pb.h>
+#include <util/generic/hash.h>
+#include <util/datetime/cputimer.h>
+
+#include "wilson_trace.h"
+
+namespace NWilson {
+
+ enum class ERelation {
+ FollowsFrom,
+ ChildOf,
+ };
+
+ namespace NTraceProto = opentelemetry::proto::trace::v1;
+ namespace NCommonProto = opentelemetry::proto::common::v1;
+
+ struct TArrayValue;
+ struct TKeyValueList;
+ struct TBytes;
+
+ using TAttributeValue = std::variant<
+ TString,
+ bool,
+ i64,
+ double,
+ TArrayValue,
+ TKeyValueList,
+ TBytes
+ >;
+
+ struct TArrayValue : std::vector<TAttributeValue> {};
+ struct TKeyValueList : THashMap<TString, TAttributeValue> {};
+ struct TBytes : TString {};
+
+ void SerializeKeyValue(TString key, TAttributeValue value, NCommonProto::KeyValue *pb);
+
+ class TSpan {
+ struct TData {
+ const TInstant StartTime;
+ const ui64 StartCycles;
+ const TTraceId TraceId;
+ NTraceProto::Span Span;
+
+ TData(TInstant startTime, ui64 startCycles, TTraceId traceId)
+ : StartTime(startTime)
+ , StartCycles(startCycles)
+ , TraceId(std::move(traceId))
+ {}
+ };
+
+ std::unique_ptr<TData> Data;
+
+ public:
+ TSpan() = default;
+ TSpan(const TSpan&) = delete;
+ TSpan(TSpan&&) = default;
+
+ TSpan(ui8 verbosity, ERelation /*relation*/, TTraceId parentId, TInstant now, std::optional<TString> name)
+ : Data(parentId ? std::make_unique<TData>(now, GetCycleCount(), parentId.Span(verbosity)) : nullptr)
+ {
+ if (*this) {
+ if (!parentId.IsRoot()) {
+ Data->Span.set_parent_span_id(parentId.GetSpanIdPtr(), parentId.GetSpanIdSize());
+ }
+ Data->Span.set_start_time_unix_nano(now.NanoSeconds());
+
+ if (name) {
+ Name(std::move(*name));
+ }
+ }
+ }
+
+ TSpan& operator =(const TSpan&) = delete;
+ TSpan& operator =(TSpan&&) = default;
+
+ operator bool() const {
+ return static_cast<bool>(Data);
+ }
+
+ TSpan& Name(TString name) {
+ if (*this) {
+ Data->Span.set_name(std::move(name));
+ }
+ return *this;
+ }
+
+ TSpan& Attribute(TString name, TAttributeValue value) {
+ if (*this) {
+ SerializeKeyValue(std::move(name), std::move(value), Data->Span.add_attributes());
+ }
+ return *this;
+ }
+
+ TSpan& Event(TString name, TKeyValueList attributes) {
+ if (*this) {
+ auto *event = Data->Span.add_events();
+ event->set_time_unix_nano(TimeUnixNano());
+ event->set_name(std::move(name));
+ for (auto&& [key, value] : attributes) {
+ SerializeKeyValue(std::move(key), std::move(value), event->add_attributes());
+ }
+ }
+ return *this;
+ }
+
+ TSpan& Link(const TTraceId& traceId, TKeyValueList attributes) {
+ if (*this) {
+ auto *link = Data->Span.add_links();
+ link->set_trace_id(traceId.GetTraceIdPtr(), traceId.GetTraceIdSize());
+ link->set_span_id(traceId.GetSpanIdPtr(), traceId.GetSpanIdSize());
+ for (auto&& [key, value] : attributes) {
+ SerializeKeyValue(std::move(key), std::move(value), link->add_attributes());
+ }
+ }
+ return *this;
+ }
+
+ void EndOk() {
+ if (*this) {
+ auto *status = Data->Span.mutable_status();
+ status->set_code(NTraceProto::Status::STATUS_CODE_OK);
+ }
+ End();
+ }
+
+ void EndError(TString error) {
+ if (*this) {
+ auto *status = Data->Span.mutable_status();
+ status->set_code(NTraceProto::Status::STATUS_CODE_ERROR);
+ status->set_message(std::move(error));
+ }
+ End();
+ }
+
+ void End() {
+ if (*this) {
+ Data->Span.set_end_time_unix_nano(TimeUnixNano());
+ Data->Span.set_trace_id(Data->TraceId.GetTraceIdPtr(), Data->TraceId.GetTraceIdSize());
+ Data->Span.set_span_id(Data->TraceId.GetSpanIdPtr(), Data->TraceId.GetSpanIdSize());
+ Send();
+ Data.reset(); // tracing finished
+ }
+ }
+
+ operator TTraceId() const {
+ return Data ? TTraceId(Data->TraceId) : TTraceId();
+ }
+
+ private:
+ void Send();
+
+ ui64 TimeUnixNano() const {
+ const TInstant now = Data->StartTime + CyclesToDuration(GetCycleCount() - Data->StartCycles);
+ return now.NanoSeconds();
+ }
+ };
+
+} // NWilson
diff --git a/library/cpp/actors/wilson/wilson_trace.h b/library/cpp/actors/wilson/wilson_trace.h
index cfbf93059b0..9937c1f8075 100644
--- a/library/cpp/actors/wilson/wilson_trace.h
+++ b/library/cpp/actors/wilson/wilson_trace.h
@@ -4,158 +4,160 @@
#include <util/stream/output.h>
#include <util/random/random.h>
+#include <util/random/fast.h>
#include <util/string/printf.h>
+#include <array>
+
namespace NWilson {
class TTraceId {
- ui64 TraceId; // Random id of topmost client request
- ui64 SpanId; // Span id of part of request currently being executed
+ using TTrace = std::array<ui64, 2>;
+
+ TTrace TraceId; // Random id of topmost client request
+ union {
+ struct {
+ ui64 SpanId : 48; // Span id of part of request currently being executed
+ ui64 Verbosity : 4;
+ ui64 TimeToLive : 12;
+ };
+ ui64 Raw;
+ };
private:
- TTraceId(ui64 traceId, ui64 spanId)
+ TTraceId(TTrace traceId, ui64 spanId, ui8 verbosity, ui32 timeToLive)
: TraceId(traceId)
- , SpanId(spanId)
{
+ SpanId = spanId;
+ Verbosity = verbosity;
+ TimeToLive = timeToLive;
}
- static ui64 GenerateTraceId() {
- ui64 traceId = 0;
- while (!traceId) {
- traceId = RandomNumber<ui64>();
+ static TTrace GenerateTraceId() {
+ for (;;) {
+ TTrace res;
+ ui32 *p = reinterpret_cast<ui32*>(res.data());
+
+ TReallyFastRng32 rng(RandomNumber<ui64>());
+ p[0] = rng();
+ p[1] = rng();
+ p[2] = rng();
+ p[3] = rng();
+
+ if (res[0] || res[1]) {
+ return res;
+ }
}
- return traceId;
}
static ui64 GenerateSpanId() {
- return RandomNumber<ui64>();
+ for (;;) {
+ if (const ui64 res = RandomNumber<ui64>(); res) { // SpanId can't be zero
+ return res;
+ }
+ }
}
public:
- using TSerializedTraceId = char[2 * sizeof(ui64)];
+ using TSerializedTraceId = char[sizeof(TTrace) + sizeof(ui64)];
public:
- TTraceId()
- : TraceId(0)
- , SpanId(0)
- {
- }
+ TTraceId(ui64) // NBS stub
+ : TTraceId()
+ {}
- explicit TTraceId(ui64 traceId)
- : TraceId(traceId)
- , SpanId(0)
- {
+ TTraceId() {
+ TraceId.fill(0);
+ Raw = 0;
}
- TTraceId(const TSerializedTraceId& in)
- : TraceId(reinterpret_cast<const ui64*>(in)[0])
- , SpanId(reinterpret_cast<const ui64*>(in)[1])
+ explicit TTraceId(TTrace traceId)
+ : TraceId(traceId)
{
+ Raw = 0;
}
// allow move semantic
TTraceId(TTraceId&& other)
: TraceId(other.TraceId)
- , SpanId(other.SpanId)
+ , Raw(other.Raw)
{
- other.TraceId = 0;
- other.SpanId = 1; // explicitly mark invalid
+ other.TraceId.fill(0);
+ }
+
+ // explicit copy
+ explicit TTraceId(const TTraceId& other)
+ : TraceId(other.TraceId)
+ , Raw(other.Raw)
+ {}
+
+ TTraceId(const TSerializedTraceId& in) {
+ auto p = reinterpret_cast<const ui64*>(in);
+ TraceId = {p[0], p[1]};
+ Raw = p[2];
+ }
+
+ void Serialize(TSerializedTraceId* out) const {
+ auto p = reinterpret_cast<ui64*>(*out);
+ p[0] = TraceId[0];
+ p[1] = TraceId[1];
+ p[2] = Raw;
}
TTraceId& operator=(TTraceId&& other) {
TraceId = other.TraceId;
- SpanId = other.SpanId;
- other.TraceId = 0;
- other.SpanId = 1; // explicitly mark invalid
+ other.TraceId.fill(0);
+ Raw = other.Raw;
return *this;
}
// do not allow implicit copy of trace id
- TTraceId(const TTraceId& other) = delete;
TTraceId& operator=(const TTraceId& other) = delete;
- static TTraceId NewTraceId() {
- return TTraceId(GenerateTraceId(), 0);
- }
-
- // create separate branch from this point
- TTraceId SeparateBranch() const {
- return Clone();
+ static TTraceId NewTraceId(ui8 verbosity, ui32 timeToLive) {
+ return TTraceId(GenerateTraceId(), 0, verbosity, timeToLive);
}
- TTraceId Clone() const {
- return TTraceId(TraceId, SpanId);
+ static TTraceId NewTraceId() { // NBS stub
+ return TTraceId();
}
- TTraceId Span() const {
- return *this ? TTraceId(TraceId, GenerateSpanId()) : TTraceId();
+ TTraceId Span(ui8 verbosity) const {
+ return *this && TimeToLive && verbosity <= Verbosity
+ ? TTraceId(TraceId, GenerateSpanId(), Verbosity, TimeToLive - 1)
+ : TTraceId();
}
- ui64 GetTraceId() const {
- return TraceId;
+ TTraceId Span() const { // compatibility stub
+ return {};
}
// Check if request tracing is enabled
explicit operator bool() const {
- return TraceId != 0;
- }
-
- // Output trace id into a string stream
- void Output(IOutputStream& s, const TTraceId& parentTraceId) const {
- union {
- ui8 buffer[3 * sizeof(ui64)];
- struct {
- ui64 traceId;
- ui64 spanId;
- ui64 parentSpanId;
- } x;
- };
-
- x.traceId = TraceId;
- x.spanId = SpanId;
- x.parentSpanId = parentTraceId.SpanId;
-
- const size_t base64size = Base64EncodeBufSize(sizeof(x));
- char base64[base64size];
- char* end = Base64Encode(base64, buffer, sizeof(x));
- s << TStringBuf(base64, end);
+ return TraceId[0] || TraceId[1];
}
- // output just span id into stream
- void OutputSpanId(IOutputStream& s) const {
- const size_t base64size = Base64EncodeBufSize(sizeof(SpanId));
- char base64[base64size];
- char* end = Base64Encode(base64, reinterpret_cast<const ui8*>(&SpanId), sizeof(SpanId));
-
- // cut trailing padding character
- Y_VERIFY(end > base64 && end[-1] == '=');
- --end;
-
- s << TStringBuf(base64, end);
- }
-
- void CheckConsistency() {
- // if TraceId is zero, then SpanId must be zero too
- Y_VERIFY_DEBUG(*this || !SpanId);
+ bool IsRoot() const {
+ return !SpanId;
}
friend bool operator==(const TTraceId& x, const TTraceId& y) {
- return x.TraceId == y.TraceId && x.SpanId == y.SpanId;
+ return x.TraceId == y.TraceId && x.Raw == y.Raw;
}
- TString ToString() const {
- return Sprintf("%" PRIu64 ":%" PRIu64, TraceId, SpanId);
+ ui8 GetVerbosity() const {
+ return Verbosity;
}
- bool IsFromSameTree(const TTraceId& other) const {
- return TraceId == other.TraceId;
- }
+ const void *GetTraceIdPtr() const { return TraceId.data(); }
+ static constexpr size_t GetTraceIdSize() { return sizeof(TTrace); }
+ const void *GetSpanIdPtr() const { return &Raw; }
+ static constexpr size_t GetSpanIdSize() { return sizeof(ui64); }
- void Serialize(TSerializedTraceId* out) const {
- ui64* p = reinterpret_cast<ui64*>(*out);
- p[0] = TraceId;
- p[1] = SpanId;
- }
+ // for compatibility with NBS
+ TTraceId Clone() const { return NWilson::TTraceId(*this); }
+ ui64 GetTraceId() const { return 0; }
+ void OutputSpanId(IOutputStream&) const {}
};
}