diff options
author | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-06-16 14:45:38 +0300 |
---|---|---|
committer | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-06-16 14:45:38 +0300 |
commit | 20d96d3531fa27af7cf21e8de55d71255b054cfd (patch) | |
tree | ab97943b524e5f222b839b4c767321538244eb36 | |
parent | c0fe73f947f62476b336002f7fa85301f8a80dee (diff) | |
download | ydb-20d96d3531fa27af7cf21e8de55d71255b054cfd.tar.gz |
Refactor Wilson KIKIMR-15105
ref:55ce6a1b08bba785ea62b3bdfea902ef7263cf57
102 files changed, 1259 insertions, 889 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index aefec545c0..dd95749e37 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -293,6 +293,8 @@ add_subdirectory(library/cpp/actors/dnsresolver) add_subdirectory(library/cpp/actors/interconnect) add_subdirectory(library/cpp/actors/dnscachelib) add_subdirectory(library/cpp/actors/helpers) +add_subdirectory(library/cpp/actors/wilson) +add_subdirectory(library/cpp/actors/wilson/protos) add_subdirectory(library/cpp/digest/crc32c) add_subdirectory(contrib/libs/crcutil) add_subdirectory(library/cpp/monlib/service/pages/tablesorter) @@ -394,10 +396,7 @@ add_subdirectory(library/cpp/digest/argonish/internal/proxies/sse41) add_subdirectory(library/cpp/digest/argonish/internal/proxies/ssse3) add_subdirectory(ydb/library/pdisk_io) add_subdirectory(ydb/library/pdisk_io/protos) -add_subdirectory(ydb/library/wilson) -add_subdirectory(library/cpp/actors/wilson) add_subdirectory(ydb/library/pretty_types_print/protobuf) -add_subdirectory(ydb/library/pretty_types_print/wilson) add_subdirectory(ydb/public/api/protos/out) add_subdirectory(ydb/core/mon) add_subdirectory(library/cpp/string_utils/url) @@ -997,6 +996,7 @@ add_subdirectory(ydb/library/keys/ut) add_subdirectory(ydb/library/login/ut) add_subdirectory(ydb/library/mkql_proto/ut) add_subdirectory(ydb/library/naming_conventions/ut) +add_subdirectory(ydb/library/pretty_types_print/wilson) add_subdirectory(ydb/library/protobuf_printer/ut) add_subdirectory(ydb/library/schlab/ut) add_subdirectory(ydb/library/security/ut) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 428284f5c8..fd309c8d8b 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -373,6 +373,8 @@ add_subdirectory(library/cpp/actors/dnsresolver) add_subdirectory(library/cpp/actors/interconnect) add_subdirectory(library/cpp/actors/dnscachelib) add_subdirectory(library/cpp/actors/helpers) +add_subdirectory(library/cpp/actors/wilson) +add_subdirectory(library/cpp/actors/wilson/protos) add_subdirectory(library/cpp/digest/crc32c) add_subdirectory(contrib/libs/crcutil) add_subdirectory(library/cpp/monlib/service/pages/tablesorter) @@ -474,10 +476,7 @@ add_subdirectory(library/cpp/digest/argonish/internal/proxies/sse41) add_subdirectory(library/cpp/digest/argonish/internal/proxies/ssse3) add_subdirectory(ydb/library/pdisk_io) add_subdirectory(ydb/library/pdisk_io/protos) -add_subdirectory(ydb/library/wilson) -add_subdirectory(library/cpp/actors/wilson) add_subdirectory(ydb/library/pretty_types_print/protobuf) -add_subdirectory(ydb/library/pretty_types_print/wilson) add_subdirectory(ydb/public/api/protos/out) add_subdirectory(ydb/core/mon) add_subdirectory(library/cpp/string_utils/url) @@ -1092,6 +1091,7 @@ add_subdirectory(ydb/library/keys/ut) add_subdirectory(ydb/library/login/ut) add_subdirectory(ydb/library/mkql_proto/ut) add_subdirectory(ydb/library/naming_conventions/ut) +add_subdirectory(ydb/library/pretty_types_print/wilson) add_subdirectory(ydb/library/protobuf_printer/ut) add_subdirectory(ydb/library/schlab/ut) add_subdirectory(ydb/library/security/ut) diff --git a/library/cpp/actors/core/events_undelivered.cpp b/library/cpp/actors/core/events_undelivered.cpp index 23deaffd10..dfd79bf96e 100644 --- a/library/cpp/actors/core/events_undelivered.cpp +++ b/library/cpp/actors/core/events_undelivered.cpp @@ -44,15 +44,15 @@ namespace NActors { const TActorId recp = OnNondeliveryHolder ? OnNondeliveryHolder->Recipient : TActorId(); if (Event) - return new IEventHandle(recp, Sender, Event.Release(), updatedFlags, Cookie, &Recipient, TraceId.Clone()); + return new IEventHandle(recp, Sender, Event.Release(), updatedFlags, Cookie, &Recipient, std::move(TraceId)); else - return new IEventHandle(Type, updatedFlags, recp, Sender, Buffer, Cookie, &Recipient, TraceId.Clone()); + return new IEventHandle(Type, updatedFlags, recp, Sender, Buffer, Cookie, &Recipient, std::move(TraceId)); } if (Flags & FlagTrackDelivery) { const ui32 updatedFlags = Flags & ~(FlagTrackDelivery | FlagSubscribeOnSession | FlagGenerateUnsureUndelivered); return new IEventHandle(Sender, Recipient, new TEvents::TEvUndelivered(Type, reason, unsure), updatedFlags, - Cookie, nullptr, TraceId.Clone()); + Cookie, nullptr, std::move(TraceId)); } return nullptr; diff --git a/library/cpp/actors/interconnect/CMakeLists.darwin.txt b/library/cpp/actors/interconnect/CMakeLists.darwin.txt index 16d1546920..9bd0c83fce 100644 --- a/library/cpp/actors/interconnect/CMakeLists.darwin.txt +++ b/library/cpp/actors/interconnect/CMakeLists.darwin.txt @@ -21,6 +21,7 @@ target_link_libraries(cpp-actors-interconnect PUBLIC cpp-actors-prof cpp-actors-protos cpp-actors-util + cpp-actors-wilson cpp-digest-crc32c library-cpp-json library-cpp-lwtrace diff --git a/library/cpp/actors/interconnect/CMakeLists.linux.txt b/library/cpp/actors/interconnect/CMakeLists.linux.txt index 464477ce1d..c0e1b39c45 100644 --- a/library/cpp/actors/interconnect/CMakeLists.linux.txt +++ b/library/cpp/actors/interconnect/CMakeLists.linux.txt @@ -21,6 +21,7 @@ target_link_libraries(cpp-actors-interconnect PUBLIC cpp-actors-prof cpp-actors-protos cpp-actors-util + cpp-actors-wilson cpp-digest-crc32c library-cpp-json library-cpp-lwtrace diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h index 8a46ffd535..474b3dba8d 100644 --- a/library/cpp/actors/interconnect/events_local.h +++ b/library/cpp/actors/interconnect/events_local.h @@ -7,16 +7,9 @@ #include <util/network/address.h> #include "interconnect_stream.h" -#include "packet.h" #include "types.h" namespace NActors { - struct TProgramInfo { - ui64 PID = 0; - ui64 StartTime = 0; - ui64 Serial = 0; - }; - enum class ENetwork : ui32 { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // local messages diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index a66ba2a154..32f015af54 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -11,20 +11,18 @@ LWTRACE_USING(ACTORLIB_PROVIDER); namespace NActors { - DECLARE_WILSON_EVENT(EventSentToSocket); - DECLARE_WILSON_EVENT(EventReceivedFromSocket); - bool TEventOutputChannel::FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) { - const size_t amount = sizeof(TChannelPart) + sizeof(TEventDescr); + const size_t descrSize = Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1); + const size_t amount = sizeof(TChannelPart) + descrSize; if (task.GetVirtualFreeAmount() < amount) { return false; } - NWilson::TTraceId traceId(event.Descr.TraceId); -// if (ctx) { -// WILSON_TRACE(*ctx, &traceId, EventSentToSocket); -// } - traceId.Serialize(&event.Descr.TraceId); + auto& span = *event.Span; + span.EndOk(); + const NWilson::TTraceId traceId(span); + event.Span.reset(); + LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize()); task.Orbit.Take(event.Orbit); @@ -33,8 +31,34 @@ namespace NActors { TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea()); part->Channel = ChannelId | TChannelPart::LastPartFlag; - part->Size = sizeof(TEventDescr); - memcpy(part + 1, &event.Descr, sizeof(TEventDescr)); + part->Size = descrSize; + + void *descr = part + 1; + if (Params.UseExtendedTraceFmt) { + auto *p = static_cast<TEventDescr2*>(descr); + *p = { + event.Descr.Type, + event.Descr.Flags, + event.Descr.Recipient, + event.Descr.Sender, + event.Descr.Cookie, + {}, + event.Descr.Checksum + }; + traceId.Serialize(&p->TraceId); + } else { + auto *p = static_cast<TEventDescr1*>(descr); + *p = { + event.Descr.Type, + event.Descr.Flags, + event.Descr.Recipient, + event.Descr.Sender, + event.Descr.Cookie, + {}, + event.Descr.Checksum + }; + } + task.AppendBuf(part, amount); *weightConsumed += amount; OutputQueueSize -= part->Size; diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index cf68cd27fd..e48d294420 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -8,7 +8,7 @@ #include <util/generic/vector.h> #include <util/generic/map.h> #include <util/stream/walk.h> -#include <library/cpp/actors/wilson/wilson_event.h> +#include <library/cpp/actors/wilson/wilson_span.h> #include "interconnect_common.h" #include "interconnect_counters.h" @@ -55,10 +55,18 @@ namespace NActors { ~TEventOutputChannel() { } - std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) { + std::pair<ui32, TEventHolder*> Push(IEventHandle& ev, TInstant now) { TEventHolder& event = Pool.Allocate(Queue); - const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr); + const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1)); OutputQueueSize += bytes; + event.Span.emplace(static_cast<ui8>(15) /*verbosity*/, NWilson::ERelation::ChildOf, + NWilson::TTraceId(ev.TraceId), now, "InInterconnectQueue"); + if (*event.Span) { + auto& span = *event.Span; + span + .Attribute("OutputQueueItems", static_cast<i64>(Queue.size())) + .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize)); + } return std::make_pair(bytes, &event); } @@ -102,8 +110,6 @@ namespace NActors { }; EState State = EState::INITIAL; - static constexpr ui16 MinimumFreeSpace = sizeof(TChannelPart) + sizeof(TEventDescr); - protected: ui64 OutputQueueSize = 0; diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp index 9ede998d8e..78e114a574 100644 --- a/library/cpp/actors/interconnect/interconnect_handshake.cpp +++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp @@ -496,6 +496,7 @@ namespace NActors { request.SetRequestModernFrame(true); request.SetRequestAuthOnly(Common->Settings.TlsAuthOnly); + request.SetRequestExtendedTraceFmt(true); SendExBlock(request, "ExRequest"); @@ -526,6 +527,7 @@ namespace NActors { Params.Encryption = success.GetStartEncryption(); Params.UseModernFrame = success.GetUseModernFrame(); Params.AuthOnly = Params.Encryption && success.GetAuthOnly(); + Params.UseExtendedTraceFmt = success.GetUseExtendedTraceFmt(); if (success.HasServerScopeId()) { ParsePeerScopeId(success.GetServerScopeId()); } @@ -681,6 +683,7 @@ namespace NActors { Params.UseModernFrame = request.GetRequestModernFrame(); Params.AuthOnly = Params.Encryption && request.GetRequestAuthOnly() && Common->Settings.TlsAuthOnly; + Params.UseExtendedTraceFmt = request.GetRequestExtendedTraceFmt(); if (request.HasClientScopeId()) { ParsePeerScopeId(request.GetClientScopeId()); @@ -706,6 +709,7 @@ namespace NActors { } success.SetUseModernFrame(Params.UseModernFrame); success.SetAuthOnly(Params.AuthOnly); + success.SetUseExtendedTraceFmt(Params.UseExtendedTraceFmt); SendExBlock(record, "ExReply"); // extract sender actor id (self virtual id) diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp index 65bb956e58..cbb2d16e46 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp @@ -270,13 +270,43 @@ namespace NActors { Metrics->AddInputChannelsIncomingTraffic(channel, sizeof(part) + part.Size); - TEventDescr descr; + char buffer[Max(sizeof(TEventDescr1), sizeof(TEventDescr2))]; + auto& v1 = reinterpret_cast<TEventDescr1&>(buffer); + auto& v2 = reinterpret_cast<TEventDescr2&>(buffer); if (~part.Channel & TChannelPart::LastPartFlag) { Payload.ExtractFront(part.Size, eventData); - } else if (part.Size != sizeof(descr)) { + } else if (part.Size != sizeof(v1) && part.Size != sizeof(v2)) { LOG_CRIT_IC_SESSION("ICIS11", "incorrect last part of an event"); return DestroySession(TDisconnectReason::FormatError()); - } else if (Payload.ExtractFrontPlain(&descr, sizeof(descr))) { + } else if (Payload.ExtractFrontPlain(buffer, part.Size)) { + TEventData descr; + + switch (part.Size) { + case sizeof(TEventDescr1): + descr = { + v1.Type, + v1.Flags, + v1.Recipient, + v1.Sender, + v1.Cookie, + NWilson::TTraceId(), // do not accept traces with old format + v1.Checksum + }; + break; + + case sizeof(TEventDescr2): + descr = { + v2.Type, + v2.Flags, + v2.Recipient, + v2.Sender, + v2.Cookie, + NWilson::TTraceId(v2.TraceId), + v2.Checksum + }; + break; + } + Metrics->IncInputChannelsIncomingEvents(channel); ProcessEvent(*eventData, descr); *eventData = TRope(); @@ -286,7 +316,7 @@ namespace NActors { } } - void TInputSessionTCP::ProcessEvent(TRope& data, TEventDescr& descr) { + void TInputSessionTCP::ProcessEvent(TRope& data, TEventData& descr) { if (!Params.UseModernFrame || descr.Checksum) { ui32 checksum = 0; for (const auto&& [data, size] : data) { @@ -305,7 +335,7 @@ namespace NActors { MakeIntrusive<TEventSerializedData>(std::move(data), bool(descr.Flags & IEventHandle::FlagExtendedFormat)), descr.Cookie, Params.PeerScopeId, - NWilson::TTraceId(descr.TraceId)); + std::move(descr.TraceId)); if (Common->EventFilter && !Common->EventFilter->CheckIncomingEvent(*ev, Common->LocalScopeId)) { LOG_CRIT_IC_SESSION("ICIC03", "Event dropped due to scope error LocalScopeId# %s PeerScopeId# %s Type# 0x%08" PRIx32, ScopeIdToString(Common->LocalScopeId).data(), ScopeIdToString(Params.PeerScopeId).data(), descr.Type); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 2ded7f9f53..1602f4b8b2 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -12,8 +12,6 @@ namespace NActors { LWTRACE_USING(ACTORLIB_PROVIDER); - DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes)); - template<typename T> T Coalesce(T&& x) { return x; @@ -128,7 +126,7 @@ namespace NActors { auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel); const bool wasWorking = oChannel.IsWorking(); - const auto [dataSize, event] = oChannel.Push(*ev); + const auto [dataSize, event] = oChannel.Push(*ev, TActivationContext::Now()); LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize); TotalOutputQueueSize += dataSize; @@ -142,9 +140,6 @@ namespace NActors { ++NumEventsInReadyChannels; LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData()); - WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush, - QueueSizeInEvents = oChannel.GetQueueSize(), - QueueSizeInBytes = oChannel.GetBufferedAmountOfData()); // check for overloaded queues ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h index 5d4a381e1f..9933bd489e 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.h +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h @@ -249,7 +249,7 @@ namespace NActors { void ReceiveData(); void ProcessHeader(size_t headerLen); void ProcessPayload(ui64& numDataBytes); - void ProcessEvent(TRope& data, TEventDescr& descr); + void ProcessEvent(TRope& data, TEventData& descr); bool ReadMore(); void ReestablishConnection(TDisconnectReason reason); diff --git a/library/cpp/actors/interconnect/packet.cpp b/library/cpp/actors/interconnect/packet.cpp index e2c289ed59..9ba173e330 100644 --- a/library/cpp/actors/interconnect/packet.cpp +++ b/library/cpp/actors/interconnect/packet.cpp @@ -13,7 +13,6 @@ ui32 TEventHolder::Fill(IEventHandle& ev) { Descr.Recipient = ev.Recipient; Descr.Sender = ev.Sender; Descr.Cookie = ev.Cookie; - ev.TraceId.Serialize(&Descr.TraceId); ForwardRecipient = ev.GetForwardOnNondeliveryRecipient(); EventActuallySerialized = 0; Descr.Checksum = 0; diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index 4ba50a2b5f..3a6aadfb9f 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -7,21 +7,18 @@ #include <library/cpp/containers/stack_vector/stack_vec.h> #include <library/cpp/actors/util/rope.h> #include <library/cpp/actors/prof/tag.h> +#include <library/cpp/actors/wilson/wilson_span.h> #include <library/cpp/digest/crc32c/crc32c.h> #include <library/cpp/lwtrace/shuttle.h> #include <util/generic/string.h> #include <util/generic/list.h> +#include "types.h" + #ifndef FORCE_EVENT_CHECKSUM #define FORCE_EVENT_CHECKSUM 0 #endif -using NActors::IEventBase; -using NActors::IEventHandle; -using NActors::TActorId; -using NActors::TConstIoVec; -using NActors::TEventSerializedData; - Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data, size_t len) { if constexpr (NSan::MSanIsOn()) { const char *begin = static_cast<const char*>(data); @@ -33,14 +30,6 @@ Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data, return Crc32cExtend(checksum, data, len); } -struct TSessionParams { - bool Encryption = {}; - bool UseModernFrame = {}; - bool AuthOnly = {}; - TString AuthCN; - NActors::TScopeId PeerScopeId; -}; - struct TTcpPacketHeader_v1 { ui32 HeaderCRC32; ui32 PayloadCRC32; @@ -87,21 +76,40 @@ union TTcpPacketBuf { } v2; }; +struct TEventData { + ui32 Type; + ui32 Flags; + TActorId Recipient; + TActorId Sender; + ui64 Cookie; + NWilson::TTraceId TraceId; + ui32 Checksum; +}; + #pragma pack(push, 1) -struct TEventDescr { +struct TEventDescr1 { + ui32 Type; + ui32 Flags; + TActorId Recipient; + TActorId Sender; + ui64 Cookie; + char TraceId[16]; // obsolete trace id format + ui32 Checksum; +}; + +struct TEventDescr2 { ui32 Type; ui32 Flags; TActorId Recipient; TActorId Sender; ui64 Cookie; - // wilson trace id is stored as a serialized entity to avoid using complex object with prohibited copy ctor NWilson::TTraceId::TSerializedTraceId TraceId; ui32 Checksum; }; #pragma pack(pop) struct TEventHolder : TNonCopyable { - TEventDescr Descr; + TEventData Descr; TActorId ForwardRecipient; THolder<IEventBase> Event; TIntrusivePtr<TEventSerializedData> Buffer; @@ -109,6 +117,7 @@ struct TEventHolder : TNonCopyable { ui32 EventSerializedSize; ui32 EventActuallySerialized; mutable NLWTrace::TOrbit Orbit; + std::optional<NWilson::TSpan> Span; ui32 Fill(IEventHandle& ev); @@ -123,7 +132,7 @@ struct TEventHolder : TNonCopyable { } void ForwardOnNondelivery(bool unsure) { - TEventDescr& d = Descr; + TEventData& d = Descr; const TActorId& r = d.Recipient; const TActorId& s = d.Sender; const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr; @@ -137,6 +146,7 @@ struct TEventHolder : TNonCopyable { Event.Reset(); Buffer.Reset(); Orbit.Reset(); + Span.reset(); } }; diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h index 2662c50c22..e0965d7807 100644 --- a/library/cpp/actors/interconnect/types.h +++ b/library/cpp/actors/interconnect/types.h @@ -1,5 +1,9 @@ #pragma once +#include <library/cpp/actors/core/defs.h> +#include <library/cpp/actors/core/actorid.h> +#include <library/cpp/actors/core/event.h> + #include <util/generic/string.h> namespace NActors { @@ -40,4 +44,26 @@ namespace NActors { static TVector<const char*> Reasons; }; + struct TProgramInfo { + ui64 PID = 0; + ui64 StartTime = 0; + ui64 Serial = 0; + }; + + struct TSessionParams { + bool Encryption = {}; + bool UseModernFrame = {}; + bool AuthOnly = {}; + bool UseExtendedTraceFmt = {}; + TString AuthCN; + NActors::TScopeId PeerScopeId; + }; + } // NActors + +using NActors::IEventBase; +using NActors::IEventHandle; +using NActors::TActorId; +using NActors::TConstIoVec; +using NActors::TEventSerializedData; +using NActors::TSessionParams; diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto index 2e3b0d0d15..69721b1e06 100644 --- a/library/cpp/actors/protos/interconnect.proto +++ b/library/cpp/actors/protos/interconnect.proto @@ -70,8 +70,8 @@ message THandshakeRequest { optional bool DoCheckCookie = 17; optional bool RequestModernFrame = 18; - optional bool RequestAuthOnly = 19; + optional bool RequestExtendedTraceFmt = 20; } message THandshakeSuccess { @@ -92,8 +92,8 @@ message THandshakeSuccess { optional TScopeId ServerScopeId = 10; optional bool UseModernFrame = 11; - optional bool AuthOnly = 12; + optional bool UseExtendedTraceFmt = 13; } message THandshakeReply { diff --git a/library/cpp/actors/wilson/CMakeLists.txt b/library/cpp/actors/wilson/CMakeLists.txt index 03d8c542ff..65b2e32d87 100644 --- a/library/cpp/actors/wilson/CMakeLists.txt +++ b/library/cpp/actors/wilson/CMakeLists.txt @@ -7,9 +7,13 @@ -add_library(cpp-actors-wilson INTERFACE) -target_link_libraries(cpp-actors-wilson INTERFACE +add_library(cpp-actors-wilson) +target_link_libraries(cpp-actors-wilson PUBLIC contrib-libs-cxxsupp yutil - cpp-string_utils-base64 + cpp-actors-core + actors-wilson-protos +) +target_sources(cpp-actors-wilson PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/wilson_span.cpp ) diff --git a/library/cpp/actors/wilson/protos/CMakeLists.txt b/library/cpp/actors/wilson/protos/CMakeLists.txt new file mode 100644 index 0000000000..a7cc2e94ff --- /dev/null +++ b/library/cpp/actors/wilson/protos/CMakeLists.txt @@ -0,0 +1,33 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(actors-wilson-protos) +target_link_libraries(actors-wilson-protos PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-protobuf +) +target_proto_messages(actors-wilson-protos PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/common.proto + ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/resource.proto + ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/trace.proto +) +target_proto_addincls(actors-wilson-protos + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(actors-wilson-protos + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) diff --git a/library/cpp/actors/wilson/protos/common.proto b/library/cpp/actors/wilson/protos/common.proto new file mode 100644 index 0000000000..8562ee6d1e --- /dev/null +++ b/library/cpp/actors/wilson/protos/common.proto @@ -0,0 +1,84 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.common.v1; + +// AnyValue is used to represent any type of attribute value. AnyValue may contain a +// primitive value such as a string or integer or it may contain an arbitrary nested +// object containing arrays, key-value lists and primitives. +message AnyValue { + // The value is one of the listed fields. It is valid for all values to be unspecified + // in which case this AnyValue is considered to be "empty". + oneof value { + string string_value = 1; + bool bool_value = 2; + int64 int_value = 3; + double double_value = 4; + ArrayValue array_value = 5; + KeyValueList kvlist_value = 6; + bytes bytes_value = 7; + } +} + +// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message +// since oneof in AnyValue does not allow repeated fields. +message ArrayValue { + // Array of values. The array may be empty (contain 0 elements). + repeated AnyValue values = 1; +} + +// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message +// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need +// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to +// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches +// are semantically equivalent. +message KeyValueList { + // A collection of key/value pairs of key-value pairs. The list may be empty (may + // contain 0 elements). + // The keys MUST be unique (it is not allowed to have more than one + // value with the same key). + repeated KeyValue values = 1; +} + +// KeyValue is a key-value pair that is used to store Span attributes, Link +// attributes, etc. +message KeyValue { + string key = 1; + AnyValue value = 2; +} + +// InstrumentationLibrary is a message representing the instrumentation library information +// such as the fully qualified name and version. +// InstrumentationLibrary is wire-compatible with InstrumentationScope for binary +// Protobuf format. +// This message is deprecated and will be removed on June 15, 2022. +message InstrumentationLibrary { + option deprecated = true; + + // An empty instrumentation library name means the name is unknown. + string name = 1; + string version = 2; +} + +// InstrumentationScope is a message representing the instrumentation scope information +// such as the fully qualified name and version. +message InstrumentationScope { + // An empty instrumentation scope name means the name is unknown. + string name = 1; + string version = 2; + repeated KeyValue attributes = 3; + uint32 dropped_attributes_count = 4; +} diff --git a/library/cpp/actors/wilson/protos/resource.proto b/library/cpp/actors/wilson/protos/resource.proto new file mode 100644 index 0000000000..752bf287ea --- /dev/null +++ b/library/cpp/actors/wilson/protos/resource.proto @@ -0,0 +1,31 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.resource.v1; + +import "library/cpp/actors/wilson/protos/common.proto"; + +// Resource information. +message Resource { + // Set of attributes that describe the resource. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + repeated opentelemetry.proto.common.v1.KeyValue attributes = 1; + + // dropped_attributes_count is the number of dropped attributes. If the value is 0, then + // no attributes were dropped. + uint32 dropped_attributes_count = 2; +} diff --git a/library/cpp/actors/wilson/protos/trace.proto b/library/cpp/actors/wilson/protos/trace.proto new file mode 100644 index 0000000000..0b645cf8ad --- /dev/null +++ b/library/cpp/actors/wilson/protos/trace.proto @@ -0,0 +1,326 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package opentelemetry.proto.trace.v1; + +import "library/cpp/actors/wilson/protos/common.proto"; +import "library/cpp/actors/wilson/protos/resource.proto"; + +// TracesData represents the traces data that can be stored in a persistent storage, +// OR can be embedded by other protocols that transfer OTLP traces data but do +// not implement the OTLP protocol. +// +// The main difference between this message and collector protocol is that +// in this message there will not be any "control" or "metadata" specific to +// OTLP protocol. +// +// When new fields are added into this message, the OTLP request MUST be updated +// as well. +message TracesData { + // An array of ResourceSpans. + // For data coming from a single resource this array will typically contain + // one element. Intermediary nodes that receive data from multiple origins + // typically batch the data before forwarding further and in that case this + // array will contain multiple elements. + repeated ResourceSpans resource_spans = 1; +} + +// A collection of ScopeSpans from a Resource. +message ResourceSpans { + // The resource for the spans in this message. + // If this field is not set then no resource info is known. + opentelemetry.proto.resource.v1.Resource resource = 1; + + // A list of ScopeSpans that originate from a resource. + repeated ScopeSpans scope_spans = 2; + + // A list of InstrumentationLibrarySpans that originate from a resource. + // This field is deprecated and will be removed after grace period expires on June 15, 2022. + // + // During the grace period the following rules SHOULD be followed: + // + // For Binary Protobufs + // ==================== + // Binary Protobuf senders SHOULD NOT set instrumentation_library_spans. Instead + // scope_spans SHOULD be set. + // + // Binary Protobuf receivers SHOULD check if instrumentation_library_spans is set + // and scope_spans is not set then the value in instrumentation_library_spans + // SHOULD be used instead by converting InstrumentationLibrarySpans into ScopeSpans. + // If scope_spans is set then instrumentation_library_spans SHOULD be ignored. + // + // For JSON + // ======== + // JSON senders that set instrumentation_library_spans field MAY also set + // scope_spans to carry the same spans, essentially double-publishing the same data. + // Such double-publishing MAY be controlled by a user-settable option. + // If double-publishing is not used then the senders SHOULD set scope_spans and + // SHOULD NOT set instrumentation_library_spans. + // + // JSON receivers SHOULD check if instrumentation_library_spans is set and + // scope_spans is not set then the value in instrumentation_library_spans + // SHOULD be used instead by converting InstrumentationLibrarySpans into ScopeSpans. + // If scope_spans is set then instrumentation_library_spans field SHOULD be ignored. + repeated InstrumentationLibrarySpans instrumentation_library_spans = 1000 [deprecated = true]; + + // This schema_url applies to the data in the "resource" field. It does not apply + // to the data in the "scope_spans" field which have their own schema_url field. + string schema_url = 3; +} + +// A collection of Spans produced by an InstrumentationScope. +message ScopeSpans { + // The instrumentation scope information for the spans in this message. + // Semantically when InstrumentationScope isn't set, it is equivalent with + // an empty instrumentation scope name (unknown). + opentelemetry.proto.common.v1.InstrumentationScope scope = 1; + + // A list of Spans that originate from an instrumentation scope. + repeated Span spans = 2; + + // This schema_url applies to all spans and span events in the "spans" field. + string schema_url = 3; +} + +// A collection of Spans produced by an InstrumentationLibrary. +// InstrumentationLibrarySpans is wire-compatible with ScopeSpans for binary +// Protobuf format. +// This message is deprecated and will be removed on June 15, 2022. +message InstrumentationLibrarySpans { + option deprecated = true; + + // The instrumentation library information for the spans in this message. + // Semantically when InstrumentationLibrary isn't set, it is equivalent with + // an empty instrumentation library name (unknown). + opentelemetry.proto.common.v1.InstrumentationLibrary instrumentation_library = 1; + + // A list of Spans that originate from an instrumentation library. + repeated Span spans = 2; + + // This schema_url applies to all spans and span events in the "spans" field. + string schema_url = 3; +} + +// Span represents a single operation within a trace. Spans can be +// nested to form a trace tree. Spans may also be linked to other spans +// from the same or different trace and form graphs. Often, a trace +// contains a root span that describes the end-to-end latency, and one +// or more subspans for its sub-operations. A trace can also contain +// multiple root spans, or none at all. Spans do not need to be +// contiguous - there may be gaps or overlaps between spans in a trace. +// +// The next available field id is 17. +message Span { + // A unique identifier for a trace. All spans from the same trace share + // the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes + // is considered invalid. + // + // This field is semantically required. Receiver should generate new + // random trace_id if empty or invalid trace_id was received. + // + // This field is required. + bytes trace_id = 1; + + // A unique identifier for a span within a trace, assigned when the span + // is created. The ID is an 8-byte array. An ID with all zeroes is considered + // invalid. + // + // This field is semantically required. Receiver should generate new + // random span_id if empty or invalid span_id was received. + // + // This field is required. + bytes span_id = 2; + + // trace_state conveys information about request position in multiple distributed tracing graphs. + // It is a trace_state in w3c-trace-context format: https://www.w3.org/TR/trace-context/#tracestate-header + // See also https://github.com/w3c/distributed-tracing for more details about this field. + string trace_state = 3; + + // The `span_id` of this span's parent span. If this is a root span, then this + // field must be empty. The ID is an 8-byte array. + bytes parent_span_id = 4; + + // A description of the span's operation. + // + // For example, the name can be a qualified method name or a file name + // and a line number where the operation is called. A best practice is to use + // the same display name at the same call point in an application. + // This makes it easier to correlate spans in different traces. + // + // This field is semantically required to be set to non-empty string. + // Empty value is equivalent to an unknown span name. + // + // This field is required. + string name = 5; + + // SpanKind is the type of span. Can be used to specify additional relationships between spans + // in addition to a parent/child relationship. + enum SpanKind { + // Unspecified. Do NOT use as default. + // Implementations MAY assume SpanKind to be INTERNAL when receiving UNSPECIFIED. + SPAN_KIND_UNSPECIFIED = 0; + + // Indicates that the span represents an internal operation within an application, + // as opposed to an operation happening at the boundaries. Default value. + SPAN_KIND_INTERNAL = 1; + + // Indicates that the span covers server-side handling of an RPC or other + // remote network request. + SPAN_KIND_SERVER = 2; + + // Indicates that the span describes a request to some remote service. + SPAN_KIND_CLIENT = 3; + + // Indicates that the span describes a producer sending a message to a broker. + // Unlike CLIENT and SERVER, there is often no direct critical path latency relationship + // between producer and consumer spans. A PRODUCER span ends when the message was accepted + // by the broker while the logical processing of the message might span a much longer time. + SPAN_KIND_PRODUCER = 4; + + // Indicates that the span describes consumer receiving a message from a broker. + // Like the PRODUCER kind, there is often no direct critical path latency relationship + // between producer and consumer spans. + SPAN_KIND_CONSUMER = 5; + } + + // Distinguishes between spans generated in a particular context. For example, + // two spans with the same name may be distinguished using `CLIENT` (caller) + // and `SERVER` (callee) to identify queueing latency associated with the span. + SpanKind kind = 6; + + // start_time_unix_nano is the start time of the span. On the client side, this is the time + // kept by the local machine where the span execution starts. On the server side, this + // is the time when the server's application handler starts running. + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // + // This field is semantically required and it is expected that end_time >= start_time. + fixed64 start_time_unix_nano = 7; + + // end_time_unix_nano is the end time of the span. On the client side, this is the time + // kept by the local machine where the span execution ends. On the server side, this + // is the time when the server application handler stops running. + // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + // + // This field is semantically required and it is expected that end_time >= start_time. + fixed64 end_time_unix_nano = 8; + + // attributes is a collection of key/value pairs. Note, global attributes + // like server name can be set using the resource API. Examples of attributes: + // + // "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" + // "/http/server_latency": 300 + // "abc.com/myattribute": true + // "abc.com/score": 10.239 + // + // The OpenTelemetry API specification further restricts the allowed value types: + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/common.md#attributes + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + repeated opentelemetry.proto.common.v1.KeyValue attributes = 9; + + // dropped_attributes_count is the number of attributes that were discarded. Attributes + // can be discarded because their keys are too long or because there are too many + // attributes. If this value is 0, then no attributes were dropped. + uint32 dropped_attributes_count = 10; + + // Event is a time-stamped annotation of the span, consisting of user-supplied + // text description and key-value pairs. + message Event { + // time_unix_nano is the time the event occurred. + fixed64 time_unix_nano = 1; + + // name of the event. + // This field is semantically required to be set to non-empty string. + string name = 2; + + // attributes is a collection of attribute key/value pairs on the event. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + repeated opentelemetry.proto.common.v1.KeyValue attributes = 3; + + // dropped_attributes_count is the number of dropped attributes. If the value is 0, + // then no attributes were dropped. + uint32 dropped_attributes_count = 4; + } + + // events is a collection of Event items. + repeated Event events = 11; + + // dropped_events_count is the number of dropped events. If the value is 0, then no + // events were dropped. + uint32 dropped_events_count = 12; + + // A pointer from the current span to another span in the same trace or in a + // different trace. For example, this can be used in batching operations, + // where a single batch handler processes multiple requests from different + // traces or when the handler receives a request from a different project. + message Link { + // A unique identifier of a trace that this linked span is part of. The ID is a + // 16-byte array. + bytes trace_id = 1; + + // A unique identifier for the linked span. The ID is an 8-byte array. + bytes span_id = 2; + + // The trace_state associated with the link. + string trace_state = 3; + + // attributes is a collection of attribute key/value pairs on the link. + // Attribute keys MUST be unique (it is not allowed to have more than one + // attribute with the same key). + repeated opentelemetry.proto.common.v1.KeyValue attributes = 4; + + // dropped_attributes_count is the number of dropped attributes. If the value is 0, + // then no attributes were dropped. + uint32 dropped_attributes_count = 5; + } + + // links is a collection of Links, which are references from this span to a span + // in the same or different trace. + repeated Link links = 13; + + // dropped_links_count is the number of dropped links after the maximum size was + // enforced. If this value is 0, then no links were dropped. + uint32 dropped_links_count = 14; + + // An optional final status for this span. Semantically when Status isn't set, it means + // span's status code is unset, i.e. assume STATUS_CODE_UNSET (code = 0). + Status status = 15; +} + +// The Status type defines a logical error model that is suitable for different +// programming environments, including REST APIs and RPC APIs. +message Status { + reserved 1; + + // A developer-facing human readable error message. + string message = 2; + + // For the semantics of status codes see + // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#set-status + enum StatusCode { + // The default status. + STATUS_CODE_UNSET = 0; + // The Span has been validated by an Application developers or Operator to have + // completed successfully. + STATUS_CODE_OK = 1; + // The Span contains an error. + STATUS_CODE_ERROR = 2; + }; + + // The status code. + StatusCode code = 3; +} diff --git a/library/cpp/actors/wilson/wilson_event.h b/library/cpp/actors/wilson/wilson_event.h index 7d89c33b51..4b6a7612c0 100644 --- a/library/cpp/actors/wilson/wilson_event.h +++ b/library/cpp/actors/wilson/wilson_event.h @@ -3,179 +3,19 @@ #include "wilson_trace.h" #include <library/cpp/string_utils/base64/base64.h> - +#include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/log.h> namespace NWilson { -#if !defined(_win_) -// works only for those compilers, who trait C++ as ISO IEC 14882, not their own standard - -#define __UNROLL_PARAMS_8(N, F, X, ...) \ - F(X, N - 8) \ - __UNROLL_PARAMS_7(N, F, ##__VA_ARGS__) -#define __UNROLL_PARAMS_7(N, F, X, ...) \ - F(X, N - 7) \ - __UNROLL_PARAMS_6(N, F, ##__VA_ARGS__) -#define __UNROLL_PARAMS_6(N, F, X, ...) \ - F(X, N - 6) \ - __UNROLL_PARAMS_5(N, F, ##__VA_ARGS__) -#define __UNROLL_PARAMS_5(N, F, X, ...) \ - F(X, N - 5) \ - __UNROLL_PARAMS_4(N, F, ##__VA_ARGS__) -#define __UNROLL_PARAMS_4(N, F, X, ...) \ - F(X, N - 4) \ - __UNROLL_PARAMS_3(N, F, ##__VA_ARGS__) -#define __UNROLL_PARAMS_3(N, F, X, ...) \ - F(X, N - 3) \ - __UNROLL_PARAMS_2(N, F, ##__VA_ARGS__) -#define __UNROLL_PARAMS_2(N, F, X, ...) \ - F(X, N - 2) \ - __UNROLL_PARAMS_1(N, F, ##__VA_ARGS__) -#define __UNROLL_PARAMS_1(N, F, X) F(X, N - 1) -#define __UNROLL_PARAMS_0(N, F) -#define __EX(...) __VA_ARGS__ -#define __NUM_PARAMS(...) __NUM_PARAMS_SELECT_N(__VA_ARGS__, __NUM_PARAMS_SEQ) -#define __NUM_PARAMS_SELECT_N(...) __EX(__NUM_PARAMS_SELECT(__VA_ARGS__)) -#define __NUM_PARAMS_SELECT(X, _1, _2, _3, _4, _5, _6, _7, _8, N, ...) N -#define __NUM_PARAMS_SEQ 8, 7, 6, 5, 4, 3, 2, 1, 0, ERROR -#define __CAT(X, Y) X##Y -#define __UNROLL_PARAMS_N(N, F, ...) __EX(__CAT(__UNROLL_PARAMS_, N)(N, F, ##__VA_ARGS__)) -#define __UNROLL_PARAMS(F, ...) __UNROLL_PARAMS_N(__NUM_PARAMS(X, ##__VA_ARGS__), F, ##__VA_ARGS__) -#define __EX2(F, X, INDEX) __INVOKE(F, __EX X, INDEX) -#define __INVOKE(F, ...) F(__VA_ARGS__) - -#define __DECLARE_PARAM(X, INDEX) __EX2(__DECLARE_PARAM_X, X, INDEX) -#define __DECLARE_PARAM_X(TYPE, NAME, INDEX) \ - static const struct T##NAME##Param \ - : ::NWilson::TParamBinder<INDEX, TYPE> { \ - T##NAME##Param() { \ - } \ - using ::NWilson::TParamBinder<INDEX, TYPE>::operator=; \ - } NAME; - -#define __TUPLE_PARAM(X, INDEX) __EX2(__TUPLE_PARAM_X, X, INDEX) -#define __TUPLE_PARAM_X(TYPE, NAME, INDEX) TYPE, - -#define __OUTPUT_PARAM(X, INDEX) __EX2(__OUTPUT_PARAM_X, X, INDEX) -#define __OUTPUT_PARAM_X(TYPE, NAME, INDEX) str << (INDEX ? ", " : "") << #NAME << "# " << std::get<INDEX>(ParamPack); - -#define __FILL_PARAM(P, INDEX) \ - do { \ - const auto& boundParam = (NParams::P); \ - boundParam.Apply(event.ParamPack); \ - } while (false); - -#define DECLARE_WILSON_EVENT(EVENT_NAME, ...) \ - namespace N##EVENT_NAME##Params { \ - __UNROLL_PARAMS(__DECLARE_PARAM, ##__VA_ARGS__) \ - \ - using TParamPack = std::tuple< \ - __UNROLL_PARAMS(__TUPLE_PARAM, ##__VA_ARGS__) char>; \ - } \ - struct T##EVENT_NAME { \ - using TParamPack = N##EVENT_NAME##Params::TParamPack; \ - TParamPack ParamPack; \ - \ - void Output(IOutputStream& str) { \ - str << #EVENT_NAME << "{"; \ - __UNROLL_PARAMS(__OUTPUT_PARAM, ##__VA_ARGS__) \ - str << "}"; \ - } \ - }; - - template <size_t INDEX, typename T> - class TBoundParam { - mutable T Value; - - public: - TBoundParam(T&& value) - : Value(std::move(value)) - { - } - - template <typename TParamPack> - void Apply(TParamPack& pack) const { - std::get<INDEX>(pack) = std::move(Value); - } - }; - - template <size_t INDEX, typename T> - struct TParamBinder { - template <typename TValue> - TBoundParam<INDEX, T> operator=(const TValue& value) const { - return TBoundParam<INDEX, T>(TValue(value)); - } - - template <typename TValue> - TBoundParam<INDEX, T> operator=(TValue&& value) const { - return TBoundParam<INDEX, T>(std::move(value)); - } - }; - -// generate wilson event having parent TRACE_ID and span TRACE_ID to become parent of logged event -#define WILSON_TRACE(CTX, TRACE_ID, EVENT_NAME, ...) \ - if (::NWilson::TraceEnabled(CTX)) { \ - ::NWilson::TTraceId* __traceId = (TRACE_ID); \ - if (__traceId && *__traceId) { \ - TInstant now = Now(); \ - T##EVENT_NAME event; \ - namespace NParams = N##EVENT_NAME##Params; \ - __UNROLL_PARAMS(__FILL_PARAM, ##__VA_ARGS__) \ - ::NWilson::TraceEvent((CTX), __traceId, event, now); \ - } \ - } - inline ui32 GetNodeId(const NActors::TActorSystem& actorSystem) { - return actorSystem.NodeId; + // stub for NBS + template<typename TActorSystem> + inline bool TraceEnabled(const TActorSystem&) { + return false; } - inline ui32 GetNodeId(const NActors::TActivationContext& ac) { - return GetNodeId(*ac.ExecutorThread.ActorSystem); - } - - constexpr ui32 WilsonComponentId = 430; // kikimrservices: wilson - - template <typename TActorSystem> - bool TraceEnabled(const TActorSystem& ctx) { - const auto* loggerSettings = ctx.LoggerSettings(); - return loggerSettings && loggerSettings->Satisfies(NActors::NLog::PRI_DEBUG, WilsonComponentId); - } - - template <typename TActorSystem, typename TEvent> - void TraceEvent(const TActorSystem& actorSystem, TTraceId* traceId, TEvent&& event, TInstant timestamp) { - // ensure that we are not using obsolete TraceId - traceId->CheckConsistency(); - - // store parent id (for logging) and generate child trace id - TTraceId parentTraceId(std::move(*traceId)); - *traceId = parentTraceId.Span(); - - // create encoded string buffer containing timestamp - const ui64 timestampValue = timestamp.GetValue(); - const size_t base64size = Base64EncodeBufSize(sizeof(timestampValue)); - char base64[base64size]; - char* end = Base64Encode(base64, reinterpret_cast<const ui8*>(×tampValue), sizeof(timestampValue)); - - // cut trailing padding character to save some space - Y_VERIFY(end > base64 && end[-1] == '='); - --end; - - // generate log record - TString finalMessage; - TStringOutput s(finalMessage); - s << GetNodeId(actorSystem) << " " << TStringBuf(base64, end) << " "; - traceId->Output(s, parentTraceId); - s << " "; - event.Output(s); - - // output wilson event FIXME: special facility for wilson events w/binary serialization - NActors::MemLogAdapter(actorSystem, NActors::NLog::PRI_DEBUG, WilsonComponentId, std::move(finalMessage)); - } - -#else - -#define DECLARE_WILSON_EVENT(...) -#define WILSON_TRACE(...) -#endif + template<typename TActorSystem, typename TEvent> + inline void TraceEvent(const TActorSystem&, TTraceId*, TEvent&&, TInstant) + {} } // NWilson diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp new file mode 100644 index 0000000000..6b6ea03ccf --- /dev/null +++ b/library/cpp/actors/wilson/wilson_span.cpp @@ -0,0 +1,64 @@ +#include "wilson_span.h" +#include <library/cpp/actors/core/log.h> +#include <google/protobuf/text_format.h> + +namespace NWilson { + + using namespace NActors; + + void SerializeValue(TAttributeValue value, NCommonProto::AnyValue *pb) { + switch (value.index()) { + case 0: + pb->set_string_value(std::get<0>(std::move(value))); + break; + + case 1: + pb->set_bool_value(std::get<1>(value)); + break; + + case 2: + pb->set_int_value(std::get<2>(value)); + break; + + case 3: + pb->set_double_value(std::get<3>(std::move(value))); + break; + + case 4: { + auto *array = pb->mutable_array_value(); + for (auto&& item : std::get<4>(std::move(value))) { + SerializeValue(std::move(item), array->add_values()); + } + break; + } + + case 5: { + auto *kv = pb->mutable_kvlist_value(); + for (auto&& [key, value] : std::get<5>(std::move(value))) { + SerializeKeyValue(std::move(key), std::move(value), kv->add_values()); + } + break; + } + + case 6: + pb->set_bytes_value(std::get<6>(std::move(value))); + break; + } + } + + void SerializeKeyValue(TString key, TAttributeValue value, NCommonProto::KeyValue *pb) { + pb->set_key(std::move(key)); + SerializeValue(std::move(value), pb->mutable_value()); + } + + void TSpan::Send() { + if (TlsActivationContext) { + NProtoBuf::TextFormat::Printer p; + p.SetSingleLineMode(true); + TString s; + p.PrintToString(Data->Span, &s); + LOG_DEBUG_S(*TlsActivationContext, 430 /* WILSON */, s); + } + } + +} // NWilson diff --git a/library/cpp/actors/wilson/wilson_span.h b/library/cpp/actors/wilson/wilson_span.h new file mode 100644 index 0000000000..c2de2f0b68 --- /dev/null +++ b/library/cpp/actors/wilson/wilson_span.h @@ -0,0 +1,160 @@ +#pragma once + +#include <library/cpp/actors/wilson/protos/trace.pb.h> +#include <util/generic/hash.h> +#include <util/datetime/cputimer.h> + +#include "wilson_trace.h" + +namespace NWilson { + + enum class ERelation { + FollowsFrom, + ChildOf, + }; + + namespace NTraceProto = opentelemetry::proto::trace::v1; + namespace NCommonProto = opentelemetry::proto::common::v1; + + struct TArrayValue; + struct TKeyValueList; + struct TBytes; + + using TAttributeValue = std::variant< + TString, + bool, + i64, + double, + TArrayValue, + TKeyValueList, + TBytes + >; + + struct TArrayValue : std::vector<TAttributeValue> {}; + struct TKeyValueList : THashMap<TString, TAttributeValue> {}; + struct TBytes : TString {}; + + void SerializeKeyValue(TString key, TAttributeValue value, NCommonProto::KeyValue *pb); + + class TSpan { + struct TData { + const TInstant StartTime; + const ui64 StartCycles; + const TTraceId TraceId; + NTraceProto::Span Span; + + TData(TInstant startTime, ui64 startCycles, TTraceId traceId) + : StartTime(startTime) + , StartCycles(startCycles) + , TraceId(std::move(traceId)) + {} + }; + + std::unique_ptr<TData> Data; + + public: + TSpan() = default; + TSpan(const TSpan&) = delete; + TSpan(TSpan&&) = default; + + TSpan(ui8 verbosity, ERelation /*relation*/, TTraceId parentId, TInstant now, std::optional<TString> name) + : Data(parentId ? std::make_unique<TData>(now, GetCycleCount(), parentId.Span(verbosity)) : nullptr) + { + if (*this) { + if (!parentId.IsRoot()) { + Data->Span.set_parent_span_id(parentId.GetSpanIdPtr(), parentId.GetSpanIdSize()); + } + Data->Span.set_start_time_unix_nano(now.NanoSeconds()); + + if (name) { + Name(std::move(*name)); + } + } + } + + TSpan& operator =(const TSpan&) = delete; + TSpan& operator =(TSpan&&) = default; + + operator bool() const { + return static_cast<bool>(Data); + } + + TSpan& Name(TString name) { + if (*this) { + Data->Span.set_name(std::move(name)); + } + return *this; + } + + TSpan& Attribute(TString name, TAttributeValue value) { + if (*this) { + SerializeKeyValue(std::move(name), std::move(value), Data->Span.add_attributes()); + } + return *this; + } + + TSpan& Event(TString name, TKeyValueList attributes) { + if (*this) { + auto *event = Data->Span.add_events(); + event->set_time_unix_nano(TimeUnixNano()); + event->set_name(std::move(name)); + for (auto&& [key, value] : attributes) { + SerializeKeyValue(std::move(key), std::move(value), event->add_attributes()); + } + } + return *this; + } + + TSpan& Link(const TTraceId& traceId, TKeyValueList attributes) { + if (*this) { + auto *link = Data->Span.add_links(); + link->set_trace_id(traceId.GetTraceIdPtr(), traceId.GetTraceIdSize()); + link->set_span_id(traceId.GetSpanIdPtr(), traceId.GetSpanIdSize()); + for (auto&& [key, value] : attributes) { + SerializeKeyValue(std::move(key), std::move(value), link->add_attributes()); + } + } + return *this; + } + + void EndOk() { + if (*this) { + auto *status = Data->Span.mutable_status(); + status->set_code(NTraceProto::Status::STATUS_CODE_OK); + } + End(); + } + + void EndError(TString error) { + if (*this) { + auto *status = Data->Span.mutable_status(); + status->set_code(NTraceProto::Status::STATUS_CODE_ERROR); + status->set_message(std::move(error)); + } + End(); + } + + void End() { + if (*this) { + Data->Span.set_end_time_unix_nano(TimeUnixNano()); + Data->Span.set_trace_id(Data->TraceId.GetTraceIdPtr(), Data->TraceId.GetTraceIdSize()); + Data->Span.set_span_id(Data->TraceId.GetSpanIdPtr(), Data->TraceId.GetSpanIdSize()); + Send(); + Data.reset(); // tracing finished + } + } + + operator TTraceId() const { + return Data ? TTraceId(Data->TraceId) : TTraceId(); + } + + private: + void Send(); + + ui64 TimeUnixNano() const { + const TInstant now = Data->StartTime + CyclesToDuration(GetCycleCount() - Data->StartCycles); + return now.NanoSeconds(); + } + }; + +} // NWilson diff --git a/library/cpp/actors/wilson/wilson_trace.h b/library/cpp/actors/wilson/wilson_trace.h index cfbf93059b..9937c1f807 100644 --- a/library/cpp/actors/wilson/wilson_trace.h +++ b/library/cpp/actors/wilson/wilson_trace.h @@ -4,158 +4,160 @@ #include <util/stream/output.h> #include <util/random/random.h> +#include <util/random/fast.h> #include <util/string/printf.h> +#include <array> + namespace NWilson { class TTraceId { - ui64 TraceId; // Random id of topmost client request - ui64 SpanId; // Span id of part of request currently being executed + using TTrace = std::array<ui64, 2>; + + TTrace TraceId; // Random id of topmost client request + union { + struct { + ui64 SpanId : 48; // Span id of part of request currently being executed + ui64 Verbosity : 4; + ui64 TimeToLive : 12; + }; + ui64 Raw; + }; private: - TTraceId(ui64 traceId, ui64 spanId) + TTraceId(TTrace traceId, ui64 spanId, ui8 verbosity, ui32 timeToLive) : TraceId(traceId) - , SpanId(spanId) { + SpanId = spanId; + Verbosity = verbosity; + TimeToLive = timeToLive; } - static ui64 GenerateTraceId() { - ui64 traceId = 0; - while (!traceId) { - traceId = RandomNumber<ui64>(); + static TTrace GenerateTraceId() { + for (;;) { + TTrace res; + ui32 *p = reinterpret_cast<ui32*>(res.data()); + + TReallyFastRng32 rng(RandomNumber<ui64>()); + p[0] = rng(); + p[1] = rng(); + p[2] = rng(); + p[3] = rng(); + + if (res[0] || res[1]) { + return res; + } } - return traceId; } static ui64 GenerateSpanId() { - return RandomNumber<ui64>(); + for (;;) { + if (const ui64 res = RandomNumber<ui64>(); res) { // SpanId can't be zero + return res; + } + } } public: - using TSerializedTraceId = char[2 * sizeof(ui64)]; + using TSerializedTraceId = char[sizeof(TTrace) + sizeof(ui64)]; public: - TTraceId() - : TraceId(0) - , SpanId(0) - { - } + TTraceId(ui64) // NBS stub + : TTraceId() + {} - explicit TTraceId(ui64 traceId) - : TraceId(traceId) - , SpanId(0) - { + TTraceId() { + TraceId.fill(0); + Raw = 0; } - TTraceId(const TSerializedTraceId& in) - : TraceId(reinterpret_cast<const ui64*>(in)[0]) - , SpanId(reinterpret_cast<const ui64*>(in)[1]) + explicit TTraceId(TTrace traceId) + : TraceId(traceId) { + Raw = 0; } // allow move semantic TTraceId(TTraceId&& other) : TraceId(other.TraceId) - , SpanId(other.SpanId) + , Raw(other.Raw) { - other.TraceId = 0; - other.SpanId = 1; // explicitly mark invalid + other.TraceId.fill(0); + } + + // explicit copy + explicit TTraceId(const TTraceId& other) + : TraceId(other.TraceId) + , Raw(other.Raw) + {} + + TTraceId(const TSerializedTraceId& in) { + auto p = reinterpret_cast<const ui64*>(in); + TraceId = {p[0], p[1]}; + Raw = p[2]; + } + + void Serialize(TSerializedTraceId* out) const { + auto p = reinterpret_cast<ui64*>(*out); + p[0] = TraceId[0]; + p[1] = TraceId[1]; + p[2] = Raw; } TTraceId& operator=(TTraceId&& other) { TraceId = other.TraceId; - SpanId = other.SpanId; - other.TraceId = 0; - other.SpanId = 1; // explicitly mark invalid + other.TraceId.fill(0); + Raw = other.Raw; return *this; } // do not allow implicit copy of trace id - TTraceId(const TTraceId& other) = delete; TTraceId& operator=(const TTraceId& other) = delete; - static TTraceId NewTraceId() { - return TTraceId(GenerateTraceId(), 0); - } - - // create separate branch from this point - TTraceId SeparateBranch() const { - return Clone(); + static TTraceId NewTraceId(ui8 verbosity, ui32 timeToLive) { + return TTraceId(GenerateTraceId(), 0, verbosity, timeToLive); } - TTraceId Clone() const { - return TTraceId(TraceId, SpanId); + static TTraceId NewTraceId() { // NBS stub + return TTraceId(); } - TTraceId Span() const { - return *this ? TTraceId(TraceId, GenerateSpanId()) : TTraceId(); + TTraceId Span(ui8 verbosity) const { + return *this && TimeToLive && verbosity <= Verbosity + ? TTraceId(TraceId, GenerateSpanId(), Verbosity, TimeToLive - 1) + : TTraceId(); } - ui64 GetTraceId() const { - return TraceId; + TTraceId Span() const { // compatibility stub + return {}; } // Check if request tracing is enabled explicit operator bool() const { - return TraceId != 0; - } - - // Output trace id into a string stream - void Output(IOutputStream& s, const TTraceId& parentTraceId) const { - union { - ui8 buffer[3 * sizeof(ui64)]; - struct { - ui64 traceId; - ui64 spanId; - ui64 parentSpanId; - } x; - }; - - x.traceId = TraceId; - x.spanId = SpanId; - x.parentSpanId = parentTraceId.SpanId; - - const size_t base64size = Base64EncodeBufSize(sizeof(x)); - char base64[base64size]; - char* end = Base64Encode(base64, buffer, sizeof(x)); - s << TStringBuf(base64, end); + return TraceId[0] || TraceId[1]; } - // output just span id into stream - void OutputSpanId(IOutputStream& s) const { - const size_t base64size = Base64EncodeBufSize(sizeof(SpanId)); - char base64[base64size]; - char* end = Base64Encode(base64, reinterpret_cast<const ui8*>(&SpanId), sizeof(SpanId)); - - // cut trailing padding character - Y_VERIFY(end > base64 && end[-1] == '='); - --end; - - s << TStringBuf(base64, end); - } - - void CheckConsistency() { - // if TraceId is zero, then SpanId must be zero too - Y_VERIFY_DEBUG(*this || !SpanId); + bool IsRoot() const { + return !SpanId; } friend bool operator==(const TTraceId& x, const TTraceId& y) { - return x.TraceId == y.TraceId && x.SpanId == y.SpanId; + return x.TraceId == y.TraceId && x.Raw == y.Raw; } - TString ToString() const { - return Sprintf("%" PRIu64 ":%" PRIu64, TraceId, SpanId); + ui8 GetVerbosity() const { + return Verbosity; } - bool IsFromSameTree(const TTraceId& other) const { - return TraceId == other.TraceId; - } + const void *GetTraceIdPtr() const { return TraceId.data(); } + static constexpr size_t GetTraceIdSize() { return sizeof(TTrace); } + const void *GetSpanIdPtr() const { return &Raw; } + static constexpr size_t GetSpanIdSize() { return sizeof(ui64); } - void Serialize(TSerializedTraceId* out) const { - ui64* p = reinterpret_cast<ui64*>(*out); - p[0] = TraceId; - p[1] = SpanId; - } + // for compatibility with NBS + TTraceId Clone() const { return NWilson::TTraceId(*this); } + ui64 GetTraceId() const { return 0; } + void OutputSpanId(IOutputStream&) const {} }; } diff --git a/ydb/core/actorlib_impl/test_interconnect_ut.cpp b/ydb/core/actorlib_impl/test_interconnect_ut.cpp index 0d9d3535ef..75bc226027 100644 --- a/ydb/core/actorlib_impl/test_interconnect_ut.cpp +++ b/ydb/core/actorlib_impl/test_interconnect_ut.cpp @@ -618,7 +618,7 @@ Y_UNIT_TEST_SUITE(TInterconnectTest) { const_cast<TString::value_type*>(blob.data())[i] = TString::value_type(i % 256); - auto sentTraceId = NWilson::TTraceId::NewTraceId(); + auto sentTraceId = NWilson::TTraceId::NewTraceId(15, 4095); runtime.Send(new IEventHandle(edge, edge, new TEvents::TEvBlob(blob), @@ -630,7 +630,6 @@ Y_UNIT_TEST_SUITE(TInterconnectTest) { UNIT_ASSERT_EQUAL(handle->Cookie, 13); UNIT_ASSERT_EQUAL(event->Blob, blob); UNIT_ASSERT_EQUAL((bool)handle->TraceId, true); - UNIT_ASSERT(handle->TraceId.IsFromSameTree(sentTraceId)); } Y_UNIT_TEST(TestAddressResolve) { diff --git a/ydb/core/base/CMakeLists.txt b/ydb/core/base/CMakeLists.txt index 1346c75550..e125480825 100644 --- a/ydb/core/base/CMakeLists.txt +++ b/ydb/core/base/CMakeLists.txt @@ -15,6 +15,7 @@ target_link_libraries(ydb-core-base PUBLIC cpp-actors-helpers cpp-actors-interconnect cpp-actors-protos + cpp-actors-wilson cpp-deprecated-enum_codegen library-cpp-logger library-cpp-lwtrace @@ -30,8 +31,6 @@ target_link_libraries(ydb-core-base PUBLIC ydb-library-login ydb-library-pdisk_io library-pretty_types_print-protobuf - library-pretty_types_print-wilson - ydb-library-wilson api-protos-out library-yql-minikql library-cpp-resource @@ -89,6 +88,7 @@ target_link_libraries(ydb-core-base.global PUBLIC cpp-actors-helpers cpp-actors-interconnect cpp-actors-protos + cpp-actors-wilson cpp-deprecated-enum_codegen library-cpp-logger library-cpp-lwtrace @@ -104,8 +104,6 @@ target_link_libraries(ydb-core-base.global PUBLIC ydb-library-login ydb-library-pdisk_io library-pretty_types_print-protobuf - library-pretty_types_print-wilson - ydb-library-wilson api-protos-out library-yql-minikql library-cpp-resource diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 58011ce1bf..c9e6504c45 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -14,8 +14,7 @@ #include <ydb/core/protos/blobstorage_config.pb.h> #include <ydb/core/util/yverify_stream.h> -#include <ydb/library/wilson/wilson_event.h> - +#include <library/cpp/actors/wilson/wilson_trace.h> #include <library/cpp/lwtrace/shuttle.h> #include <util/stream/str.h> diff --git a/ydb/core/blobstorage/backpressure/defs.h b/ydb/core/blobstorage/backpressure/defs.h index d510a3545e..d179e1ba76 100644 --- a/ydb/core/blobstorage/backpressure/defs.h +++ b/ydb/core/blobstorage/backpressure/defs.h @@ -7,7 +7,6 @@ #include <ydb/core/blobstorage/vdisk/common/vdisk_costmodel.h> #include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> #include <ydb/core/blobstorage/base/blobstorage_events.h> -#include <ydb/core/blobstorage/base/wilson_events.h> #include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h> #include <ydb/core/protos/blobstorage.pb.h> #include <ydb/core/base/interconnect_channels.h> @@ -18,4 +17,5 @@ #include <library/cpp/actors/core/mailbox.h> #include <library/cpp/actors/core/mon.h> #include <library/cpp/containers/intrusive_rb_tree/rb_tree.h> +#include <library/cpp/actors/wilson/wilson_span.h> #include <google/protobuf/message.h> diff --git a/ydb/core/blobstorage/backpressure/queue.cpp b/ydb/core/blobstorage/backpressure/queue.cpp index e1574116a0..6e29552d81 100644 --- a/ydb/core/blobstorage/backpressure/queue.cpp +++ b/ydb/core/blobstorage/backpressure/queue.cpp @@ -115,7 +115,7 @@ void TBlobStorageQueue::SetItemQueue(TItem& item, EItemQueue newQueue) { } } -void TBlobStorageQueue::SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, IActor *actor) { +void TBlobStorageQueue::SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, ui32 vdiskOrderNumber) { const TInstant now = ctx.Now(); const bool sendMeCostSettings = now >= CostSettingsUpdate; @@ -198,10 +198,11 @@ void TBlobStorageQueue::SendToVDisk(const TActorContext& ctx, const TActorId& re ++*QueueItemsSent; // send item - WILSON_TRACE_FROM_ACTOR(ctx, *actor, &item.TraceId, EvBlobStorageQueueForward, - InQueueWaitingItems = GetItemsWaiting(), InQueueWaitingBytes = GetBytesWaiting()); + item.Span.Event("SendToVDisk", {{ + {"VDiskOrderNumber", vdiskOrderNumber} + }}); item.Event.SendToVDisk(ctx, remoteVDisk, item.QueueCookie, item.MsgId, item.SequenceId, sendMeCostSettings, - item.TraceId.Clone(), ClientId, item.ProcessingTimer); + item.Span, ClientId, item.ProcessingTimer); // update counters as far as item got sent ++NextMsgId; @@ -217,6 +218,7 @@ void TBlobStorageQueue::ReplyWithError(TItem& item, NKikimrProto::EReplyStatus s << " cookie# " << item.Event.GetCookie() << " processingTime# " << processingTime); + item.Span.EndError(TStringBuilder() << NKikimrProto::EReplyStatus_Name(status) << ": " << errorReason); ctx.Send(item.Event.GetSender(), item.Event.MakeErrorReply(status, errorReason, QueueDeserializedItems, QueueDeserializedBytes), 0, item.Event.GetCookie()); @@ -247,6 +249,7 @@ bool TBlobStorageQueue::OnResponse(ui64 msgId, ui64 sequenceId, ui64 cookie, TAc it->Event.GetByteSize(), !relevant); InFlightLookup.erase(lookupIt); + it->Span.EndOk(); EraseItem(Queues.InFlight, it); // unpause execution when InFlight queue gets empty @@ -323,6 +326,7 @@ void TBlobStorageQueue::OnConnect() { TBlobStorageQueue::TItemList::iterator TBlobStorageQueue::EraseItem(TItemList& queue, TItemList::iterator it) { SetItemQueue(*it, EItemQueue::NotSet); + it->Span.EndError("EraseItem called"); TItemList::iterator nextIter = std::next(it); if (Queues.Unused.size() < MaxUnusedItems) { Queues.Unused.splice(Queues.Unused.end(), queue, it); diff --git a/ydb/core/blobstorage/backpressure/queue.h b/ydb/core/blobstorage/backpressure/queue.h index 996975c9c3..d59c035a58 100644 --- a/ydb/core/blobstorage/backpressure/queue.h +++ b/ydb/core/blobstorage/backpressure/queue.h @@ -43,7 +43,7 @@ class TBlobStorageQueue { { EItemQueue Queue; TCostModel::TMessageCostEssence CostEssence; - NWilson::TTraceId TraceId; + NWilson::TSpan Span; TEventHolder Event; ui64 MsgId; ui64 SequenceId; @@ -54,15 +54,16 @@ class TBlobStorageQueue { THPTimer ProcessingTimer; TTrackableList<TItem>::iterator Iterator; - template<typename TPtr> - TItem(TPtr& event, TInstant deadline, + template<typename TEvent> + TItem(TAutoPtr<TEventHandle<TEvent>>& event, TInstant deadline, const NMonitoring::TDynamicCounters::TCounterPtr& serItems, const NMonitoring::TDynamicCounters::TCounterPtr& serBytes, const TBSProxyContextPtr& bspctx, ui32 interconnectChannel, - bool local) + bool local, TInstant now) : Queue(EItemQueue::NotSet) , CostEssence(*event->Get()) - , TraceId(std::move(event->TraceId)) + , Span(9 /*verbosity*/, NWilson::ERelation::ChildOf, std::move(event->TraceId), now, TStringBuilder() + << "Backpressure(" << TypeName<TEvent>() << ")") , Event(event, serItems, serBytes, bspctx, interconnectChannel, local) , MsgId(Max<ui64>()) , SequenceId(0) @@ -183,7 +184,7 @@ public: void SetItemQueue(TItem& item, EItemQueue newQueue); - void SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, IActor *actor); + void SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, ui32 vdiskOrderNumber); void ReplyWithError(TItem& item, NKikimrProto::EReplyStatus status, const TString& errorReason, const TActorContext& ctx); bool Expecting(ui64 msgId, ui64 sequenceId) const; @@ -196,13 +197,13 @@ public: void OnConnect(); template<typename TPtr> - void Enqueue(const TActorContext &ctx, TPtr& event, TInstant deadline, bool local) { + void Enqueue(const TActorContext &ctx, TPtr& event, TInstant deadline, bool local, TInstant now) { Y_UNUSED(ctx); TItemList::iterator newIt; if (Queues.Unused.empty()) { newIt = Queues.Waiting.emplace(Queues.Waiting.end(), event, deadline, - QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local); + QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local, now); ++*QueueSize; } else { newIt = Queues.Unused.begin(); @@ -211,7 +212,7 @@ public: TItem& item = *newIt; item.~TItem(); new(&item) TItem(event, deadline, QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, - InterconnectChannel, local); + InterconnectChannel, local, now); } newIt->Iterator = newIt; diff --git a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp index acdd648a53..ed38409713 100644 --- a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp +++ b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp @@ -36,6 +36,7 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu const TVDiskIdShort VDiskIdShort; TActorId RemoteVDisk; TVDiskID VDiskId; + ui32 VDiskOrderNumber; NKikimrBlobStorage::EVDiskQueueId QueueId; const TDuration QueueWatchdogTimeout; ui64 CheckReadinessCookie = 1; @@ -108,6 +109,7 @@ private: Y_VERIFY(info.Type.GetErasure() == GType.GetErasure()); VDiskId = info.CreateVDiskID(VDiskIdShort); RemoteVDisk = info.GetActorId(VDiskIdShort); + VDiskOrderNumber = info.GetOrderNumber(VDiskIdShort); LogPrefix = Sprintf("[%s TargetVDisk# %s Queue# %s]", SelfId().ToString().data(), VDiskId.ToString().data(), QueueName.data()); RecentGroup = info.Group; } @@ -123,7 +125,7 @@ private: void Pump(const TActorContext &ctx) { // if in 'Running' state, then send messages to VDisk if (IsReady()) { - Queue.SendToVDisk(ctx, RemoteVDisk, this); + Queue.SendToVDisk(ctx, RemoteVDisk, VDiskOrderNumber); } } @@ -197,11 +199,7 @@ private: << " cookie# " << ev->Cookie); if (IsReady()) { - // trace wilson event if tracing is enabled for this request - WILSON_TRACE_FROM_ACTOR(ctx, *this, &ev->TraceId, EvBlobStorageQueuePut, - InQueueWaitingItems = Queue.GetItemsWaiting(), InQueueWaitingBytes = Queue.GetBytesWaiting()); - - Queue.Enqueue(ctx, ev, deadline, RemoteVDisk.NodeId() == SelfId().NodeId()); + Queue.Enqueue(ctx, ev, deadline, RemoteVDisk.NodeId() == SelfId().NodeId(), TActivationContext::Now()); Pump(ctx); UpdateRequestTrackingStats(ctx); } else { diff --git a/ydb/core/blobstorage/base/CMakeLists.txt b/ydb/core/blobstorage/base/CMakeLists.txt index 19c7abdfc7..2874ba8173 100644 --- a/ydb/core/blobstorage/base/CMakeLists.txt +++ b/ydb/core/blobstorage/base/CMakeLists.txt @@ -11,9 +11,9 @@ add_library(core-blobstorage-base) target_link_libraries(core-blobstorage-base PUBLIC contrib-libs-cxxsupp yutil + cpp-actors-wilson library-cpp-lwtrace ydb-core-protos - ydb-library-wilson ) target_sources(core-blobstorage-base PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/base/blobstorage_vdiskid.cpp diff --git a/ydb/core/blobstorage/base/wilson_events.h b/ydb/core/blobstorage/base/wilson_events.h deleted file mode 100644 index 7ed9528463..0000000000 --- a/ydb/core/blobstorage/base/wilson_events.h +++ /dev/null @@ -1,112 +0,0 @@ -#pragma once - - -#include "blobstorage_vdiskid.h" -#include <ydb/library/wilson/wilson_event.h> - -#define WILSON_TRACE_FROM_ACTOR(CTX, ACTOR, TRACE_ID, EVENT_TYPE, ...) \ - WILSON_TRACE(CTX, TRACE_ID, EVENT_TYPE, \ - ActivityType = static_cast<NKikimrServices::TActivity::EType>((ACTOR).GetActivityType()), \ - ActorId = (ACTOR).SelfId(), \ - ##__VA_ARGS__); - -#define DECLARE_ACTOR_EVENT(EVENT_TYPE, ...) \ - DECLARE_WILSON_EVENT(EVENT_TYPE, \ - (::NKikimrServices::TActivity::EType, ActivityType), \ - (::NActors::TActorId, ActorId), \ - ##__VA_ARGS__ \ - ) - -namespace NKikimr { - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // DSPROXY - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - DECLARE_ACTOR_EVENT(MultiGetReceived); - DECLARE_ACTOR_EVENT(EvGetSent); - DECLARE_ACTOR_EVENT(EvGetReceived); - DECLARE_ACTOR_EVENT(EvVGetSent); - DECLARE_ACTOR_EVENT(EvVGetReceived); - DECLARE_ACTOR_EVENT(EvVGetResultSent); - DECLARE_ACTOR_EVENT(EvVGetResultReceived, (::NWilson::TTraceId, MergedNode)); - DECLARE_ACTOR_EVENT(EvGetResultSent, (::NKikimrProto::EReplyStatus, ReplyStatus), (ui32, ResponseSize)); - DECLARE_ACTOR_EVENT(EvGetResultReceived, (::NWilson::TTraceId, MergedNode)); - DECLARE_ACTOR_EVENT(MultiGetResultSent); - - DECLARE_ACTOR_EVENT(RangeGetReceived); - DECLARE_ACTOR_EVENT(RangeGetResultSent, (::NKikimrProto::EReplyStatus, ReplyStatus), (ui32, ResponseSize)); - - DECLARE_ACTOR_EVENT(EvPutReceived, (ui32, Size), (TLogoBlobID, LogoBlobId)); - DECLARE_ACTOR_EVENT(EvVPutSent); - DECLARE_ACTOR_EVENT(EvVPutResultReceived, (::NWilson::TTraceId, MergedNode)); - DECLARE_ACTOR_EVENT(EvPutResultSent, (::NKikimrProto::EReplyStatus, ReplyStatus)); - - DECLARE_ACTOR_EVENT(EvDiscoverReceived, (ui32, GroupId), (TLogoBlobID, From), (TLogoBlobID, To)); - DECLARE_ACTOR_EVENT(EvDiscoverResultSent); - - DECLARE_ACTOR_EVENT(EvVGetBlockSent); - DECLARE_ACTOR_EVENT(EvVGetBlockResultReceived, (::NWilson::TTraceId, MergedNode)); - - DECLARE_ACTOR_EVENT(ReadBatcherStart); - DECLARE_ACTOR_EVENT(ReadBatcherFinish, (::NWilson::TTraceId, MergedNode)); - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // VDISK - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - DECLARE_ACTOR_EVENT(EvVPutReceived, (TVDiskID, VDiskId), (ui32, PDiskId), (ui32, VDiskSlotId)); - DECLARE_ACTOR_EVENT(EvPutIntoEmergQueue); - DECLARE_ACTOR_EVENT(EvVPutResultSent); - DECLARE_ACTOR_EVENT(EvVMultiPutResultSent); - - DECLARE_ACTOR_EVENT(EvChunkReadSent, (ui32, ChunkIdx), (ui32, Offset), (ui32, Size), (void*, YardCookie)); - - DECLARE_ACTOR_EVENT(EvChunkReadResultReceived, (void*, YardCookie), (::NWilson::TTraceId, MergedNode)); - - DECLARE_ACTOR_EVENT(EvHullWriteHugeBlobSent); - DECLARE_ACTOR_EVENT(EvHullLogHugeBlobReceived); - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // BS_QUEUE - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - DECLARE_ACTOR_EVENT(EvBlobStorageQueuePut, (ui64, InQueueWaitingItems), (ui64, InQueueWaitingBytes)); - DECLARE_ACTOR_EVENT(EvBlobStorageQueueForward, (ui64, InQueueWaitingItems), (ui64, InQueueWaitingBytes)); - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // SKELETON FRONT - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - DECLARE_ACTOR_EVENT(EvSkeletonFrontEnqueue); - DECLARE_ACTOR_EVENT(EvSkeletonFrontProceed); - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // YARD - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - DECLARE_WILSON_EVENT(EvChunkReadReceived, (ui32, ChunkIdx), (ui32, Offset), (ui32, Size)); - DECLARE_WILSON_EVENT(AsyncReadScheduled, (ui64, DiskOffset), (ui32, Size)); - - DECLARE_WILSON_EVENT(EvChunkWriteReceived, (ui32, ChunkIdx), (ui32, Offset), (ui32, Size)); - - DECLARE_WILSON_EVENT(EvLogReceived); - DECLARE_WILSON_EVENT(EnqueueLogWrite); - DECLARE_WILSON_EVENT(RouteLogWrite); - - DECLARE_WILSON_EVENT(BlockPwrite, (ui64, DiskOffset), (ui32, Size)); - DECLARE_WILSON_EVENT(BlockPread, (ui64, DiskOffset), (ui32, Size)); - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // LIBAIO - //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - DECLARE_WILSON_EVENT(AsyncIoInQueue); - DECLARE_WILSON_EVENT(AsyncIoFinished); - -} // NKikimr - -template<> -inline void Out<NKikimrServices::TActivity::EType>(IOutputStream& os, NKikimrServices::TActivity::EType status) { - os << NKikimrServices::TActivity::EType_Name(status); -} diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h index f206d7ad21..451e7fa315 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy.h @@ -13,9 +13,9 @@ #include <ydb/core/blobstorage/base/batched_vec.h> #include <ydb/core/blobstorage/base/blobstorage_events.h> #include <ydb/core/blobstorage/base/transparent.h> -#include <ydb/core/blobstorage/base/wilson_events.h> #include <ydb/core/blobstorage/backpressure/queue_backpressure_client.h> #include <library/cpp/actors/core/interconnect.h> +#include <library/cpp/actors/wilson/wilson_span.h> #include <ydb/core/base/appdata.h> #include <ydb/core/base/group_stat.h> #include <library/cpp/containers/stack_vector/stack_vec.h> @@ -140,13 +140,13 @@ public: TBlobStorageGroupRequestActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TIntrusivePtr<TGroupQueues> groupQueues, TIntrusivePtr<TBlobStorageGroupProxyMon> mon, const TActorId& source, ui64 cookie, NWilson::TTraceId traceId, NKikimrServices::EServiceKikimr logComponent, bool logAccEnabled, TMaybe<TGroupStat::EKind> latencyQueueKind, - TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, ui32 restartCounter) + TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, ui32 restartCounter, TString name) : Info(std::move(info)) , GroupQueues(std::move(groupQueues)) , Mon(std::move(mon)) , PoolCounters(storagePoolCounters) , LogCtx(logComponent, logAccEnabled) - , TraceId(std::move(traceId)) + , Span(8 /*verbosity*/, NWilson::ERelation::ChildOf, std::move(traceId), now, std::move(name)) , RestartCounter(restartCounter) , Source(source) , Cookie(cookie) @@ -155,6 +155,9 @@ public: , RacingDomains(&Info->GetTopology()) { TDerived::ActiveCounter(Mon)->Inc(); + Span + .Attribute("GroupId", Info->GroupID) + .Attribute("RestartCounter", RestartCounter); } template<typename T> @@ -258,7 +261,7 @@ public: Y_VERIFY_DEBUG(RestartCounter < 100); auto q = self.RestartQuery(RestartCounter + 1); ++*Mon->NodeMon->RestartHisto[Min<size_t>(Mon->NodeMon->RestartHisto.size() - 1, RestartCounter)]; - TActivationContext::Send(new IEventHandle(nodeWardenId, Source, q.release(), 0, Cookie, &proxyId, std::move(TraceId))); + TActivationContext::Send(new IEventHandle(nodeWardenId, Source, q.release(), 0, Cookie, &proxyId, Span)); PassAway(); return true; } @@ -314,11 +317,11 @@ public: } template<typename T> - void SendToQueue(std::unique_ptr<T> event, ui64 cookie, NWilson::TTraceId traceId, bool timeStatsEnabled = false) { + void SendToQueue(std::unique_ptr<T> event, ui64 cookie, bool timeStatsEnabled = false) { if constexpr (!std::is_same_v<T, TEvBlobStorage::TEvVStatus>) { event->MessageRelevanceTracker = MessageRelevanceTracker; } - const TActorId queueId = GroupQueues->Send(*this, Info->GetTopology(), std::move(event), cookie, std::move(traceId), + const TActorId queueId = GroupQueues->Send(*this, Info->GetTopology(), std::move(event), cookie, Span, timeStatsEnabled); ++RequestsInFlight; } @@ -332,13 +335,12 @@ public: void SendToQueues(TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &vGets, bool timeStatsEnabled) { for (auto& request : vGets) { - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetSent); Y_VERIFY(request->Record.HasCookie()); ui64 messageCookie = request->Record.GetCookie(); CountEvent(*request); const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000; request->Record.MutableTimestamps()->SetSentByDSProxyUs(GetCycleCountFast() / cyclesPerUs); - SendToQueue(std::move(request), messageCookie, TraceId.SeparateBranch(), timeStatsEnabled); + SendToQueue(std::move(request), messageCookie, timeStatsEnabled); } } @@ -370,7 +372,6 @@ public: template <typename TEvent> void SendToQueues(TDeque<std::unique_ptr<TEvent>> &events, bool timeStatsEnabled) { for (auto& request : events) { - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVPutSent); Y_VERIFY(request->Record.HasCookie()); ui64 messageCookie = request->Record.GetCookie(); CountEvent(*request); @@ -380,18 +381,18 @@ public: TVDiskID vDiskId = VDiskIDFromVDiskID(request->Record.GetVDiskID()); LWTRACK(DSProxyPutVPutIsSent, request->Orbit, Info->GetFailDomainOrderNumber(vDiskId), Info->GroupID, id.Channel(), id.PartId(), id.ToString(), id.BlobSize()); - SendToQueue(std::move(request), messageCookie, TraceId.SeparateBranch(), timeStatsEnabled); + SendToQueue(std::move(request), messageCookie, timeStatsEnabled); } } void SendResponseAndDie(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats, TActorId source, - ui64 cookie, NWilson::TTraceId traceId) { - SendResponse(std::move(ev), timeStats, source, cookie, std::move(traceId)); + ui64 cookie) { + SendResponse(std::move(ev), timeStats, source, cookie); PassAway(); } void SendResponseAndDie(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats = nullptr) { - SendResponseAndDie(std::move(ev), timeStats, Source, Cookie, std::move(TraceId)); + SendResponseAndDie(std::move(ev), timeStats, Source, Cookie); } TActorId GetProxyActorId() const { @@ -406,15 +407,21 @@ public: TActorBootstrapped<TDerived>::PassAway(); } - void SendResponse(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats, TActorId source, ui64 cookie, - NWilson::TTraceId traceId) { + void SendResponse(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats, TActorId source, ui64 cookie) { const TInstant now = TActivationContext::Now(); + NKikimrProto::EReplyStatus status; + TString errorReason; + switch (ev->Type()) { #define XX(T) \ - case TEvBlobStorage::Ev##T##Result: \ - Mon->RespStat##T->Account(static_cast<TEvBlobStorage::TEv##T##Result&>(*ev).Status); \ - break; + case TEvBlobStorage::Ev##T##Result: { \ + auto& msg = static_cast<TEvBlobStorage::TEv##T##Result&>(*ev); \ + status = msg.Status; \ + errorReason = msg.ErrorReason; \ + Mon->RespStat##T->Account(status); \ + break; \ + } XX(Put) XX(Get) @@ -450,12 +457,18 @@ public: static_cast<TEvBlobStorage::TEvGetResult&>(*ev).Sent = now; } + if (status == NKikimrProto::OK) { + Span.EndOk(); + } else { + Span.EndError(std::move(errorReason)); + } + // send the reply to original request sender - Derived().Send(source, ev.release(), 0, cookie, std::move(traceId)); + Derived().Send(source, ev.release(), 0, cookie, Span); }; void SendResponse(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats = nullptr) { - SendResponse(std::move(ev), timeStats, Source, Cookie, std::move(TraceId)); + SendResponse(std::move(ev), timeStats, Source, Cookie); } static double GetStartTime(const NKikimrBlobStorage::TTimestamps& timestamps) { @@ -489,7 +502,7 @@ protected: TIntrusivePtr<TBlobStorageGroupProxyMon> Mon; TIntrusivePtr<TStoragePoolCounters> PoolCounters; TLogContext LogCtx; - NWilson::TTraceId TraceId; + NWilson::TSpan Span; TStackVec<std::pair<TDiskResponsivenessTracker::TDiskId, TDuration>, 16> Responsiveness; TString ErrorReason; TMaybe<TStoragePoolCounters::EHandleClass> RequestHandleClass; @@ -565,7 +578,6 @@ IActor* CreateBlobStorageGroupIndexRestoreGetRequest(const TIntrusivePtr<TBlobSt ui64 cookie, NWilson::TTraceId traceId, TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters); - IActor* CreateBlobStorageGroupDiscoverRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvDiscover *ev, @@ -584,22 +596,22 @@ IActor* CreateBlobStorageGroupMirror3of4DiscoverRequest(const TIntrusivePtr<TBlo IActor* CreateBlobStorageGroupCollectGarbageRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev, - ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters); + ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters); IActor* CreateBlobStorageGroupMultiCollectRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev, - ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters); + ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters); IActor* CreateBlobStorageGroupBlockRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvBlock *ev, - ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters); + ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters); IActor* CreateBlobStorageGroupStatusRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvStatus *ev, - ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters); + ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters); IActor* CreateBlobStorageGroupEjectedProxy(ui32 groupId, TIntrusivePtr<TDsProxyNodeMon> &nodeMon); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp index dba0a2fd43..cf742ad23e 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp @@ -88,7 +88,7 @@ class TBlobStorageGroupBlockRequest : public TBlobStorageGroupRequestActor<TBlob Y_FAIL("unexpected newStatus# %s", NKikimrProto::EReplyStatus_Name(newStatus).data()); } for (const TVDiskID& vdiskId : queryStatus) { - SendToQueue(std::make_unique<TEvBlobStorage::TEvVStatus>(vdiskId), 0, NWilson::TTraceId()); + SendToQueue(std::make_unique<TEvBlobStorage::TEvVStatus>(vdiskId), 0); } for (const TVDiskID& vdiskId : resend) { SendBlockRequest(vdiskId); @@ -113,7 +113,7 @@ class TBlobStorageGroupBlockRequest : public TBlobStorageGroupRequestActor<TBlob << " node# " << Info->GetActorId(vdiskId).NodeId()); auto msg = std::make_unique<TEvBlobStorage::TEvVBlock>(TabletId, Generation, vdiskId, Deadline, IssuerGuid); - SendToQueue(std::move(msg), cookie, NWilson::TTraceId()); // FIXME: wilson + SendToQueue(std::move(msg), cookie); } std::unique_ptr<IEventBase> RestartQuery(ui32 counter) { @@ -135,10 +135,11 @@ public: TBlobStorageGroupBlockRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvBlock *ev, - ui64 cookie, TInstant now, + ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) - : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, NWilson::TTraceId(), - NKikimrServices::BS_PROXY_BLOCK, false, {}, now, storagePoolCounters, ev->RestartCounter) + : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), + NKikimrServices::BS_PROXY_BLOCK, false, {}, now, storagePoolCounters, ev->RestartCounter, + "DSProxy.Block") , TabletId(ev->TabletId) , Generation(ev->Generation) , Deadline(ev->Deadline) @@ -177,8 +178,8 @@ public: IActor* CreateBlobStorageGroupBlockRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvBlock *ev, - ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) { - return new TBlobStorageGroupBlockRequest(info, state, source, mon, ev, cookie, now, storagePoolCounters); + ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) { + return new TBlobStorageGroupBlockRequest(info, state, source, mon, ev, cookie, std::move(traceId), now, storagePoolCounters); } } // NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp index d12c819a74..492d57bb0b 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp @@ -91,7 +91,7 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc Y_FAIL("unexpected newStatus# %s", NKikimrProto::EReplyStatus_Name(newStatus).data()); } for (const TVDiskID& vdiskId : queryStatus) { - SendToQueue(std::make_unique<TEvBlobStorage::TEvVStatus>(vdiskId), 0, NWilson::TTraceId()); + SendToQueue(std::make_unique<TEvBlobStorage::TEvVStatus>(vdiskId), 0); RequestsSent++; } for (const TVDiskID& vdiskId : resend) { @@ -118,7 +118,7 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc const ui64 cookie = TVDiskIdShort(vdiskId).GetRaw(); auto msg = std::make_unique<TEvBlobStorage::TEvVCollectGarbage>(TabletId, RecordGeneration, PerGenerationCounter, Channel, Collect, CollectGeneration, CollectStep, Hard, Keep.get(), DoNotKeep.get(), vdiskId, Deadline); - SendToQueue(std::move(msg), cookie, NWilson::TTraceId()); // FIXME: wilson + SendToQueue(std::move(msg), cookie); RequestsSent++; } @@ -142,9 +142,10 @@ public: TBlobStorageGroupCollectGarbageRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev, ui64 cookie, - TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) - : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, NWilson::TTraceId(), - NKikimrServices::BS_PROXY_COLLECT, false, {}, now, storagePoolCounters, ev->RestartCounter) + NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) + : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), + NKikimrServices::BS_PROXY_COLLECT, false, {}, now, storagePoolCounters, ev->RestartCounter, + "DSProxy.CollectGarbage") , TabletId(ev->TabletId) , RecordGeneration(ev->RecordGeneration) , PerGenerationCounter(ev->PerGenerationCounter) @@ -204,8 +205,9 @@ public: IActor* CreateBlobStorageGroupCollectGarbageRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev, - ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) { - return new TBlobStorageGroupCollectGarbageRequest(info, state, source, mon, ev, cookie, now, storagePoolCounters); + ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) { + return new TBlobStorageGroupCollectGarbageRequest(info, state, source, mon, ev, cookie, std::move(traceId), now, + storagePoolCounters); } } // NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp index 1f810a385d..dad2ecd745 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp @@ -3,7 +3,6 @@ #include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_partlayout.h> #include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> #include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h> -#include <ydb/core/blobstorage/base/wilson_events.h> namespace NKikimr { @@ -300,7 +299,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB const TDuration duration = TActivationContext::Now() - StartTime; Mon->CountDiscoverResponseTime(duration); const bool success = result->Status == NKikimrProto::OK; - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvDiscoverResultSent); LWPROBE(DSProxyRequestDuration, TEvBlobStorage::EvDiscover, 0, duration.SecondsFloat() * 1000.0, TabletId, Info->GroupID, TLogoBlobID::MaxChannel, "", success); SendResponseAndDie(std::move(result)); @@ -317,7 +315,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB void Handle(TEvBlobStorage::TEvVGetBlockResult::TPtr &ev) { ProcessReplyFromQueue(ev); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetBlockResultReceived, MergedNode = std::move(ev->TraceId)); TotalRecieved++; NKikimrBlobStorage::TEvVGetBlockResult &record = ev->Get()->Record; @@ -362,8 +359,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB ProcessReplyFromQueue(ev); CountEvent(*ev->Get()); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetResultReceived, MergedNode = std::move(ev->TraceId)); - TotalRecieved++; NKikimrBlobStorage::TEvVGetResult &record = ev->Get()->Record; Y_VERIFY(record.HasStatus()); @@ -381,8 +376,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB ProcessReplyFromQueue(ev); CountEvent(*ev->Get()); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetResultReceived, MergedNode = std::move(ev->TraceId)); - TotalRecieved++; NKikimrBlobStorage::TEvVGetResult &record = ev->Get()->Record; Y_VERIFY(record.HasStatus()); @@ -609,8 +602,7 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB getRequest->IsInternal = true; getRequest->TabletId = TabletId; getRequest->AcquireBlockedGeneration = true; - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetSent); - bool isSent = SendToBSProxy(SelfId(), Info->GroupID, getRequest.release(), 0, TraceId.SeparateBranch()); + bool isSent = SendToBSProxy(SelfId(), Info->GroupID, getRequest.release(), 0, Span); Y_VERIFY(isSent); TotalSent++; @@ -722,7 +714,7 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB << " msg# " << msg->ToString() << " cookie# " << cookie); CountEvent(*msg); - SendToQueue(std::move(msg), cookie, NWilson::TTraceId()); // FIXME: wilson + SendToQueue(std::move(msg), cookie); TotalSent++; curVDisk.IsMoreRequested = true; @@ -743,8 +735,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB } void Handle(TEvBlobStorage::TEvGetResult::TPtr &ev) { - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetResultReceived, MergedNode = std::move(ev->TraceId)); - TotalRecieved++; TEvBlobStorage::TEvGetResult *msg = ev->Get(); const NKikimrProto::EReplyStatus status = msg->Status; @@ -897,7 +887,8 @@ public: ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), - NKikimrServices::BS_PROXY_DISCOVER, true, {}, now, storagePoolCounters, ev->RestartCounter) + NKikimrServices::BS_PROXY_DISCOVER, true, {}, now, storagePoolCounters, ev->RestartCounter, + "DSProxy.Discover") , TabletId(ev->TabletId) , MinGeneration(ev->MinGeneration) , ReadBody(ev->ReadBody) @@ -921,13 +912,10 @@ public: << " FromLeader# " << (FromLeader ? "true" : "false") << " RestartCounter# " << RestartCounter); - const ui32 groupId = Info->GroupID; const TLogoBlobID from = TLogoBlobID(TabletId, Max<ui32>(), Max<ui32>(), 0, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxChannel, TLogoBlobID::MaxPartId); const TLogoBlobID to = TLogoBlobID(TabletId, MinGeneration, 0, 0, 0, 0, 1); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvDiscoverReceived, GroupId = groupId, From = from, To = to); - for (const auto& vdisk : Info->GetVDisks()) { auto vd = Info->GetVDiskId(vdisk.OrderNumber); if (!IsGetBlockDone) { @@ -937,8 +925,7 @@ public: << " vDiskId# " << vd << " cookie# " << cookie << " node# " << Info->GetActorId(vd).NodeId()); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetBlockSent); - SendToQueue(std::move(getBlock), cookie, TraceId.SeparateBranch()); + SendToQueue(std::move(getBlock), cookie); TotalSent++; } @@ -953,9 +940,8 @@ public: << " node# " << Info->GetActorId(vd).NodeId() << " msg# " << msg->ToString() << " cookie# " << cookie); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetSent); CountEvent(*msg); - SendToQueue(std::move(msg), cookie, TraceId.SeparateBranch()); + SendToQueue(std::move(msg), cookie); TotalSent++; TVDiskInfo &curVDisk = VDiskInfo[TVDiskIdShort(vd).GetRaw()]; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp index 7c10b1e25a..590a92bdf0 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp @@ -462,7 +462,7 @@ public: TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(std::move(info), std::move(state), std::move(mon), source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_DISCOVER, false, {}, now, storagePoolCounters, - ev->RestartCounter) + ev->RestartCounter, "DSProxy.Discover(mirror-3-dc)") , TabletId(ev->TabletId) , MinGeneration(ev->MinGeneration) , StartTime(now) @@ -505,7 +505,7 @@ public: A_LOG_DEBUG_S("DSPDM06", "sending TEvVGetBlock# " << query->ToString()); - SendToQueue(std::move(query), 0, NWilson::TTraceId()); + SendToQueue(std::move(query), 0); ++RequestsInFlight; } } @@ -523,7 +523,7 @@ public: A_LOG_DEBUG_S("DSPDM07", "sending TEvVGet# " << msg->ToString()); CountEvent(*msg); - SendToQueue(std::move(msg), 0, NWilson::TTraceId()); + SendToQueue(std::move(msg), 0); ++RequestsInFlight; } Msgs.clear(); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp index 22412d0081..5fdc6c30d9 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp @@ -36,7 +36,7 @@ public: TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(std::move(info), std::move(state), std::move(mon), source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_DISCOVER, false, {}, now, storagePoolCounters, - ev->RestartCounter) + ev->RestartCounter, "DSProxy.Discover(mirror-3of4)") , TabletId(ev->TabletId) , MinGeneration(ev->MinGeneration) , StartTime(now) @@ -156,7 +156,7 @@ public: const TLogoBlobID to = TLogoBlobID(TabletId, MinGeneration, 0, 0, 0, 0); SendToQueue(TEvBlobStorage::TEvVGet::CreateRangeIndexQuery(state.VDiskId, Deadline, HandleClass, TEvBlobStorage::TEvVGet::EFlags::None, Nothing(), state.From, to, MaxBlobsAtOnce, nullptr, - ForceBlockedGeneration), 0, {} /*traceId*/); + ForceBlockedGeneration), 0); const EDiskState prev = std::exchange(state.State, EDiskState::READ_PENDING); Y_VERIFY(prev == EDiskState::IDLE); } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp index 9b08b41861..e4043c3675 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp @@ -3,7 +3,6 @@ #include "root_cause.h" #include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> #include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h> -#include <ydb/core/blobstorage/base/wilson_events.h> #include <library/cpp/containers/stack_vector/stack_vec.h> #include <library/cpp/digest/crc32c/crc32c.h> #include <util/generic/set.h> @@ -231,9 +230,6 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt } DiskCounters[orderNumber].Received++; - // generate wilson event with merging into trunk - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetResultReceived, MergedNode = std::move(ev->TraceId)); - TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets; TAutoPtr<TEvBlobStorage::TEvGetResult> getResult; ResponsesReceived++; @@ -351,8 +347,6 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt void HandleVPutResult(typename TPutEventResult::TPtr &ev) { Y_VERIFY(ev->Get()->Record.HasStatus()); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVPutResultReceived, MergedNode = std::move(ev->TraceId)); - const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000; ev->Get()->Record.MutableTimestamps()->SetReceivedByDSProxyUs(GetCycleCountFast() / cyclesPerUs); const auto &record = ev->Get()->Record; @@ -451,15 +445,12 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt } void SendReplyAndDie(TAutoPtr<TEvBlobStorage::TEvGetResult> &evResult) { - const NKikimrProto::EReplyStatus status = evResult->Status; const TInstant now = TActivationContext::Now(); const TDuration duration = (now > StartTime) ? (now - StartTime) : TDuration::MilliSeconds(0); Mon->CountGetResponseTime(Info->GetDeviceType(), GetImpl.GetHandleClass(), evResult->PayloadSizeBytes(), duration); *Mon->ActiveGetCapacity -= ReportedBytes; ReportedBytes = 0; bool success = evResult->Status == NKikimrProto::OK; - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetResultSent, ReplyStatus = status, - ResponseSize = GetImpl.GetReplyBytes()); ui64 requestSize = 0; ui64 tabletId = 0; ui32 channel = 0; @@ -502,7 +493,7 @@ public: TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, bool isVMultiPutMode) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_GET, ev->IsVerboseNoDataEnabled || ev->CollectDebugInfo, - latencyQueueKind, now, storagePoolCounters, ev->RestartCounter) + latencyQueueKind, now, storagePoolCounters, ev->RestartCounter, "DSProxy.Get") , GetImpl(info, state, ev, std::move(nodeLayout), LogCtx.RequestPrefix) , Orbit(std::move(ev->Orbit)) , Deadline(ev->Deadline) @@ -537,7 +528,6 @@ public: << " RestartCounter# " << RestartCounter); LWTRACK(DSProxyGetBootstrap, Orbit); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetReceived); TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets; TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h index 4908ae9bda..815ec33c28 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h @@ -5,7 +5,6 @@ #include "dsproxy_cookies.h" #include "dsproxy_mon.h" #include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> -#include <ydb/core/blobstorage/base/wilson_events.h> #include <util/generic/set.h> namespace NKikimr { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h index 25eee78a6e..324f7d1d1f 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h @@ -219,7 +219,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy> if (rate) { const ui64 num = RandomNumber<ui64>(1000000); // in range [0, 1000000) if (num < rate) { - ev->TraceId = NWilson::TTraceId::NewTraceId(); + ev->TraceId = NWilson::TTraceId::NewTraceId(15, 4095); } } } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp index e1a634e559..1b4868cf26 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp @@ -3,7 +3,6 @@ #include "dsproxy_quorum_tracker.h" #include "dsproxy_blob_tracker.h" #include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> -#include <ydb/core/blobstorage/base/wilson_events.h> #include <util/generic/set.h> namespace NKikimr { @@ -261,7 +260,7 @@ public: TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_INDEXRESTOREGET, false, latencyQueueKind, now, storagePoolCounters, - ev->RestartCounter) + ev->RestartCounter, "DSProxy.IndexRestoreGet") , QuerySize(ev->QuerySize) , Queries(ev->Queries.Release()) , Deadline(ev->Deadline) @@ -285,8 +284,6 @@ public: } void Bootstrap() { - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetReceived); - auto makeQueriesList = [this] { TStringStream str; str << "{"; @@ -314,7 +311,7 @@ public: if (vget) { const ui64 cookie = TVDiskIdShort(vd).GetRaw(); CountEvent(*vget); - SendToQueue(std::move(vget), cookie, NWilson::TTraceId()); // FIXME: wilson + SendToQueue(std::move(vget), cookie); vget.reset(); ++VGetsInFlight; } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp index 54a74eba58..dfa437be49 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp @@ -93,9 +93,10 @@ public: TBlobStorageGroupMultiCollectRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev, ui64 cookie, - TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) - : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, NWilson::TTraceId(), - NKikimrServices::BS_PROXY_MULTICOLLECT, false, {}, now, storagePoolCounters, 0) + NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) + : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), + NKikimrServices::BS_PROXY_MULTICOLLECT, false, {}, now, storagePoolCounters, 0, + "DSProxy.MultiCollect") , TabletId(ev->TabletId) , RecordGeneration(ev->RecordGeneration) , PerGenerationCounter(ev->PerGenerationCounter) @@ -154,7 +155,7 @@ public: R_LOG_DEBUG_S("BPMC3", "SendRequest idx# " << idx << " isLast# " << isLast << " ev# " << ev->ToString()); - SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie); + SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span); if (isLast) { CollectRequestsInFlight++; @@ -205,8 +206,8 @@ public: IActor* CreateBlobStorageGroupMultiCollectRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev, - ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) { - return new TBlobStorageGroupMultiCollectRequest(info, state, source, mon, ev, cookie, now, + ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) { + return new TBlobStorageGroupMultiCollectRequest(info, state, source, mon, ev, cookie, std::move(traceId), now, storagePoolCounters); } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp index 02d7849855..dd4ab92d58 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp @@ -1,7 +1,6 @@ #include "dsproxy.h" #include "dsproxy_mon.h" -#include <ydb/core/blobstorage/base/wilson_events.h> #include <ydb/core/blobstorage/vdisk/query/query_spacetracker.h> #include <util/generic/set.h> @@ -39,8 +38,6 @@ class TBlobStorageGroupMultiGetRequest : public TBlobStorageGroupRequestActor<TB void Handle(TEvBlobStorage::TEvGetResult::TPtr &ev) { RequestsInFlight--; - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetResultReceived, MergedNode = std::move(ev->TraceId)); - const TEvBlobStorage::TEvGetResult &res = *ev->Get(); if (res.Status != NKikimrProto::OK) { R_LOG_ERROR_S("BPMG1", "Handle TEvGetResult status# " << NKikimrProto::EReplyStatus_Name(res.Status)); @@ -72,7 +69,6 @@ class TBlobStorageGroupMultiGetRequest : public TBlobStorageGroupRequestActor<TB } ev->ErrorReason = ErrorReason; Mon->CountGetResponseTime(Info->GetDeviceType(), GetHandleClass, ev->PayloadSizeBytes(), TActivationContext::Now() - StartTime); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, MultiGetResultSent); Y_VERIFY(status != NKikimrProto::OK); SendResponseAndDie(std::move(ev)); } @@ -96,7 +92,8 @@ public: NWilson::TTraceId traceId, TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), - NKikimrServices::BS_PROXY_MULTIGET, false, latencyQueueKind, now, storagePoolCounters, 0) + NKikimrServices::BS_PROXY_MULTIGET, false, latencyQueueKind, now, storagePoolCounters, 0, + "DSProxy.MultiGet") , QuerySize(ev->QuerySize) , Queries(ev->Queries.Release()) , Deadline(ev->Deadline) @@ -125,22 +122,18 @@ public: void SendRequests() { for (; RequestsInFlight < MaxRequestsInFlight && !PendingGets.empty(); ++RequestsInFlight, PendingGets.pop_front()) { auto& [ev, cookie] = PendingGets.front(); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetSent); - SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, TraceId.SeparateBranch()); + SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span); } if (!RequestsInFlight && PendingGets.empty()) { auto ev = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, 0, Info->GroupID); ev->ResponseSz = QuerySize; ev->Responses = std::move(Responses); Mon->CountGetResponseTime(Info->GetDeviceType(), GetHandleClass, ev->PayloadSizeBytes(), TActivationContext::Now() - StartTime); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, MultiGetResultSent); SendResponseAndDie(std::move(ev)); } } void Bootstrap() { - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, MultiGetReceived); - auto dumpQuery = [this] { TStringStream str; str << "{"; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp index fcced0b6f8..b246214495 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp @@ -93,7 +93,7 @@ public: const TActorId &proxyId, bool useVPatch = false) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_PATCH, false, {}, now, storagePoolCounters, - ev->RestartCounter) + ev->RestartCounter, "DSProxy.Patch") , ProxyActorId(proxyId) , OriginalGroupId(ev->OriginalGroupId) , OriginalId(ev->OriginalId) @@ -138,7 +138,6 @@ public: void Handle(TEvBlobStorage::TEvGetResult::TPtr &ev) { TEvBlobStorage::TEvGetResult *result = ev->Get(); Orbit = std::move(result->Orbit); - TraceId = std::move(ev->TraceId); ui32 patchedIdHash = PatchedId.Hash(); bool incorrectCookie = ev->Cookie != patchedIdHash; @@ -173,13 +172,12 @@ public: std::unique_ptr<TEvBlobStorage::TEvPut> put = std::make_unique<TEvBlobStorage::TEvPut>(PatchedId, Buffer, Deadline, NKikimrBlobStorage::AsyncBlob, TEvBlobStorage::TEvPut::TacticDefault); put->Orbit = std::move(Orbit); - Send(ProxyActorId, put.release(), 0, OriginalId.Hash(), std::move(TraceId)); + Send(ProxyActorId, put.release(), 0, OriginalId.Hash(), Span); } void Handle(TEvBlobStorage::TEvPutResult::TPtr &ev) { TEvBlobStorage::TEvPutResult *result = ev->Get(); Orbit = std::move(result->Orbit); - TraceId = std::move(ev->TraceId); StatusFlags = result->StatusFlags; ApproximateFreeSpaceShare = result->ApproximateFreeSpaceShare; @@ -224,7 +222,6 @@ public: NKikimrBlobStorage::TEvVMovedPatchResult &record = result->Record; PullOutStatusFlagsAndFressSpace(record); Orbit = std::move(result->Orbit); - TraceId = std::move(ev->TraceId); ui64 expectedCookie = ((ui64)OriginalId.Hash() << 32) | PatchedId.Hash(); bool incorrectCookie = ev->Cookie != expectedCookie; @@ -531,9 +528,9 @@ public: NKikimrBlobStorage::AsyncRead); get->Orbit = std::move(Orbit); if (OriginalGroupId == Info->GroupID) { - Send(ProxyActorId, get.release(), 0, PatchedId.Hash(), std::move(TraceId)); + Send(ProxyActorId, get.release(), 0, PatchedId.Hash(), Span); } else { - SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash(), std::move(TraceId)); + SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash(), Span); } } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp index d9e0e2b320..df51c9a78e 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp @@ -5,7 +5,6 @@ #include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> #include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h> -#include <ydb/core/blobstorage/base/wilson_events.h> #include <util/generic/ymath.h> #include <util/system/datetime.h> @@ -142,8 +141,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt "BPP01", "received " << ev->Get()->ToString() << " from# " << VDiskIDFromVDiskID(ev->Get()->Record.GetVDiskID())); ProcessReplyFromQueue(ev); - // generate wilson event about request completion - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVPutResultReceived, MergedNode = std::move(ev->TraceId)); ResponsesReceived++; const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000; @@ -220,8 +217,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt void Handle(TEvBlobStorage::TEvVMultiPutResult::TPtr &ev) { ProcessReplyFromQueue(ev); - // generate wilson event about request completion - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVPutResultReceived, MergedNode = std::move(ev->TraceId)); ResponsesReceived++; const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000; @@ -363,7 +358,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt const TDuration duration = TActivationContext::Now() - StartTime; TLogoBlobID blobId = putResult->Id; TLogoBlobID origBlobId = TLogoBlobID(blobId, 0); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &ItemsInfo[blobIdx].TraceId, EvPutResultSent, ReplyStatus = status); Mon->CountPutPesponseTime(Info->GetDeviceType(), HandleClass, ItemsInfo[blobIdx].BufferSize, duration); *Mon->ActivePutCapacity -= ReportedBytes; Y_VERIFY(PutImpl.GetHandoffPartsSent() <= Info->Type.TotalPartCount() * MaxHandoffNodes * ItemsInfo.size()); @@ -384,8 +378,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt SendResponse(std::move(putResult), TimeStatsEnabled ? &TimeStats : nullptr); } else { SendResponse(std::move(putResult), TimeStatsEnabled ? &TimeStats : nullptr, - ItemsInfo[blobIdx].Recipient, ItemsInfo[blobIdx].Cookie, - std::move(ItemsInfo[blobIdx].TraceId)); + ItemsInfo[blobIdx].Recipient, ItemsInfo[blobIdx].Cookie); // FIXME about traces ItemsInfo[blobIdx].Replied = true; } } @@ -452,7 +445,7 @@ public: bool enableRequestMod3x3ForMinLatecy) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_PUT, false, latencyQueueKind, now, storagePoolCounters, - ev->RestartCounter) + ev->RestartCounter, "DSProxy.Put") , PutImpl(info, state, ev, mon, enableRequestMod3x3ForMinLatecy) , WaitingVDiskResponseCount(info->GetTotalVDisksNum()) , Deadline(ev->Deadline) @@ -497,7 +490,7 @@ public: bool enableRequestMod3x3ForMinLatecy) : TBlobStorageGroupRequestActor(info, state, mon, TActorId(), 0, NWilson::TTraceId(), NKikimrServices::BS_PROXY_PUT, false, latencyQueueKind, now, storagePoolCounters, - MaxRestartCounter(events)) + MaxRestartCounter(events), "DSProxy.Put") , PutImpl(info, state, events, mon, handleClass, tactic, enableRequestMod3x3ForMinLatecy) , WaitingVDiskResponseCount(info->GetTotalVDisksNum()) , IsManyPuts(true) @@ -564,10 +557,6 @@ public: Timer.Reset(); - // TODO: how correct rewrite this? - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvPutReceived, Size = RequestBytes, - LogoBlobId = ItemsInfo[0].BlobId); - double wilsonSec = Timer.PassedReset(); const ui32 totalParts = Info->Type.TotalPartCount(); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h index 34f675032b..b8b74c1cde 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h @@ -10,7 +10,6 @@ #include "dsproxy_strategy_put_m3dc.h" #include "dsproxy_strategy_put_m3of4.h" #include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> -#include <ydb/core/blobstorage/base/wilson_events.h> #include <util/generic/set.h> namespace NKikimr { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp index e234f8c18e..319e7e11ca 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp @@ -4,7 +4,6 @@ #include "dsproxy_blob_tracker.h" #include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> #include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h> -#include <ydb/core/blobstorage/base/wilson_events.h> #include <library/cpp/pop_count/popcount.h> @@ -51,7 +50,6 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob size += resp.Buffer.size(); } Mon->CountRangeResponseTime(TActivationContext::Now() - StartTime); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, RangeGetResultSent, ReplyStatus = reply->Status, ResponseSize = size); SendResponseAndDie(std::move(reply)); } @@ -65,9 +63,8 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob msg->Record.SetSuppressBarrierCheck(true); // trace message and send it to queue - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetSent); CountEvent(*msg); - SendToQueue(std::move(msg), 0, TraceId.SeparateBranch()); + SendToQueue(std::move(msg), 0); // add pending count ++NumVGetsPending; @@ -88,8 +85,6 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob << " VDiskId# " << vdisk << " TEvVGetResult# " << ev->Get()->ToString()); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetResultReceived, MergedNode = std::move(ev->TraceId)); - Y_VERIFY(NumVGetsPending > 0); --NumVGetsPending; @@ -252,8 +247,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob A_LOG_DEBUG_S("DSR08", "sending TEvGet# " << get->ToString()); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetSent); - SendToBSProxy(SelfId(), Info->GroupID, get.release(), 0, TraceId.SeparateBranch()); + SendToBSProxy(SelfId(), Info->GroupID, get.release(), 0, Span); // switch state Become(&TThis::StateGet); @@ -271,8 +265,6 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob } void Handle(TEvBlobStorage::TEvGetResult::TPtr &ev) { - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetResultReceived, MergedNode = std::move(ev->TraceId)); - TEvBlobStorage::TEvGetResult &getResult = *ev->Get(); NKikimrProto::EReplyStatus status = getResult.Status; if (status != NKikimrProto::OK) { @@ -346,7 +338,7 @@ public: TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_RANGE, false, {}, now, storagePoolCounters, - ev->RestartCounter) + ev->RestartCounter, "DSProxy.Range") , TabletId(ev->TabletId) , From(ev->From) , To(ev->To) @@ -370,8 +362,6 @@ public: << " ForceBlockedGeneration# " << ForceBlockedGeneration << " RestartCounter# " << RestartCounter); - WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, RangeGetReceived); - // ensure we are querying ranges for the same tablet Y_VERIFY(TabletId == From.TabletID()); Y_VERIFY(TabletId == To.TabletID()); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp index 49986464a2..3ef82b2c92 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp @@ -144,9 +144,8 @@ namespace NKikimr { void TBlobStorageGroupProxy::HandleNormal(TEvBlobStorage::TEvBlock::TPtr &ev) { EnsureMonitoring(ev->Get()->IsMonitored); Mon->EventBlock->Inc(); - const TActorId reqID = Register( - CreateBlobStorageGroupBlockRequest(Info, Sessions->GroupQueues, ev->Sender, Mon, - ev->Get(), ev->Cookie, TActivationContext::Now(), StoragePoolCounters)); + const TActorId reqID = Register(CreateBlobStorageGroupBlockRequest(Info, Sessions->GroupQueues, ev->Sender, Mon, + ev->Get(), ev->Cookie, std::move(ev->TraceId), TActivationContext::Now(), StoragePoolCounters)); ActiveRequests.insert(reqID); } @@ -206,15 +205,15 @@ namespace NKikimr { if (!ev->Get()->IsMultiCollectAllowed || ev->Get()->PerGenerationCounterStepSize() == 1) { Mon->EventCollectGarbage->Inc(); - const TActorId reqID = Register( - CreateBlobStorageGroupCollectGarbageRequest(Info, Sessions->GroupQueues, ev->Sender, Mon, - ev->Get(), ev->Cookie, TActivationContext::Now(), StoragePoolCounters)); + const TActorId reqID = Register(CreateBlobStorageGroupCollectGarbageRequest(Info, Sessions->GroupQueues, + ev->Sender, Mon, ev->Get(), ev->Cookie, std::move(ev->TraceId), TActivationContext::Now(), + StoragePoolCounters)); ActiveRequests.insert(reqID); } else { Mon->EventMultiCollect->Inc(); - const TActorId reqID = Register( - CreateBlobStorageGroupMultiCollectRequest(Info, Sessions->GroupQueues, ev->Sender, Mon, - ev->Get(), ev->Cookie, TActivationContext::Now(), StoragePoolCounters)); + const TActorId reqID = Register(CreateBlobStorageGroupMultiCollectRequest(Info, Sessions->GroupQueues, + ev->Sender, Mon, ev->Get(), ev->Cookie, std::move(ev->TraceId), TActivationContext::Now(), + StoragePoolCounters)); ActiveRequests.insert(reqID); } } @@ -227,9 +226,8 @@ namespace NKikimr { } EnsureMonitoring(true); Mon->EventStatus->Inc(); - const TActorId reqID = Register( - CreateBlobStorageGroupStatusRequest(Info, Sessions->GroupQueues, ev->Sender, Mon, - ev->Get(), ev->Cookie, TActivationContext::Now(), StoragePoolCounters)); + const TActorId reqID = Register(CreateBlobStorageGroupStatusRequest(Info, Sessions->GroupQueues, ev->Sender, Mon, + ev->Get(), ev->Cookie, std::move(ev->TraceId), TActivationContext::Now(), StoragePoolCounters)); ActiveRequests.insert(reqID); } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp index 2b4ec35fd5..979cd28767 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp @@ -79,10 +79,10 @@ public: TBlobStorageGroupStatusRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvStatus *ev, - ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) - : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, NWilson::TTraceId(), + ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) + : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_STATUS, false, {}, now, storagePoolCounters, - ev->RestartCounter) + ev->RestartCounter, "DSProxy.Status") , Deadline(ev->Deadline) , Requests(0) , Responses(0) @@ -105,7 +105,7 @@ public: << " node# " << Info->GetActorId(vd).NodeId()); auto msg = std::make_unique<TEvBlobStorage::TEvVStatus>(vd); - SendToQueue(std::move(msg), cookie, NWilson::TTraceId()); // FIXME: wilson + SendToQueue(std::move(msg), cookie); ++Requests; } @@ -129,8 +129,8 @@ public: IActor* CreateBlobStorageGroupStatusRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, const TActorId &source, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvStatus *ev, - ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) { - return new TBlobStorageGroupStatusRequest(info, state, source, mon, ev, cookie, now, storagePoolCounters); + ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) { + return new TBlobStorageGroupStatusRequest(info, state, source, mon, ev, cookie, std::move(traceId), now, storagePoolCounters); } } // NKikimr diff --git a/ydb/core/blobstorage/dsproxy/root_cause.h b/ydb/core/blobstorage/dsproxy/root_cause.h index 5126190cc8..0a1049e8ee 100644 --- a/ydb/core/blobstorage/dsproxy/root_cause.h +++ b/ydb/core/blobstorage/dsproxy/root_cause.h @@ -3,7 +3,6 @@ #include "defs.h" #include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h> -#include <ydb/core/blobstorage/base/wilson_events.h> #include <ydb/core/base/appdata.h> namespace NKikimr { diff --git a/ydb/core/blobstorage/pdisk/CMakeLists.txt b/ydb/core/blobstorage/pdisk/CMakeLists.txt index 2ac16b5fcd..80196bdbd4 100644 --- a/ydb/core/blobstorage/pdisk/CMakeLists.txt +++ b/ydb/core/blobstorage/pdisk/CMakeLists.txt @@ -15,6 +15,7 @@ target_link_libraries(core-blobstorage-pdisk PUBLIC cpp-actors-core cpp-actors-protos cpp-actors-util + cpp-actors-wilson cpp-containers-stack_vector library-cpp-lwtrace monlib-dynamic_counters-percentile @@ -32,7 +33,6 @@ target_link_libraries(core-blobstorage-pdisk PUBLIC ydb-library-schlab library-schlab-mon library-schlab-schine - ydb-library-wilson tools-enum_parser-enum_serialization_runtime ) target_sources(core-blobstorage-pdisk PRIVATE diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp index d92b19c6cc..5ff29d8685 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp @@ -16,7 +16,6 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/base/counters.h> #include <ydb/core/blobstorage/base/html.h> -#include <ydb/core/blobstorage/base/wilson_events.h> #include <ydb/core/blobstorage/base/blobstorage_events.h> #include <ydb/core/blobstorage/crypto/secured_block.h> #include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h> diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h index 650b2e6fc5..7469ec76d8 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h @@ -10,7 +10,6 @@ #include <ydb/library/pdisk_io/aio.h> #include <ydb/library/pdisk_io/drivedata.h> #include <ydb/library/pdisk_io/sector_map.h> -#include <ydb/library/wilson/wilson_event.h> namespace NActors { class TActorSystem; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp index a86f62bef1..d80dae0868 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp @@ -9,7 +9,6 @@ #include "blobstorage_pdisk_util_idlecounter.h" #include <ydb/core/base/appdata.h> -#include <ydb/core/blobstorage/base/wilson_events.h> #include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h> #include <ydb/core/protos/services.pb.h> #include <ydb/core/util/cache.h> @@ -943,9 +942,6 @@ protected: REQUEST_VALGRIND_CHECK_MEM_IS_ADDRESSABLE(data, size); } - if (ActorSystem) { - WILSON_TRACE(*ActorSystem, traceId, BlockPread, DiskOffset = offset, Size = size); - } IAsyncIoOperation* op = IoContext->CreateAsyncIoOperation(completionAction, reqId, traceId); IoContext->PreparePRead(op, data, size, offset); Submit(op); @@ -963,9 +959,6 @@ protected: REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(data, size); } - if (ActorSystem) { - WILSON_TRACE(*ActorSystem, traceId, BlockPwrite, DiskOffset = offset, Size = size); - } IAsyncIoOperation* op = IoContext->CreateAsyncIoOperation(completionAction, reqId, traceId); IoContext->PreparePWrite(op, const_cast<void*>(data), size, offset); Submit(op); diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h index 148856c3d5..393c7f021c 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h @@ -1,11 +1,11 @@ #pragma once #include <ydb/library/pdisk_io/aio.h> -#include <ydb/library/wilson/wilson_event.h> #include <util/system/hp_timer.h> #include <util/generic/string.h> #include <library/cpp/lwtrace/shuttle.h> +#include <library/cpp/actors/wilson/wilson_span.h> namespace NKikimr::NPDisk { diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp index d00fc0f220..0707ce828f 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp @@ -973,7 +973,6 @@ TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece(TIntrusivePtr<TChunkRead> & ui64 readOffset = Format.Offset(read->ChunkIdx, read->FirstSector, currentSectorOffset); // TODO: Get this from the drive - WILSON_TRACE(*ActorSystem, &read->TraceId, AsyncReadScheduled, DiskOffset = readOffset, Size = bytesToRead); THolder<TCompletionChunkReadPart> completion(new TCompletionChunkReadPart(this, read, bytesToRead, payloadBytesToRead, payloadOffset, read->FinalCompletion, isTheLastPart, Cfg->UseT1ha0HashInFooter)); completion->CostNs = DriveModel.TimeForSizeNs(bytesToRead, read->ChunkIdx, TDriveModel::OP_TYPE_READ); @@ -2297,7 +2296,6 @@ void TPDisk::PrepareLogError(TLogWrite *logWrite, TStringStream& err, NKikimrPro logWrite->Result.Reset(new NPDisk::TEvLogResult(status, GetStatusFlags(logWrite->Owner, logWrite->OwnerGroupType), err.Str())); logWrite->Result->Results.push_back(NPDisk::TEvLogResult::TRecord(logWrite->Lsn, logWrite->Cookie)); - WILSON_TRACE(*ActorSystem, &logWrite->TraceId, EnqueueLogWrite); } NKikimrProto::EReplyStatus TPDisk::CheckOwnerAndRound(TRequestBase* req, TStringStream& err) { @@ -2533,7 +2531,6 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) { log->SetOwnerGroupType(ownerData.IsStaticGroupOwner()); ownerData.SetLastSeenLsn(log->Lsn); ownerData.WriteThroughput.Increment(log->Data.size(), ActorSystem->Timestamp()); - WILSON_TRACE(*ActorSystem, &log->TraceId, EnqueueLogWrite); break; } case ERequestType::RequestYardInit: @@ -2832,7 +2829,6 @@ void TPDisk::RouteRequest(TRequestBase *request) { if (log->Signature.HasCommitRecord()) { JointCommits.push_back(log); } - WILSON_TRACE(*ActorSystem, &log->TraceId, RouteLogWrite); log = batch; } break; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h index a9b910cb0c..c8487df8a6 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h @@ -21,7 +21,6 @@ #include <ydb/core/node_whiteboard/node_whiteboard.h> #include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h> -#include <ydb/core/blobstorage/base/wilson_events.h> #include <ydb/core/control/immediate_control_board_wrapper.h> #include <ydb/library/schlab/schine/scheduler.h> #include <ydb/library/schlab/schine/job_kind.h> diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h index af5402a4f8..737ad571c7 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h @@ -2,7 +2,6 @@ #include "defs.h" -#include <ydb/core/blobstorage/base/wilson_events.h> #include "blobstorage_pdisk.h" #include "blobstorage_pdisk_gate.h" @@ -217,7 +216,6 @@ public: if (ev.Data.size() > (1 << 20)) { Mon->WriteHugeLog.CountRequest(); } - // WILSON_TRACE(*ActorSystem, &traceId, EvLogReceived); // TODO return NewRequest(new TLogWrite(ev, sender, AtomicGet(*EstimatedLogChunkIdx), reqId, std::move(traceId)), &burstMs); } @@ -229,7 +227,6 @@ public: Mon->QueueRequests->Inc(); *Mon->QueueBytes += ev.Size; Mon->GetReadCounter(ev.PriorityClass)->CountRequest(ev.Size); - WILSON_TRACE(*ActorSystem, &traceId, EvChunkReadReceived, ChunkIdx = ev.ChunkIdx, Offset = ev.Offset, Size = ev.Size); auto read = new TChunkRead(ev, sender, reqId, std::move(traceId)); read->SelfPointer = read; return NewRequest(read, &burstMs); @@ -245,7 +242,6 @@ public: ev.Validate(); *Mon->QueueBytes += size; Mon->GetWriteCounter(ev.PriorityClass)->CountRequest(size); - WILSON_TRACE(*ActorSystem, &traceId, EvChunkWriteReceived, ChunkIdx = ev.ChunkIdx, Offset = ev.Offset, Size = size); return NewRequest(new TChunkWrite(ev, sender, reqId, std::move(traceId)), &burstMs); } }; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.cpp index e6e279c4a5..d7731c4e8b 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.cpp @@ -14,7 +14,6 @@ #include "blobstorage_pdisk_util_countedqueueoneone.h" #include "blobstorage_pdisk_writer.h" -#include <ydb/core/blobstorage/base/wilson_events.h> #include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h> #include <ydb/core/node_whiteboard/node_whiteboard.h> #include <ydb/core/util/queue_oneone_inplace.h> diff --git a/ydb/core/blobstorage/vdisk/common/blobstorage_status.cpp b/ydb/core/blobstorage/vdisk/common/blobstorage_status.cpp index 4d770a4037..f26ac7bc31 100644 --- a/ydb/core/blobstorage/vdisk/common/blobstorage_status.cpp +++ b/ydb/core/blobstorage/vdisk/common/blobstorage_status.cpp @@ -36,7 +36,7 @@ namespace NKikimr { SetRacingGroupInfo(record, Result->Record, GroupInfo); LOG_DEBUG(ctx, BS_VDISK_OTHER, VDISKP(VCtx->VDiskLogPrefix, "TEvVStatusResult Request# {%s} Response# {%s}", SingleLineProto(record).data(), SingleLineProto(Result->Record).data())); - SendVDiskResponse(ctx, Ev->Sender, Result.release(), *this, Ev->Cookie, Ev->GetChannel()); + SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie, Ev->GetChannel()); Die(ctx); return; } @@ -71,7 +71,7 @@ namespace NKikimr { ctx.Send(NotifyId, new TEvents::TEvActorDied()); LOG_DEBUG(ctx, BS_VDISK_GET, VDISKP(VCtx->VDiskLogPrefix, "TEvVStatusResult")); - SendVDiskResponse(ctx, Ev->Sender, Result.release(), *this, Ev->Cookie, Ev->GetChannel()); + SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie, Ev->GetChannel()); Die(ctx); } } diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp b/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp index 384931d305..6555ca4631 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp +++ b/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp @@ -1,22 +1,19 @@ #include "vdisk_response.h" #include "vdisk_events.h" -#include <ydb/core/blobstorage/base/wilson_events.h> #include <ydb/core/base/interconnect_channels.h> #include <util/system/datetime.h> namespace NKikimr { -void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, const IActor& actor, - ui64 cookie) { +void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie) { ui32 channel = TInterconnectChannels::IC_BLOBSTORAGE; if (TEvVResultBase *base = dynamic_cast<TEvVResultBase *>(ev)) { channel = base->GetChannelToSend(); } - SendVDiskResponse(ctx, recipient, ev, actor, cookie, channel); + SendVDiskResponse(ctx, recipient, ev, cookie, channel); } -void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, const IActor& actor, - ui64 cookie, ui32 channel) { +void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, ui32 channel) { NWilson::TTraceId traceId; switch (const ui32 type = ev->Type()) { @@ -24,7 +21,6 @@ void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEve case TEvBlobStorage::T::EventType: { \ TEvBlobStorage::T *event = static_cast<TEvBlobStorage::T *>(ev); \ traceId = std::move(event->TraceId); \ - WILSON_TRACE_FROM_ACTOR(ctx, actor, &traceId, EV); \ const double usPerCycle = 1000000.0 / NHPTimer::GetCyclesPerSecond(); \ event->Record.MutableTimestamps()->SetSentByVDiskUs(GetCycleCountFast() * usPerCycle); \ break; \ diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_response.h b/ydb/core/blobstorage/vdisk/common/vdisk_response.h index 1ac0452b8e..6ecc700ac7 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_response.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_response.h @@ -3,10 +3,8 @@ namespace NKikimr { -void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, const IActor& actor, - ui64 cookie); +void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie); -void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, const IActor& actor, - ui64 cookie, ui32 channel); +void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, ui32 channel); }//NKikimr diff --git a/ydb/core/blobstorage/vdisk/query/query_barrier.cpp b/ydb/core/blobstorage/vdisk/query/query_barrier.cpp index c469bb8f40..dc63806674 100644 --- a/ydb/core/blobstorage/vdisk/query/query_barrier.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_barrier.cpp @@ -33,7 +33,7 @@ namespace NKikimr { LOG_DEBUG(ctx, BS_VDISK_GC, VDISKP(HullCtx->VCtx->VDiskLogPrefix, "TEvVGetBarrierResult: %s", Result->ToString().data())); - SendVDiskResponse(ctx, Ev->Sender, Result.release(), *this, Ev->Cookie); + SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie); ctx.Send(ParentId, new TEvents::TEvActorDied); Die(ctx); } diff --git a/ydb/core/blobstorage/vdisk/query/query_base.h b/ydb/core/blobstorage/vdisk/query/query_base.h index 092b93964f..54391bea8d 100644 --- a/ydb/core/blobstorage/vdisk/query/query_base.h +++ b/ydb/core/blobstorage/vdisk/query/query_base.h @@ -5,7 +5,6 @@ #include "query_spacetracker.h" #include <ydb/core/blobstorage/vdisk/hulldb/hull_ds_all_snap.h> #include <ydb/core/blobstorage/vdisk/common/vdisk_response.h> -#include <ydb/core/blobstorage/base/wilson_events.h> namespace NKikimr { @@ -110,7 +109,7 @@ namespace NKikimr { ctx.Send(ReplSchedulerId, new TEvBlobStorage::TEvEnrichNotYet(BatcherCtx->OrigEv, std::move(Result))); } else { // send reply event to sender - SendVDiskResponse(ctx, BatcherCtx->OrigEv->Sender, Result.release(), *self, BatcherCtx->OrigEv->Cookie); + SendVDiskResponse(ctx, BatcherCtx->OrigEv->Sender, Result.release(), BatcherCtx->OrigEv->Cookie); } ctx.Send(ParentId, new TEvents::TEvActorDied); diff --git a/ydb/core/blobstorage/vdisk/query/query_dumpdb.h b/ydb/core/blobstorage/vdisk/query/query_dumpdb.h index bba8a7fbdc..620903bfcc 100644 --- a/ydb/core/blobstorage/vdisk/query/query_dumpdb.h +++ b/ydb/core/blobstorage/vdisk/query/query_dumpdb.h @@ -47,7 +47,7 @@ namespace NKikimr { // send result Result->SetResult(str.Str()); - SendVDiskResponse(ctx, Ev->Sender, Result.release(), *this, Ev->Cookie); + SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie); TThis::Die(ctx); } diff --git a/ydb/core/blobstorage/vdisk/query/query_extr.cpp b/ydb/core/blobstorage/vdisk/query/query_extr.cpp index 196b6d2054..6315bd47f2 100644 --- a/ydb/core/blobstorage/vdisk/query/query_extr.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_extr.cpp @@ -305,7 +305,6 @@ namespace NKikimr { } void HandleReadCompletion(TEvents::TEvCompleted::TPtr& ev, const TActorContext &ctx) { - WILSON_TRACE_FROM_ACTOR(ctx, *this, &Result->TraceId, ReadBatcherFinish, MergedNode = std::move(ev->TraceId)); ActiveActors.Erase(ev->Sender); Finish(ctx); } @@ -346,9 +345,8 @@ namespace NKikimr { SendResponseAndDie(ctx, this); } else { ui8 priority = PDiskPriority(); - WILSON_TRACE_FROM_ACTOR(ctx, *this, &Result->TraceId, ReadBatcherStart); - std::unique_ptr<IActor> a(Batcher.CreateAsyncDataReader(ctx.SelfID, priority, Result->TraceId.SeparateBranch(), - IsRepl())); + std::unique_ptr<IActor> a(Batcher.CreateAsyncDataReader(ctx.SelfID, priority, std::move(Result->TraceId), + IsRepl(), TActivationContext::Now())); if (a) { auto aid = ctx.Register(a.release()); ActiveActors.Insert(aid); diff --git a/ydb/core/blobstorage/vdisk/query/query_public.cpp b/ydb/core/blobstorage/vdisk/query/query_public.cpp index 25761b685e..db91316a88 100644 --- a/ydb/core/blobstorage/vdisk/query/query_public.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_public.cpp @@ -152,14 +152,13 @@ namespace NKikimr { const TVDiskContextPtr &vctx, const TActorContext &ctx, TEvBlobStorage::TEvVDbStat::TPtr &ev, - std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result, - const IActor& actor) + std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result) { result->SetError(); LOG_DEBUG(ctx, NKikimrServices::BS_VDISK_OTHER, VDISKP(vctx->VDiskLogPrefix, "TEvVDbStatResult: %s", result->ToString().data())); - SendVDiskResponse(ctx, ev->Sender, result.release(), actor, ev->Cookie); + SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie); } template <class TKey, class TMemRec> @@ -169,8 +168,7 @@ namespace NKikimr { TLevelIndexSnapshot<TKey, TMemRec> &&levelSnap, const TActorId &parentId, TEvBlobStorage::TEvVDbStat::TPtr &ev, - std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result, - const IActor& actor) + std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result) { const NKikimrBlobStorage::TEvVDbStat &record = ev->Get()->Record; switch (record.GetAction()) { @@ -183,7 +181,7 @@ namespace NKikimr { return new TStatActor(hullCtx, parentId, std::move(levelSnap), ev, std::move(result)); } default: { - DbStatError(hullCtx->VCtx, ctx, ev, std::move(result), actor); + DbStatError(hullCtx->VCtx, ctx, ev, std::move(result)); return nullptr; } } @@ -201,29 +199,27 @@ namespace NKikimr { THullDsSnap &&fullSnap, const TActorId &parentId, TEvBlobStorage::TEvVDbStat::TPtr &ev, - std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result, - const IActor& actor) { + std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result) { const NKikimrBlobStorage::TEvVDbStat &record = ev->Get()->Record; switch (record.GetType()) { - case NKikimrBlobStorage::StatLogoBlobs: { - return RunDbStatAction(hullCtx, ctx, std::move(fullSnap.LogoBlobsSnap), parentId, ev, std::move(result), actor); - } - case NKikimrBlobStorage::StatBlocks: { - return RunDbStatAction(hullCtx, ctx, std::move(fullSnap.BlocksSnap), parentId, ev, std::move(result), actor); - } - case NKikimrBlobStorage::StatBarriers: { - return RunDbStatAction(hullCtx, ctx, std::move(fullSnap.BarriersSnap), parentId, ev, std::move(result), actor); - } - case NKikimrBlobStorage::StatTabletType: { + case NKikimrBlobStorage::StatLogoBlobs: + return RunDbStatAction(hullCtx, ctx, std::move(fullSnap.LogoBlobsSnap), parentId, ev, std::move(result)); + + case NKikimrBlobStorage::StatBlocks: + return RunDbStatAction(hullCtx, ctx, std::move(fullSnap.BlocksSnap), parentId, ev, std::move(result)); + + case NKikimrBlobStorage::StatBarriers: + return RunDbStatAction(hullCtx, ctx, std::move(fullSnap.BarriersSnap), parentId, ev, std::move(result)); + + case NKikimrBlobStorage::StatTabletType: return CreateTabletStatActor(hullCtx, parentId, std::move(fullSnap), ev, std::move(result)); - } - case NKikimrBlobStorage::StatHugeType: { + + case NKikimrBlobStorage::StatHugeType: return CreateHugeStatActor(hullCtx, hugeBlobCtx, parentId, std::move(fullSnap), ev, std::move(result)); - } - default: { - DbStatError(hullCtx->VCtx, ctx, ev, std::move(result), actor); + + default: + DbStatError(hullCtx->VCtx, ctx, ev, std::move(result)); return nullptr; - } } } diff --git a/ydb/core/blobstorage/vdisk/query/query_public.h b/ydb/core/blobstorage/vdisk/query/query_public.h index 38e498ad71..6367ef107e 100644 --- a/ydb/core/blobstorage/vdisk/query/query_public.h +++ b/ydb/core/blobstorage/vdisk/query/query_public.h @@ -83,8 +83,7 @@ namespace NKikimr { THullDsSnap &&fullSnap, const TActorId &parentId, TEvBlobStorage::TEvVDbStat::TPtr &ev, - std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result, - const IActor& actor); + std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result); IActor *CreateMonStreamActor(THullDsSnap&& fullSnap, TEvBlobStorage::TEvMonStreamQuery::TPtr& ev); diff --git a/ydb/core/blobstorage/vdisk/query/query_readactor.cpp b/ydb/core/blobstorage/vdisk/query/query_readactor.cpp index dfe5cff3c0..9a27de1b3f 100644 --- a/ydb/core/blobstorage/vdisk/query/query_readactor.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_readactor.cpp @@ -1,6 +1,6 @@ #include "query_readbatch.h" -#include <ydb/core/blobstorage/base/wilson_events.h> #include <ydb/core/blobstorage/base/vdisk_priorities.h> +#include <library/cpp/actors/wilson/wilson_span.h> #include <util/generic/algorithm.h> using namespace NKikimrServices; @@ -17,7 +17,7 @@ namespace NKikimr { const TActorId NotifyID; std::shared_ptr<TReadBatcherResult> Result; const ui8 Priority; - NWilson::TTraceId TraceId; + NWilson::TSpan Span; const bool IsRepl; ui32 Counter = 0; @@ -37,10 +37,6 @@ namespace NKikimr { // cookie for this request void *cookie = &*it; - // generate wilson event with query details - WILSON_TRACE_FROM_ACTOR(ctx, *this, &TraceId, EvChunkReadSent, ChunkIdx = it->Part.ChunkIdx, - Offset = it->Part.Offset, Size = it->Part.Size, YardCookie = cookie); - // create request std::unique_ptr<NPDisk::TEvChunkRead> msg(new NPDisk::TEvChunkRead(Ctx->PDiskCtx->Dsk->Owner, Ctx->PDiskCtx->Dsk->OwnerRound, it->Part.ChunkIdx, it->Part.Offset, it->Part.Size, @@ -51,7 +47,7 @@ namespace NKikimr { // send request TReplQuoter::QuoteMessage(quoter, std::make_unique<IEventHandle>(Ctx->PDiskCtx->PDiskId, SelfId(), - msg.release(), 0, 0, nullptr, TraceId.SeparateBranch()), it->Part.Size); + msg.release(), 0, 0, nullptr, Span), it->Part.Size); Counter++; } @@ -62,7 +58,7 @@ namespace NKikimr { VDISKP(Ctx->VCtx->VDiskLogPrefix, "GLUEREAD FINISHED(%p): actualReadN# %" PRIu32 " origReadN# %" PRIu32, this, ui32(Result->GlueReads.size()), ui32(Result->DiskDataItemPtrs.size()))); - ctx.Send(NotifyID, new TEvents::TEvCompleted(), 0, 0, std::move(TraceId)); + ctx.Send(NotifyID, new TEvents::TEvCompleted); Die(ctx); } @@ -87,8 +83,6 @@ namespace NKikimr { } NPDisk::TEvChunkReadResult *msg = ev->Get(); - WILSON_TRACE_FROM_ACTOR(ctx, *this, &TraceId, EvChunkReadResultReceived, YardCookie = msg->Cookie, - MergedNode = std::move(ev->TraceId)); TGlueRead *glueRead = static_cast<TGlueRead *>(msg->Cookie); glueRead->Data = std::move(msg->Data); @@ -122,13 +116,14 @@ namespace NKikimr { std::shared_ptr<TReadBatcherResult> result, ui8 priority, NWilson::TTraceId traceId, - bool isRepl) + bool isRepl, + TInstant now) : TActorBootstrapped<TTReadBatcherActor>() , Ctx(ctx) , NotifyID(notifyID) , Result(std::move(result)) , Priority(priority) - , TraceId(std::move(traceId)) + , Span(12, NWilson::ERelation::ChildOf, std::move(traceId), now, "VDisk.TReadBatcherActor") , IsRepl(isRepl) {} }; @@ -139,9 +134,10 @@ namespace NKikimr { std::shared_ptr<TReadBatcherResult> result, ui8 priority, NWilson::TTraceId traceId, - bool isRepl) + bool isRepl, + TInstant now) { - return new TTReadBatcherActor(ctx, notifyID, result, priority, std::move(traceId), isRepl); + return new TTReadBatcherActor(ctx, notifyID, result, priority, std::move(traceId), isRepl, now); } } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/query/query_readactor.h b/ydb/core/blobstorage/vdisk/query/query_readactor.h index 614625ee8b..daee28aaf6 100644 --- a/ydb/core/blobstorage/vdisk/query/query_readactor.h +++ b/ydb/core/blobstorage/vdisk/query/query_readactor.h @@ -11,7 +11,8 @@ namespace NKikimr { std::shared_ptr<TReadBatcherResult> result, ui8 priority, NWilson::TTraceId traceId, - bool isRepl); + bool isRepl, + TInstant now); } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp index a883191f15..566af60d43 100644 --- a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp @@ -1,6 +1,5 @@ #include "query_readbatch.h" #include "query_readactor.h" -#include <ydb/core/blobstorage/base/wilson_events.h> #include <ydb/core/blobstorage/base/vdisk_priorities.h> #include <util/generic/algorithm.h> @@ -218,10 +217,8 @@ namespace NKikimr { return true; } - IActor *TReadBatcher::CreateAsyncDataReader(const TActorId ¬ifyID, - ui8 priority, - NWilson::TTraceId traceId, - bool isRepl) { + IActor *TReadBatcher::CreateAsyncDataReader(const TActorId ¬ifyID, ui8 priority, NWilson::TTraceId traceId, + bool isRepl, TInstant now) { if (Result->DiskDataItemPtrs.empty()) return nullptr; else { @@ -242,7 +239,7 @@ namespace NKikimr { PDiskReadBytes += size; } // start reader - return CreateReadBatcherActor(Ctx, notifyID, Result, priority, std::move(traceId), isRepl); + return CreateReadBatcherActor(Ctx, notifyID, Result, priority, std::move(traceId), isRepl, now); } } diff --git a/ydb/core/blobstorage/vdisk/query/query_readbatch.h b/ydb/core/blobstorage/vdisk/query/query_readbatch.h index b0592a9a20..fba173b809 100644 --- a/ydb/core/blobstorage/vdisk/query/query_readbatch.h +++ b/ydb/core/blobstorage/vdisk/query/query_readbatch.h @@ -329,7 +329,8 @@ namespace NKikimr { // creates an actor for efficient async data reads, returns nullptr // if no read required - IActor *CreateAsyncDataReader(const TActorId ¬ifyID, ui8 priority, NWilson::TTraceId traceId, bool isRepl); + IActor *CreateAsyncDataReader(const TActorId ¬ifyID, ui8 priority, NWilson::TTraceId traceId, bool isRepl, + TInstant now); const TReadBatcherResult &GetResult() const { return *Result; } ui64 GetPDiskReadBytes() const { return PDiskReadBytes; } diff --git a/ydb/core/blobstorage/vdisk/query/query_statdb.h b/ydb/core/blobstorage/vdisk/query/query_statdb.h index 67aa2aeec5..b8a57ebd89 100644 --- a/ydb/core/blobstorage/vdisk/query/query_statdb.h +++ b/ydb/core/blobstorage/vdisk/query/query_statdb.h @@ -28,7 +28,7 @@ namespace NKikimr { const bool prettyPrint = Ev->Get()->Record.GetPrettyPrint(); CalculateStat(str, prettyPrint); Result->SetResult(str.Str()); - SendVDiskResponse(ctx, Ev->Sender, Result.release(), *this, Ev->Cookie); + SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie); ctx.Send(ParentId, new TEvents::TEvActorDied); TThis::Die(ctx); } diff --git a/ydb/core/blobstorage/vdisk/query/query_stathuge.cpp b/ydb/core/blobstorage/vdisk/query/query_stathuge.cpp index e4f14dba80..08c30752ab 100644 --- a/ydb/core/blobstorage/vdisk/query/query_stathuge.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_stathuge.cpp @@ -22,7 +22,7 @@ namespace NKikimr { CalculateUsedHugeChunks(str, prettyPrint); Result->SetResult(str.Str()); - SendVDiskResponse(ctx, Ev->Sender, Result.release(), *this, 0); + SendVDiskResponse(ctx, Ev->Sender, Result.release(), 0); ctx.Send(ParentId, new TEvents::TEvActorDied); TThis::Die(ctx); } diff --git a/ydb/core/blobstorage/vdisk/query/query_stattablet.cpp b/ydb/core/blobstorage/vdisk/query/query_stattablet.cpp index 16e25fb1a6..5e4817b059 100644 --- a/ydb/core/blobstorage/vdisk/query/query_stattablet.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_stattablet.cpp @@ -24,7 +24,7 @@ namespace NKikimr { ProcessLogoBlobs(str, tabletId, prettyPrint); Result->SetResult(str.Str()); - SendVDiskResponse(ctx, Ev->Sender, Result.release(), *this, 0); + SendVDiskResponse(ctx, Ev->Sender, Result.release(), 0); ctx.Send(ParentId, new TEvents::TEvActorDied); TThis::Die(ctx); } diff --git a/ydb/core/blobstorage/vdisk/repl/query_donor.h b/ydb/core/blobstorage/vdisk/repl/query_donor.h index 7479d607c9..aa8ead2005 100644 --- a/ydb/core/blobstorage/vdisk/repl/query_donor.h +++ b/ydb/core/blobstorage/vdisk/repl/query_donor.h @@ -91,7 +91,7 @@ namespace NKikimr { void PassAway() override { LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_VDISK_GET, SelfId() << " finished query"); Send(ParentId, new TEvents::TEvActorDied); - SendVDiskResponse(TActivationContext::AsActorContext(), Sender, Result.release(), *this, Cookie); + SendVDiskResponse(TActivationContext::AsActorContext(), Sender, Result.release(), Cookie); TActorBootstrapped::PassAway(); } diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp index 728a196965..54b755bf77 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp @@ -15,7 +15,6 @@ #include "skeleton_events.h" #include "skeleton_capturevdisklayout.h" #include "skeleton_compactionstate.h" -#include <ydb/core/blobstorage/base/wilson_events.h> #include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_iter.h> #include <ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.h> #include <ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h> @@ -139,7 +138,7 @@ namespace NKikimr { template <class TOrigEv> void SendReply(const TActorContext &ctx, std::unique_ptr<IEventBase> result, TOrigEv &orig, EServiceKikimr logService) { Y_UNUSED(logService); - SendVDiskResponse(ctx, orig->Sender, result.release(), *this, orig->Cookie); + SendVDiskResponse(ctx, orig->Sender, result.release(), orig->Cookie); } //////////////////////////////////////////////////////////////////////// @@ -147,7 +146,7 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////// void Handle(TEvBlobStorage::TEvVMovedPatch::TPtr &ev, const TActorContext &ctx) { - const bool postpone = OverloadHandler->PostponeEvent(ev, ctx, this); + const bool postpone = OverloadHandler->PostponeEvent(ev); if (!postpone) { PrivateHandle(ev, ctx); } @@ -187,7 +186,7 @@ namespace NKikimr { } void Handle(TEvBlobStorage::TEvVPatchStart::TPtr &ev, const TActorContext &ctx) { - const bool postpone = OverloadHandler->PostponeEvent(ev, ctx, this); + const bool postpone = OverloadHandler->PostponeEvent(ev); if (!postpone) { PrivateHandle(ev, ctx); } @@ -267,7 +266,7 @@ namespace NKikimr { } void Handle(TEvBlobStorage::TEvVMultiPut::TPtr &ev, const TActorContext &ctx) { - const bool postpone = OverloadHandler->PostponeEvent(ev, ctx, this); + const bool postpone = OverloadHandler->PostponeEvent(ev); if (!postpone) { PrivateHandle(ev, ctx); } @@ -348,13 +347,11 @@ namespace NKikimr { return logMsg; } - std::unique_ptr<TEvHullWriteHugeBlob> CreateHullWriteHugeBlob(const TActorContext &ctx, NActors::TActorId sender, - ui64 cookie, NWilson::TTraceId &traceId, bool ignoreBlock, - NKikimrBlobStorage::EPutHandleClass handleClass, TVPutInfo &info, + std::unique_ptr<TEvHullWriteHugeBlob> CreateHullWriteHugeBlob(NActors::TActorId sender, + ui64 cookie, bool ignoreBlock, NKikimrBlobStorage::EPutHandleClass handleClass, TVPutInfo &info, std::unique_ptr<TEvBlobStorage::TEvVPutResult> res) { Y_VERIFY_DEBUG(info.HullStatus.Status == NKikimrProto::OK); - WILSON_TRACE_FROM_ACTOR(ctx, *this, &traceId, EvHullWriteHugeBlobSent); info.Buffer = TDiskBlob::Create(info.BlobId.BlobSize(), info.BlobId.PartId(), Db->GType.TotalPartCount(), std::move(info.Buffer), *Arena); UpdatePDiskWriteBytes(info.Buffer.GetSize()); @@ -417,8 +414,6 @@ namespace NKikimr { } void PrivateHandle(TEvBlobStorage::TEvVMultiPut::TPtr &ev, const TActorContext &ctx) { - WILSON_TRACE_FROM_ACTOR(ctx, *this, &ev->TraceId, EvVPutReceived, VDiskId = SelfVDiskId, - PDiskId = Config->BaseInfo.PDiskId, VDiskSlotId = Config->BaseInfo.VDiskSlotId); IFaceMonGroup->MultiPutMsgs()++; IFaceMonGroup->PutTotalBytes() += ev->GetSize(); @@ -522,6 +517,7 @@ namespace NKikimr { std::unique_ptr<NPDisk::TEvMultiLog> evLogs = std::make_unique<NPDisk::TEvMultiLog>(); ui64 cookie = ev->Cookie; + const NWilson::TTraceId traceId(ev->TraceId); IActor* vMultiPutActor = CreateSkeletonVMultiPutActor(SelfId(), statuses, oosStatus, ev, SkeletonFrontIDPtr, IFaceMonGroup->MultiPutResMsgsPtr(), Db->GetVDiskIncarnationGuid()); @@ -553,7 +549,6 @@ namespace NKikimr { if (info.IsHugeBlob) { // pass the work to huge blob writer - NWilson::TTraceId traceId; TInstant deadline = (record.HasMsgQoS() && record.GetMsgQoS().HasDeadlineSeconds()) ? TInstant::Seconds(record.GetMsgQoS().GetDeadlineSeconds()) : TInstant::Max(); @@ -565,9 +560,9 @@ namespace NKikimr { VCtx->Histograms.GetHistogram(handleClass), info.Buffer.GetSize(), NWilson::TTraceId(), Db->GetVDiskIncarnationGuid(), errorReason)); if (info.Buffer) { - auto hugeWrite = CreateHullWriteHugeBlob(ctx, vMultiPutActorId, cookie, traceId, ignoreBlock, - handleClass, info, std::move(result)); - ctx.Send(Db->HugeKeeperID, hugeWrite.release()); + auto hugeWrite = CreateHullWriteHugeBlob(vMultiPutActorId, cookie, ignoreBlock, handleClass, + info, std::move(result)); + ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, NWilson::TTraceId(traceId)); } else { ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, info.BlobId, info.Ingress, TDiskPart(), ignoreBlock, vMultiPutActorId, cookie, std::move(result))); @@ -614,16 +609,13 @@ namespace NKikimr { } void Handle(TEvBlobStorage::TEvVPut::TPtr &ev, const TActorContext &ctx) { - const bool postpone = OverloadHandler->PostponeEvent(ev, ctx, this); + const bool postpone = OverloadHandler->PostponeEvent(ev); if (!postpone) { PrivateHandle(ev, ctx); } } void PrivateHandle(TEvBlobStorage::TEvVPut::TPtr &ev, const TActorContext &ctx) { - WILSON_TRACE_FROM_ACTOR(ctx, *this, &ev->TraceId, EvVPutReceived, VDiskId = SelfVDiskId, - PDiskId = Config->BaseInfo.PDiskId, VDiskSlotId = Config->BaseInfo.VDiskSlotId); - IFaceMonGroup->PutMsgs()++; IFaceMonGroup->PutTotalBytes() += ev->GetSize(); TInstant now = TAppData::TimeProvider->Now(); @@ -698,9 +690,9 @@ namespace NKikimr { } else if (info.Buffer) { // pass the work to huge blob writer NKikimrBlobStorage::EPutHandleClass handleClass = record.GetHandleClass(); - auto hugeWrite = CreateHullWriteHugeBlob(ctx, ev->Sender, ev->Cookie, ev->TraceId, ignoreBlock, - handleClass, info, std::move(result)); - ctx.Send(Db->HugeKeeperID, hugeWrite.release()); + auto hugeWrite = CreateHullWriteHugeBlob(ev->Sender, ev->Cookie, ignoreBlock, handleClass, info, + std::move(result)); + ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, std::move(ev->TraceId)); } else { ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, info.BlobId, info.Ingress, TDiskPart(), ignoreBlock, ev->Sender, ev->Cookie, std::move(result))); @@ -710,8 +702,6 @@ namespace NKikimr { void Handle(TEvHullLogHugeBlob::TPtr &ev, const TActorContext &ctx) { TEvHullLogHugeBlob *msg = ev->Get(); - WILSON_TRACE_FROM_ACTOR(ctx, *this, &msg->Result->TraceId, EvHullLogHugeBlobReceived); - // update hull write duration msg->Result->MarkHugeWriteTime(); auto status = Hull->CheckLogoBlob(ctx, msg->LogoBlobID, msg->IgnoreBlock); @@ -727,7 +717,7 @@ namespace NKikimr { Hull->PostponeReplyUntilCommitted(msg->Result.release(), msg->OrigClient, msg->OrigCookie, status.Lsn); } else { - SendVDiskResponse(ctx, msg->OrigClient, msg->Result.release(), *this, msg->OrigCookie); + SendVDiskResponse(ctx, msg->OrigClient, msg->Result.release(), msg->OrigCookie); } return; @@ -824,12 +814,10 @@ namespace NKikimr { using namespace NErrBuilder; std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status, errorReason, ev, now, SkeletonFrontIDPtr, SelfVDiskId, Db->GetVDiskIncarnationGuid(), GInfo)); - SendVDiskResponse(ctx, ev->Sender, res.release(), *this, ev->Cookie); + SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie); } void Handle(TEvBlobStorage::TEvVGet::TPtr &ev, const TActorContext &ctx) { - WILSON_TRACE_FROM_ACTOR(ctx, *this, &ev->TraceId, EvVGetReceived); - IFaceMonGroup->GetMsgs()++; TInstant now = TAppData::TimeProvider->Now(); NKikimrBlobStorage::TEvVGet &record = ev->Get()->Record; @@ -990,7 +978,7 @@ namespace NKikimr { LOG_DEBUG_S(ctx, BS_VDISK_BLOCK, VCtx->VDiskLogPrefix << "TEvVGetBlockResult: " << result->ToString() << " Marker# BSVS17"); - SendVDiskResponse(ctx, ev->Sender, result.release(), *this, ev->Cookie); + SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie); } //////////////////////////////////////////////////////////////////////// @@ -1064,7 +1052,7 @@ namespace NKikimr { using namespace NErrBuilder; std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status, errorReason, ev, now, SkeletonFrontIDPtr, SelfVDiskId, Db->GetVDiskIncarnationGuid(), GInfo)); - SendVDiskResponse(ctx, ev->Sender, res.release(), *this, ev->Cookie); + SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie); } void Handle(TEvBlobStorage::TEvVGetBarrier::TPtr &ev, const TActorContext &ctx) { @@ -1127,7 +1115,7 @@ namespace NKikimr { using namespace NErrBuilder; std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status, errorReason, ev, now, SkeletonFrontIDPtr, SelfVDiskId, Db->GetVDiskIncarnationGuid(), GInfo)); - SendVDiskResponse(ctx, ev->Sender, res.release(), *this, ev->Cookie); + SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie); } void Handle(TEvBlobStorage::TEvVDbStat::TPtr &ev, const TActorContext &ctx) { @@ -1144,7 +1132,7 @@ namespace NKikimr { IFaceMonGroup->DbStatResMsgsPtr(), nullptr, std::move(ev->TraceId)); THullDsSnap fullSnap = Hull->GetIndexSnapshot(); IActor *actor = CreateDbStatActor(HullCtx, HugeBlobCtx, ctx, std::move(fullSnap), - ctx.SelfID, ev, std::move(result), *this); + ctx.SelfID, ev, std::move(result)); if (actor) { auto aid = ctx.Register(actor); ActiveActors.Insert(aid); @@ -1181,7 +1169,7 @@ namespace NKikimr { void Reply(const NKikimrProto::EReplyStatus status, const TString& /*errorReason*/, TEvBlobStorage::TEvVCompact::TPtr &ev, const TActorContext &ctx, const TInstant &/*now*/) { auto result = std::make_unique<TEvBlobStorage::TEvVCompactResult>(status, SelfVDiskId); - SendVDiskResponse(ctx, ev->Sender, result.release(), *this, ev->Cookie); + SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie); } void ReplyError(NKikimrProto::EReplyStatus status, const TString& errorReason, TEvBlobStorage::TEvVCompact::TPtr &ev, @@ -1239,7 +1227,7 @@ namespace NKikimr { void Handle(TEvHullCompactResult::TPtr &ev, const TActorContext &ctx) { Y_VERIFY(VDiskCompactionState); - VDiskCompactionState->Compacted(ctx, *this, ev->Get()->RequestId, ev->Get()->Type); + VDiskCompactionState->Compacted(ctx, ev->Get()->RequestId, ev->Get()->Type); } //////////////////////////////////////////////////////////////////////// @@ -1248,7 +1236,7 @@ namespace NKikimr { void Reply(const NKikimrProto::EReplyStatus status, const TString& /*errorReason*/, TEvBlobStorage::TEvVBaldSyncLog::TPtr &ev, const TActorContext &ctx, const TInstant &/*now*/) { auto result = std::make_unique<TEvBlobStorage::TEvVBaldSyncLogResult>(status, SelfVDiskId); - SendVDiskResponse(ctx, ev->Sender, result.release(), *this, ev->Cookie); + SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie); } void ReplyError(NKikimrProto::EReplyStatus status, const TString& errorReason, TEvBlobStorage::TEvVBaldSyncLog::TPtr &ev, @@ -1282,7 +1270,7 @@ namespace NKikimr { using namespace NErrBuilder; std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status, errorReason, ev, now, SkeletonFrontIDPtr, SelfVDiskId, Db->GetVDiskIncarnationGuid(), GInfo)); - SendVDiskResponse(ctx, ev->Sender, res.release(), *this, ev->Cookie); + SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie); } void Handle(TEvBlobStorage::TEvVSync::TPtr &ev, const TActorContext &ctx) { @@ -1297,7 +1285,7 @@ namespace NKikimr { using namespace NErrBuilder; std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status, errorReason, ev, now, SkeletonFrontIDPtr, SelfVDiskId, Db->GetVDiskIncarnationGuid(), GInfo)); - SendVDiskResponse(ctx, ev->Sender, res.release(), *this, ev->Cookie); + SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie); } // FIXME: check for RACE in other handlers!!! @@ -1336,7 +1324,7 @@ namespace NKikimr { } void Handle(TEvLocalSyncData::TPtr &ev, const TActorContext &ctx) { - const bool postpone = OverloadHandler->PostponeEvent(ev, ctx, this); + const bool postpone = OverloadHandler->PostponeEvent(ev); if (!postpone) { PrivateHandle(ev, ctx); } @@ -1391,7 +1379,7 @@ namespace NKikimr { } void Handle(TEvAnubisOsirisPut::TPtr &ev, const TActorContext &ctx) { - const bool postpone = OverloadHandler->PostponeEvent(ev, ctx, this); + const bool postpone = OverloadHandler->PostponeEvent(ev); if (!postpone) { PrivateHandle(ev, ctx); } @@ -1461,7 +1449,7 @@ namespace NKikimr { using namespace NErrBuilder; std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status, errorReason, ev, now, SkeletonFrontIDPtr, SelfVDiskId, Db->GetVDiskIncarnationGuid(), GInfo)); - SendVDiskResponse(ctx, ev->Sender, res.release(), *this, ev->Cookie); + SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie); } void Handle(TEvBlobStorage::TEvVSyncFull::TPtr &ev, const TActorContext &ctx) { @@ -1488,7 +1476,7 @@ namespace NKikimr { std::unique_ptr<ILoggedRec> loggedRec(LoggedRecsVault.Extract(loggedRecId)); Db->LsnMngr->ConfirmLsnForHull(loggedRec->Seg, loggedRec->ConfirmSyncLogAlso); - loggedRec->Replay(*Hull, ctx, *this); + loggedRec->Replay(*Hull, ctx); } if (VDiskCompactionState && !results.empty()) { VDiskCompactionState->Logged(ctx, results.back().Lsn); diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp index 9ed7de23f7..7f21fefcdc 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp @@ -3,7 +3,6 @@ #include "blobstorage_skeletonerr.h" #include "blobstorage_skeleton.h" #include <ydb/core/blobstorage/base/blobstorage_events.h> -#include <ydb/core/blobstorage/base/wilson_events.h> #include <ydb/core/blobstorage/base/utility.h> #include <ydb/core/blobstorage/base/html.h> @@ -99,21 +98,13 @@ namespace NKikimr { NKikimrBlobStorage::EVDiskQueueId ExtQueueId; NBackpressure::TQueueClientId ClientId; TActorId ActorId; + NWilson::TSpan Span; - TRecord() - : Ev() - , ReceivedTime() - , Deadline() - , ByteSize(0) - , MsgId() - , Cost(0) - , ExtQueueId(NKikimrBlobStorage::EVDiskQueueId::Unknown) - , ClientId() - {} + TRecord() = default; TRecord(std::unique_ptr<IEventHandle> ev, TInstant now, ui32 recByteSize, const NBackpressure::TMessageId &msgId, ui64 cost, TInstant deadline, NKikimrBlobStorage::EVDiskQueueId extQueueId, - const NBackpressure::TQueueClientId& clientId) + const NBackpressure::TQueueClientId& clientId, TString name) : Ev(std::move(ev)) , ReceivedTime(now) , Deadline(deadline) @@ -123,7 +114,10 @@ namespace NKikimr { , ExtQueueId(extQueueId) , ClientId(clientId) , ActorId(Ev->Sender) - {} + , Span(9 /*verbosity*/, NWilson::ERelation::FollowsFrom, std::move(Ev->TraceId), now, "VDisk.PutInQueue") + { + Span.Attribute("QueueName", std::move(name)); + } }; using TMyQueueBackpressure = NBackpressure::TQueueBackpressure<NBackpressure::TQueueClientId>; @@ -224,10 +218,9 @@ namespace NKikimr { ++*SkeletonFrontDelayedCount; *SkeletonFrontDelayedBytes += recByteSize; - WILSON_TRACE_FROM_ACTOR(ctx, front, &converted->TraceId, EvSkeletonFrontEnqueue); - TInstant now = TAppData::TimeProvider->Now(); - Queue->Push(TRecord(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId, clientId)); + Queue->Push(TRecord(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId, + clientId, Name)); } } @@ -256,13 +249,15 @@ namespace NKikimr { TDuration inQueue = now - rec->ReceivedTime; ApplyToRecord(*rec->Ev, TUpdateInQueueTime(inQueue)); + // trace end of in-queue span + rec->Span.EndOk(); + if (forceError) { front.GetExtQueue(rec->ExtQueueId).DroppedWithError(ctx, rec, now, front); } else if (now >= rec->Deadline) { ++Deadlines; front.GetExtQueue(rec->ExtQueueId).DeadlineHappened(ctx, rec, now, front); } else { - WILSON_TRACE_FROM_ACTOR(ctx, front, &rec->Ev->TraceId, EvSkeletonFrontProceed); ctx.ExecutorThread.Send(rec->Ev.release()); ++InFlightCount; @@ -1298,7 +1293,7 @@ namespace NKikimr { TInstant now) { using namespace NErrBuilder; auto res = ErroneousResult(VCtx, status, errorReason, ev, now, nullptr, SelfVDiskId, VDiskIncarnationGuid, GInfo); - SendVDiskResponse(ctx, ev->Sender, res.release(), *this, ev->Cookie); + SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie); } void Reply(TEvBlobStorage::TEvVCheckReadiness::TPtr &ev, const TActorContext &ctx, diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfull.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfull.cpp index 3d4f78d008..8733f30643 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfull.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfull.cpp @@ -125,7 +125,7 @@ namespace NKikimr { Result->Record.SetBlockTabletFrom(KeyBlock.TabletId); KeyBarrier.Serialize(*Result->Record.MutableBarrierFrom()); // send reply - SendVDiskResponse(ctx, Recipient, Result.release(), *this, 0); + SendVDiskResponse(ctx, Recipient, Result.release(), 0); // notify parent about death ctx.Send(ParentId, new TEvents::TEvActorDied); Die(ctx); diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp index 11f708e717..8452a57134 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp @@ -59,7 +59,7 @@ namespace NKikimr { auto result = std::make_unique<TEvBlobStorage::TEvVSyncFullResult>(NKikimrProto::ERROR, SelfVDiskId, Record.GetCookie(), Now, IFaceMonGroup->SyncFullResMsgsPtr(), nullptr, std::move(Ev->TraceId), Ev->GetChannel()); - SendVDiskResponse(ctx, recipient, result.release(), *this, cookie); + SendVDiskResponse(ctx, recipient, result.release(), cookie); Die(ctx); return; } @@ -74,7 +74,7 @@ namespace NKikimr { auto result = std::make_unique<TEvBlobStorage::TEvVSyncFullResult>(NKikimrProto::NODATA, SelfVDiskId, TSyncState(Db->GetVDiskIncarnationGuid(), DbBirthLsn), Record.GetCookie(), Now, IFaceMonGroup->SyncFullResMsgsPtr(), nullptr, std::move(Ev->TraceId), Ev->GetChannel()); - SendVDiskResponse(ctx, recipient, result.release(), *this, cookie); + SendVDiskResponse(ctx, recipient, result.release(), cookie); Die(ctx); return; } diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.cpp index daaa498926..cc7967978f 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.cpp @@ -53,7 +53,6 @@ namespace NKikimr { void TVDiskCompactionState::Compacted( const TActorContext &ctx, - const IActor& actor, i64 reqId, EHullDbType dbType) { auto it = Requests.find(reqId); @@ -68,7 +67,7 @@ namespace NKikimr { } if (req.AllDone()) { - SendVDiskResponse(ctx, req.ClientId, req.Reply.release(), actor, req.ClientCookie); + SendVDiskResponse(ctx, req.ClientId, req.Reply.release(), req.ClientCookie); // delete req from Request, we handled it Requests.erase(it); } diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.h b/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.h index 1a0a39677b..2b9777609a 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.h +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.h @@ -24,7 +24,7 @@ namespace NKikimr { // setup input compaction request void Setup(const TActorContext &ctx, std::optional<ui64> lsn, TCompactionReq cState); // when hull db reports compaction finish we change state by calling this function - void Compacted(const TActorContext &ctx, const IActor& actor, i64 reqId, EHullDbType dbType); + void Compacted(const TActorContext &ctx, i64 reqId, EHullDbType dbType); // when data is flushed to recovery log run compaction void Logged(const TActorContext &ctx, ui64 lsn) { if (Triggered && lsn >= LsnToCommit) { diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp index ce4615fb5d..474714864b 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp @@ -35,7 +35,7 @@ namespace NKikimr { , RecipientCookie(recipientCookie) {} - void TLoggedRecVPut::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) { + void TLoggedRecVPut::Replay(THull &hull, const TActorContext &ctx) { TLogoBlobID genId(Id, 0); hull.AddLogoBlob(ctx, genId, Id.PartId(), Ingress, Buffer, Seg.Point()); @@ -44,7 +44,7 @@ namespace NKikimr { << " msg# " << Result->ToString() << " Marker# BSVSLR01"); - SendVDiskResponse(ctx, Recipient, Result.release(), actor, RecipientCookie); + SendVDiskResponse(ctx, Recipient, Result.release(), RecipientCookie); } /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -68,8 +68,7 @@ namespace NKikimr { , RecipientCookie(recipientCookie) {} - void TLoggedRecVMultiPutItem::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) { - Y_UNUSED(actor); + void TLoggedRecVMultiPutItem::Replay(THull &hull, const TActorContext &ctx) { TLogoBlobID genId(Id, 0); hull.AddLogoBlob(ctx, genId, Id.PartId(), Ingress, Buffer, Seg.Point()); @@ -95,7 +94,7 @@ namespace NKikimr { , Ev(ev) {} - void TLoggedRecVPutHuge::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) { + void TLoggedRecVPutHuge::Replay(THull &hull, const TActorContext &ctx) { TEvHullLogHugeBlob *msg = Ev->Get(); TLogoBlobID genId(msg->LogoBlobID, 0); @@ -108,7 +107,7 @@ namespace NKikimr { LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_PUT, hull.GetHullCtx()->VCtx->VDiskLogPrefix << "TEvVPut: realtime# false result# " << msg->Result->ToString() << " Marker# BSVSLR03"); - SendVDiskResponse(ctx, msg->OrigClient, msg->Result.release(), actor, msg->OrigCookie); + SendVDiskResponse(ctx, msg->OrigClient, msg->Result.release(), msg->OrigCookie); } /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -132,9 +131,9 @@ namespace NKikimr { , RecipientCookie(recipientCookie) {} - void TLoggedRecVBlock::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) { - auto replySender = [&ctx, &actor] (const TActorId &id, ui64 cookie, IEventBase *msg) { - SendVDiskResponse(ctx, id, msg, actor, cookie); + void TLoggedRecVBlock::Replay(THull &hull, const TActorContext &ctx) { + auto replySender = [&ctx] (const TActorId &id, ui64 cookie, IEventBase *msg) { + SendVDiskResponse(ctx, id, msg, cookie); }; hull.AddBlockCmd(ctx, TabletId, Gen, IssuerGuid, Seg.Point(), replySender); @@ -142,7 +141,7 @@ namespace NKikimr { LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_BLOCK, hull.GetHullCtx()->VCtx->VDiskLogPrefix << "TEvVBlock: result# " << Result->ToString() << " Marker# BSVSLR04"); - SendVDiskResponse(ctx, Recipient, Result.release(), actor, RecipientCookie); + SendVDiskResponse(ctx, Recipient, Result.release(), RecipientCookie); } /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -160,14 +159,14 @@ namespace NKikimr { , OrigEv(origEv) {} - void TLoggedRecVCollectGarbage::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) { + void TLoggedRecVCollectGarbage::Replay(THull &hull, const TActorContext &ctx) { NKikimrBlobStorage::TEvVCollectGarbage &record = OrigEv->Get()->Record; hull.AddGCCmd(ctx, record, Ingress, Seg); LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_GC, hull.GetHullCtx()->VCtx->VDiskLogPrefix << "TEvVCollectGarbage: result# " << Result->ToString() << " Marker# BSVSLR05"); - SendVDiskResponse(ctx, OrigEv->Sender, Result.release(), actor, OrigEv->Cookie); + SendVDiskResponse(ctx, OrigEv->Sender, Result.release(), OrigEv->Cookie); } /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -183,9 +182,9 @@ namespace NKikimr { , OrigEv(origEv) {} - void TLoggedRecLocalSyncData::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) { - auto replySender = [&ctx, &actor] (const TActorId &id, ui64 cookie, IEventBase *msg) { - SendVDiskResponse(ctx, id, msg, actor, cookie); + void TLoggedRecLocalSyncData::Replay(THull &hull, const TActorContext &ctx) { + auto replySender = [&ctx] (const TActorId &id, ui64 cookie, IEventBase *msg) { + SendVDiskResponse(ctx, id, msg, cookie); }; #ifdef UNPACK_LOCALSYNCDATA @@ -193,7 +192,7 @@ namespace NKikimr { #else hull.AddSyncDataCmd(ctx, OrigEv->Get()->Data, Seg, replySender); #endif - SendVDiskResponse(ctx, OrigEv->Sender, Result.release(), actor, OrigEv->Cookie); + SendVDiskResponse(ctx, OrigEv->Sender, Result.release(), OrigEv->Cookie); } /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -211,9 +210,9 @@ namespace NKikimr { , OrigEv(origEv) {} - void TLoggedRecAnubisOsirisPut::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) { + void TLoggedRecAnubisOsirisPut::Replay(THull &hull, const TActorContext &ctx) { hull.AddAnubisOsirisLogoBlob(ctx, Insert.Id, Insert.Ingress, Seg); - SendVDiskResponse(ctx, OrigEv->Sender, Result.release(), actor, OrigEv->Cookie); + SendVDiskResponse(ctx, OrigEv->Sender, Result.release(), OrigEv->Cookie); } /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -227,8 +226,7 @@ namespace NKikimr { , OrigEv(origEv) {} - void TLoggedRecPhantoms::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) { - Y_UNUSED(actor); + void TLoggedRecPhantoms::Replay(THull &hull, const TActorContext &ctx) { TEvDetectedPhantomBlob *msg = OrigEv->Get(); hull.CollectPhantoms(ctx, msg->Phantoms, Seg); } @@ -248,9 +246,9 @@ namespace NKikimr { , RecipientCookie(recipientCookie) {} - void TLoggedRecDelLogoBlobDataSyncLog::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) { + void TLoggedRecDelLogoBlobDataSyncLog::Replay(THull &hull, const TActorContext &ctx) { Y_UNUSED(hull); - SendVDiskResponse(ctx, Recipient, Result.release(), actor, RecipientCookie); + SendVDiskResponse(ctx, Recipient, Result.release(), RecipientCookie); } /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -264,9 +262,9 @@ namespace NKikimr { , OrigEv(ev) {} - void TLoggedRecAddBulkSst::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) { + void TLoggedRecAddBulkSst::Replay(THull &hull, const TActorContext &ctx) { hull.AddBulkSst(ctx, OrigEv->Get()->Essence, Seg); - SendVDiskResponse(ctx, OrigEv->Sender, new TEvAddBulkSstResult, actor, OrigEv->Cookie); + SendVDiskResponse(ctx, OrigEv->Sender, new TEvAddBulkSstResult, OrigEv->Cookie); } /////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h index 50a7a06d2a..34c3c4a5df 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h @@ -36,7 +36,7 @@ namespace NKikimr { ILoggedRec(TLsnSeg seg, bool confirmSyncLogAlso); virtual ~ILoggedRec() = default; // a method that replays changes that has been written to the recovery log - virtual void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) = 0; + virtual void Replay(THull &hull, const TActorContext &ctx) = 0; const TLsnSeg Seg; const bool ConfirmSyncLogAlso; @@ -50,7 +50,7 @@ namespace NKikimr { TLoggedRecVPut(TLsnSeg seg, bool confirmSyncLogAlso, const TLogoBlobID &id, const TIngress &ingress, TRope &&buffer, std::unique_ptr<TEvBlobStorage::TEvVPutResult> result, const TActorId &recipient, ui64 recipientCookie); - void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override; + void Replay(THull &hull, const TActorContext &ctx) override; private: TLogoBlobID Id; @@ -69,7 +69,7 @@ namespace NKikimr { TLoggedRecVMultiPutItem(TLsnSeg seg, bool confirmSyncLogAlso, const TLogoBlobID &id, const TIngress &ingress, TRope &&buffer, std::unique_ptr<TEvVMultiPutItemResult> result, const TActorId &recipient, ui64 recipientCookie); - void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override; + void Replay(THull &hull, const TActorContext &ctx) override; private: TLogoBlobID Id; @@ -87,7 +87,7 @@ namespace NKikimr { public: TLoggedRecVPutHuge(TLsnSeg seg, bool confirmSyncLogAlso, const TActorId &hugeKeeperId, TEvHullLogHugeBlob::TPtr ev); - void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override; + void Replay(THull &hull, const TActorContext &ctx) override; private: const TActorId HugeKeeperId; @@ -101,7 +101,7 @@ namespace NKikimr { public: TLoggedRecVBlock(TLsnSeg seg, bool confirmSyncLogAlso, ui64 tabletId, ui32 gen, ui64 issuerGuid, std::unique_ptr<TEvBlobStorage::TEvVBlockResult> result, const TActorId &recipient, ui64 recipientCookie); - void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override; + void Replay(THull &hull, const TActorContext &ctx) override; private: ui64 TabletId; @@ -120,7 +120,7 @@ namespace NKikimr { TLoggedRecVCollectGarbage(TLsnSeg seg, bool confirmSyncLogAlso, TBarrierIngress ingress, std::unique_ptr<TEvBlobStorage::TEvVCollectGarbageResult> result, TEvBlobStorage::TEvVCollectGarbage::TPtr origEv); - void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override; + void Replay(THull &hull, const TActorContext &ctx) override; private: TBarrierIngress Ingress; @@ -135,7 +135,7 @@ namespace NKikimr { public: TLoggedRecLocalSyncData(TLsnSeg seg, bool confirmSyncLogAlso, std::unique_ptr<TEvLocalSyncDataResult> result, TEvLocalSyncData::TPtr origEv); - void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override; + void Replay(THull &hull, const TActorContext &ctx) override; private: std::unique_ptr<TEvLocalSyncDataResult> Result; @@ -150,7 +150,7 @@ namespace NKikimr { TLoggedRecAnubisOsirisPut(TLsnSeg seg, bool confirmSyncLogAlso, const TEvAnubisOsirisPut::THullDbInsert &insert, std::unique_ptr<TEvAnubisOsirisPutResult> result, TEvAnubisOsirisPut::TPtr origEv); - void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override; + void Replay(THull &hull, const TActorContext &ctx) override; private: TEvAnubisOsirisPut::THullDbInsert Insert; @@ -164,7 +164,7 @@ namespace NKikimr { class TLoggedRecPhantoms : public ILoggedRec { public: TLoggedRecPhantoms(TLsnSeg seg, bool confirmSyncLogAlso, TEvDetectedPhantomBlob::TPtr origEv); - void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override; + void Replay(THull &hull, const TActorContext &ctx) override; private: TEvDetectedPhantomBlob::TPtr OrigEv; @@ -178,7 +178,7 @@ namespace NKikimr { TLoggedRecDelLogoBlobDataSyncLog(TLsnSeg seg, bool confirmSyncLogAlso, std::unique_ptr<TEvDelLogoBlobDataSyncLogResult> result, const TActorId &recipient, ui64 recipientCookie); - void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override; + void Replay(THull &hull, const TActorContext &ctx) override; private: std::unique_ptr<TEvDelLogoBlobDataSyncLogResult> Result; @@ -193,7 +193,7 @@ namespace NKikimr { class TLoggedRecAddBulkSst : public ILoggedRec { public: TLoggedRecAddBulkSst(TLsnSeg seg, bool confirmSyncLogAlso, TEvAddBulkSst::TPtr ev); - void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override; + void Replay(THull &hull, const TActorContext &ctx) override; private: TEvAddBulkSst::TPtr OrigEv; diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp index 45968b8d54..fbe53c1968 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp @@ -1,5 +1,4 @@ #include "skeleton_overload_handler.h" -#include <ydb/core/blobstorage/base/wilson_events.h> #include <ydb/core/blobstorage/vdisk/common/vdisk_pdiskctx.h> #include <ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hullsatisfactionrank.h> #include <ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h> @@ -210,30 +209,6 @@ namespace NKikimr { return proceedFurther; } - bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVMovedPatch::TPtr &ev, const TActorContext &ctx, IActor *skeleton) { - return PostponeEventPrivate(ev, ctx, skeleton); - } - - bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVPatchStart::TPtr &ev, const TActorContext &ctx, IActor *skeleton) { - return PostponeEventPrivate(ev, ctx, skeleton); - } - - bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVPut::TPtr &ev, const TActorContext &ctx, IActor *skeleton) { - return PostponeEventPrivate(ev, ctx, skeleton); - } - - bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVMultiPut::TPtr &ev, const TActorContext &ctx, IActor *skeleton) { - return PostponeEventPrivate(ev, ctx, skeleton); - } - - bool TOverloadHandler::PostponeEvent(TEvLocalSyncData::TPtr &ev, const TActorContext &ctx, IActor *skeleton) { - return PostponeEventPrivate(ev, ctx, skeleton); - } - - bool TOverloadHandler::PostponeEvent(TEvAnubisOsirisPut::TPtr &ev, const TActorContext &ctx, IActor *skeleton) { - return PostponeEventPrivate(ev, ctx, skeleton); - } - void TOverloadHandler::ToWhiteboard(const TOverloadHandler *this_, NKikimrWhiteboard::TVDiskSatisfactionRank &v) { if (this_) { this_->DynamicPDiskWeightsManager->ToWhiteboard(v); @@ -258,9 +233,8 @@ namespace NKikimr { } template <class TEv> - inline bool TOverloadHandler::PostponeEventPrivate(TEv &ev, const TActorContext &ctx, IActor *skeleton) { + inline bool TOverloadHandler::PostponeEvent(TAutoPtr<TEventHandle<TEv>> &ev) { if (DynamicPDiskWeightsManager->StopPuts() || !EmergencyQueue->Empty()) { - WILSON_TRACE_FROM_ACTOR(ctx, *skeleton, &ev->TraceId, EvPutIntoEmergQueue); EmergencyQueue->Push(ev); return true; } else { @@ -268,4 +242,11 @@ namespace NKikimr { } } + template bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVMovedPatch::TPtr &ev); + template bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVPatchStart::TPtr &ev); + template bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVPut::TPtr &ev); + template bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVMultiPut::TPtr &ev); + template bool TOverloadHandler::PostponeEvent(TEvLocalSyncData::TPtr &ev); + template bool TOverloadHandler::PostponeEvent(TEvAnubisOsirisPut::TPtr &ev); + } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.h b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.h index 00707bb803..a20d375ef5 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.h +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.h @@ -78,12 +78,8 @@ namespace NKikimr { bool ProcessPostponedEvents(const TActorContext &ctx, int batchSize, bool actualizeLevels); // Postpone event in case of overload - bool PostponeEvent(TEvBlobStorage::TEvVMovedPatch::TPtr &ev, const TActorContext &ctx, IActor *skeleton); - bool PostponeEvent(TEvBlobStorage::TEvVPatchStart::TPtr &ev, const TActorContext &ctx, IActor *skeleton); - bool PostponeEvent(TEvBlobStorage::TEvVPut::TPtr &ev, const TActorContext &ctx, IActor *skeleton); - bool PostponeEvent(TEvBlobStorage::TEvVMultiPut::TPtr &ev, const TActorContext &ctx, IActor *skeleton); - bool PostponeEvent(TEvLocalSyncData::TPtr &ev, const TActorContext &ctx, IActor *skeleton); - bool PostponeEvent(TEvAnubisOsirisPut::TPtr &ev, const TActorContext &ctx, IActor *skeleton); + template<typename TEv> + bool PostponeEvent(TAutoPtr<TEventHandle<TEv>> &ev); static void ToWhiteboard(const TOverloadHandler *this_, NKikimrWhiteboard::TVDiskSatisfactionRank &v); ui32 GetIntegralRankPercent() const; @@ -95,9 +91,6 @@ namespace NKikimr { NMonGroup::TSkeletonOverloadGroup Mon; std::unique_ptr<TEmergencyQueue> EmergencyQueue; std::shared_ptr<TDynamicPDiskWeightsManager> DynamicPDiskWeightsManager; - - template <class TEv> - bool PostponeEventPrivate(TEv &ev, const TActorContext &ctx, IActor *skeleton); }; } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp index cc24115b3f..78870536aa 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp @@ -98,7 +98,7 @@ namespace NKikimr { << " ErrorReason# " << ErrorReason << " Marker# BSVSP01"); } - SendVDiskResponse(ctx, Event->Sender, vMovedPatchResult.release(), *this, Event->Cookie); + SendVDiskResponse(ctx, Event->Sender, vMovedPatchResult.release(), Event->Cookie); PassAway(); } diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp index ac7dcd7d58..bbea6637af 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp @@ -90,7 +90,7 @@ namespace NKikimr { vMultiPutResult->Record.SetStatusFlags(OOSStatus.Flags); - SendVDiskResponse(ctx, Event->Sender, vMultiPutResult.release(), *this, Event->Cookie); + SendVDiskResponse(ctx, Event->Sender, vMultiPutResult.release(), Event->Cookie); PassAway(); } diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp index b04e8d6d8f..0e1f8eb796 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp @@ -184,7 +184,7 @@ namespace NKikimr::NPrivate { FoundPartsEvent->AddPart(part); } FoundPartsEvent->SetStatus(status); - SendVDiskResponse(TActivationContext::AsActorContext(), Sender, FoundPartsEvent.release(), *this, Cookie); + SendVDiskResponse(TActivationContext::AsActorContext(), Sender, FoundPartsEvent.release(), Cookie); } void PullOriginalPart(ui64 pullingPart) { @@ -254,7 +254,7 @@ namespace NKikimr::NPrivate { (ErrorReason, ErrorReason)); Y_VERIFY(ResultEvent); ResultEvent->SetStatus(status, ErrorReason); - SendVDiskResponse(TActivationContext::AsActorContext(), Sender, ResultEvent.release(), *this, Cookie); + SendVDiskResponse(TActivationContext::AsActorContext(), Sender, ResultEvent.release(), Cookie); } void HandleVGetResult(TEvBlobStorage::TEvVGetResult::TPtr &ev) { @@ -316,7 +316,7 @@ namespace NKikimr::NPrivate { for (ui32 idx = ReceivedXorDiffs.size(); idx != 0; --idx) { auto &[diffs, partId, result, sender, cookie] = ReceivedXorDiffs.back(); GType.ApplyXorDiff(TErasureType::CrcModeNone, dataSize, buffer, diffs, partId - 1, toPart - 1); - SendVDiskResponse(TActivationContext::AsActorContext(), sender, result.release(), *this, cookie); + SendVDiskResponse(TActivationContext::AsActorContext(), sender, result.release(), cookie); ReceivedXorDiffs.pop_back(); } @@ -528,7 +528,7 @@ namespace NKikimr::NPrivate { auto resultEvent = std::make_unique<TEvBlobStorage::TEvVPatchXorDiffResult>( NKikimrProto::ERROR, now, &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr, std::move(ev->TraceId)); - SendVDiskResponse(TActivationContext::AsActorContext(), ev->Sender, resultEvent.release(), *this, ev->Cookie); + SendVDiskResponse(TActivationContext::AsActorContext(), ev->Sender, resultEvent.release(), ev->Cookie); } void Handle(TEvBlobStorage::TEvVPatchXorDiff::TPtr &ev) { @@ -554,9 +554,9 @@ namespace NKikimr::NPrivate { if (!CheckDiff(xorDiffs, "XorDiff from datapart")) { for (auto &[diffs, partId, result, sender, cookie] : ReceivedXorDiffs) { - SendVDiskResponse(TActivationContext::AsActorContext(), sender, result.release(), *this, cookie); + SendVDiskResponse(TActivationContext::AsActorContext(), sender, result.release(), cookie); } - SendVDiskResponse(TActivationContext::AsActorContext(), ev->Sender, resultEvent.release(), *this, ev->Cookie); + SendVDiskResponse(TActivationContext::AsActorContext(), ev->Sender, resultEvent.release(), ev->Cookie); if (ResultEvent) { SendVPatchResult(NKikimrProto::ERROR); @@ -577,7 +577,7 @@ namespace NKikimr::NPrivate { } xorDiffs.clear(); - SendVDiskResponse(TActivationContext::AsActorContext(), ev->Sender, resultEvent.release(), *this, ev->Cookie); + SendVDiskResponse(TActivationContext::AsActorContext(), ev->Sender, resultEvent.release(), ev->Cookie); } else { ReceivedXorDiffs.emplace_back(std::move(xorDiffs), fromPart, std::move(resultEvent), ev->Sender, ev->Cookie); diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp index 907b3873f8..9b7dd49ebb 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp @@ -142,7 +142,7 @@ namespace NKikimr { TSyncState(), true, SlCtx->VCtx->GetOutOfSpaceState().GetLocalStatusFlags(), now, SlCtx->CountersMonGroup.VDiskCheckFailedPtr(), nullptr, std::move(ev->TraceId), ev->GetChannel()); - SendVDiskResponse(ctx, ev->Sender, result.release(), *this, ev->Cookie); + SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie); return; } @@ -161,7 +161,7 @@ namespace NKikimr { auto result = std::make_unique<TEvBlobStorage::TEvVSyncResult>(NKikimrProto::BLOCKED, SelfVDiskId, TSyncState(), true, SlCtx->VCtx->GetOutOfSpaceState().GetLocalStatusFlags(), now, SlCtx->CountersMonGroup.DiskLockedPtr(), nullptr, std::move(ev->TraceId), ev->GetChannel()); - SendVDiskResponse(ctx, ev->Sender, result.release(), *this, ev->Cookie); + SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie); return; } @@ -181,7 +181,7 @@ namespace NKikimr { auto result = std::make_unique<TEvBlobStorage::TEvVSyncResult>(status, SelfVDiskId, syncState, true, SlCtx->VCtx->GetOutOfSpaceState().GetLocalStatusFlags(), now, SlCtx->CountersMonGroup.UnequalGuidPtr(), nullptr, std::move(ev->TraceId), ev->GetChannel()); - SendVDiskResponse(ctx, ev->Sender, result.release(), *this, ev->Cookie); + SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie); return; } diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp index 2873bcb56c..6cbe05c4ec 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp @@ -177,7 +177,7 @@ namespace NKikimr { FragmentWriter.Finish(result->Record.MutableData()); } - SendVDiskResponse(ctx, Ev->Sender, result.release(), *this, Ev->Cookie); + SendVDiskResponse(ctx, Ev->Sender, result.release(), Ev->Cookie); ctx.Send(ParentId, new TEvSyncLogReadFinished(SourceVDisk)); Die(ctx); } diff --git a/ydb/library/pdisk_io/CMakeLists.darwin.txt b/ydb/library/pdisk_io/CMakeLists.darwin.txt index 902a753e73..ef2e2d0688 100644 --- a/ydb/library/pdisk_io/CMakeLists.darwin.txt +++ b/ydb/library/pdisk_io/CMakeLists.darwin.txt @@ -13,10 +13,10 @@ target_link_libraries(ydb-library-pdisk_io PUBLIC yutil tools-enum_parser-enum_serialization_runtime cpp-actors-core + cpp-actors-wilson cpp-monlib-dynamic_counters ydb-core-debug library-pdisk_io-protos - ydb-library-wilson ) target_sources(ydb-library-pdisk_io PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/pdisk_io/aio_mtp.cpp diff --git a/ydb/library/pdisk_io/CMakeLists.linux.txt b/ydb/library/pdisk_io/CMakeLists.linux.txt index 845369e155..9706b4e3b5 100644 --- a/ydb/library/pdisk_io/CMakeLists.linux.txt +++ b/ydb/library/pdisk_io/CMakeLists.linux.txt @@ -16,10 +16,10 @@ target_link_libraries(ydb-library-pdisk_io PUBLIC AIO::aio $CONAN_OPTS_SEM cpp-actors-core + cpp-actors-wilson cpp-monlib-dynamic_counters ydb-core-debug library-pdisk_io-protos - ydb-library-wilson ) target_sources(ydb-library-pdisk_io PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/pdisk_io/aio_linux.cpp diff --git a/ydb/library/pdisk_io/aio.h b/ydb/library/pdisk_io/aio.h index 61d0ee20e6..3dfd861459 100644 --- a/ydb/library/pdisk_io/aio.h +++ b/ydb/library/pdisk_io/aio.h @@ -7,7 +7,7 @@ #include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h> #include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_devicemode.h> -#include <ydb/library/wilson/wilson_event.h> +#include <library/cpp/actors/wilson/wilson_event.h> #include <util/system/file.h> #include <util/generic/string.h> diff --git a/ydb/library/pretty_types_print/wilson/CMakeLists.txt b/ydb/library/pretty_types_print/wilson/CMakeLists.txt index 2c5b6069e0..359ebc1d57 100644 --- a/ydb/library/pretty_types_print/wilson/CMakeLists.txt +++ b/ydb/library/pretty_types_print/wilson/CMakeLists.txt @@ -11,7 +11,7 @@ add_library(library-pretty_types_print-wilson) target_link_libraries(library-pretty_types_print-wilson PUBLIC contrib-libs-cxxsupp yutil - ydb-library-wilson + cpp-actors-wilson ) target_sources(library-pretty_types_print-wilson PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/pretty_types_print/wilson/out.cpp diff --git a/ydb/library/pretty_types_print/wilson/out.cpp b/ydb/library/pretty_types_print/wilson/out.cpp index 9b8c60f981..fdae4e0d11 100644 --- a/ydb/library/pretty_types_print/wilson/out.cpp +++ b/ydb/library/pretty_types_print/wilson/out.cpp @@ -1,4 +1,4 @@ -#include <ydb/library/wilson/wilson_trace.h> +#include <library/cpp/actors/wilson/wilson_trace.h> #include <util/stream/output.h> diff --git a/ydb/library/wilson/CMakeLists.txt b/ydb/library/wilson/CMakeLists.txt deleted file mode 100644 index 08855a190c..0000000000 --- a/ydb/library/wilson/CMakeLists.txt +++ /dev/null @@ -1,15 +0,0 @@ - -# This file was gererated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(ydb-library-wilson INTERFACE) -target_link_libraries(ydb-library-wilson INTERFACE - contrib-libs-cxxsupp - yutil - cpp-actors-wilson -) diff --git a/ydb/library/wilson/wilson_event.h b/ydb/library/wilson/wilson_event.h deleted file mode 100644 index e092a2cd97..0000000000 --- a/ydb/library/wilson/wilson_event.h +++ /dev/null @@ -1,2 +0,0 @@ -#pragma once -#include <library/cpp/actors/wilson/wilson_event.h> diff --git a/ydb/library/wilson/wilson_trace.h b/ydb/library/wilson/wilson_trace.h deleted file mode 100644 index 8c27c41fbd..0000000000 --- a/ydb/library/wilson/wilson_trace.h +++ /dev/null @@ -1,2 +0,0 @@ -#pragma once -#include <library/cpp/actors/wilson/wilson_trace.h> |