aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexander.rutkovsky@gmail.com>2022-06-16 14:45:38 +0300
committerAlexander Rutkovsky <alexander.rutkovsky@gmail.com>2022-06-16 14:45:38 +0300
commit20d96d3531fa27af7cf21e8de55d71255b054cfd (patch)
treeab97943b524e5f222b839b4c767321538244eb36
parentc0fe73f947f62476b336002f7fa85301f8a80dee (diff)
downloadydb-20d96d3531fa27af7cf21e8de55d71255b054cfd.tar.gz
Refactor Wilson KIKIMR-15105
ref:55ce6a1b08bba785ea62b3bdfea902ef7263cf57
-rw-r--r--CMakeLists.darwin.txt6
-rw-r--r--CMakeLists.linux.txt6
-rw-r--r--library/cpp/actors/core/events_undelivered.cpp6
-rw-r--r--library/cpp/actors/interconnect/CMakeLists.darwin.txt1
-rw-r--r--library/cpp/actors/interconnect/CMakeLists.linux.txt1
-rw-r--r--library/cpp/actors/interconnect/events_local.h7
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp46
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h16
-rw-r--r--library/cpp/actors/interconnect/interconnect_handshake.cpp4
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp40
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp7
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.h2
-rw-r--r--library/cpp/actors/interconnect/packet.cpp1
-rw-r--r--library/cpp/actors/interconnect/packet.h46
-rw-r--r--library/cpp/actors/interconnect/types.h26
-rw-r--r--library/cpp/actors/protos/interconnect.proto4
-rw-r--r--library/cpp/actors/wilson/CMakeLists.txt10
-rw-r--r--library/cpp/actors/wilson/protos/CMakeLists.txt33
-rw-r--r--library/cpp/actors/wilson/protos/common.proto84
-rw-r--r--library/cpp/actors/wilson/protos/resource.proto31
-rw-r--r--library/cpp/actors/wilson/protos/trace.proto326
-rw-r--r--library/cpp/actors/wilson/wilson_event.h176
-rw-r--r--library/cpp/actors/wilson/wilson_span.cpp64
-rw-r--r--library/cpp/actors/wilson/wilson_span.h160
-rw-r--r--library/cpp/actors/wilson/wilson_trace.h186
-rw-r--r--ydb/core/actorlib_impl/test_interconnect_ut.cpp3
-rw-r--r--ydb/core/base/CMakeLists.txt6
-rw-r--r--ydb/core/base/blobstorage.h3
-rw-r--r--ydb/core/blobstorage/backpressure/defs.h2
-rw-r--r--ydb/core/blobstorage/backpressure/queue.cpp12
-rw-r--r--ydb/core/blobstorage/backpressure/queue.h19
-rw-r--r--ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp10
-rw-r--r--ydb/core/blobstorage/base/CMakeLists.txt2
-rw-r--r--ydb/core/blobstorage/base/wilson_events.h112
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy.h64
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_block.cpp15
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp16
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp26
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp6
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp4
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_get.cpp12
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h1
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_impl.h2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp7
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp13
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp13
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp11
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put.cpp17
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h1
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_range.cpp16
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_request.cpp22
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_status.cpp12
-rw-r--r--ydb/core/blobstorage/dsproxy/root_cause.h1
-rw-r--r--ydb/core/blobstorage/pdisk/CMakeLists.txt2
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp1
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h1
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp7
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h2
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp4
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h1
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h4
-rw-r--r--ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.cpp1
-rw-r--r--ydb/core/blobstorage/vdisk/common/blobstorage_status.cpp4
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_response.cpp10
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_response.h6
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_barrier.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_base.h3
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_dumpdb.h2
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_extr.cpp6
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_public.cpp44
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_public.h3
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_readactor.cpp24
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_readactor.h3
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_readbatch.cpp9
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_readbatch.h3
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_statdb.h2
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_stathuge.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_stattablet.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/repl/query_donor.h2
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp70
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp31
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfull.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp4
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.cpp3
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.h2
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp46
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h22
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp35
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.h11
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp14
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp6
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp2
-rw-r--r--ydb/library/pdisk_io/CMakeLists.darwin.txt2
-rw-r--r--ydb/library/pdisk_io/CMakeLists.linux.txt2
-rw-r--r--ydb/library/pdisk_io/aio.h2
-rw-r--r--ydb/library/pretty_types_print/wilson/CMakeLists.txt2
-rw-r--r--ydb/library/pretty_types_print/wilson/out.cpp2
-rw-r--r--ydb/library/wilson/CMakeLists.txt15
-rw-r--r--ydb/library/wilson/wilson_event.h2
-rw-r--r--ydb/library/wilson/wilson_trace.h2
102 files changed, 1259 insertions, 889 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index aefec545c0..dd95749e37 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -293,6 +293,8 @@ add_subdirectory(library/cpp/actors/dnsresolver)
add_subdirectory(library/cpp/actors/interconnect)
add_subdirectory(library/cpp/actors/dnscachelib)
add_subdirectory(library/cpp/actors/helpers)
+add_subdirectory(library/cpp/actors/wilson)
+add_subdirectory(library/cpp/actors/wilson/protos)
add_subdirectory(library/cpp/digest/crc32c)
add_subdirectory(contrib/libs/crcutil)
add_subdirectory(library/cpp/monlib/service/pages/tablesorter)
@@ -394,10 +396,7 @@ add_subdirectory(library/cpp/digest/argonish/internal/proxies/sse41)
add_subdirectory(library/cpp/digest/argonish/internal/proxies/ssse3)
add_subdirectory(ydb/library/pdisk_io)
add_subdirectory(ydb/library/pdisk_io/protos)
-add_subdirectory(ydb/library/wilson)
-add_subdirectory(library/cpp/actors/wilson)
add_subdirectory(ydb/library/pretty_types_print/protobuf)
-add_subdirectory(ydb/library/pretty_types_print/wilson)
add_subdirectory(ydb/public/api/protos/out)
add_subdirectory(ydb/core/mon)
add_subdirectory(library/cpp/string_utils/url)
@@ -997,6 +996,7 @@ add_subdirectory(ydb/library/keys/ut)
add_subdirectory(ydb/library/login/ut)
add_subdirectory(ydb/library/mkql_proto/ut)
add_subdirectory(ydb/library/naming_conventions/ut)
+add_subdirectory(ydb/library/pretty_types_print/wilson)
add_subdirectory(ydb/library/protobuf_printer/ut)
add_subdirectory(ydb/library/schlab/ut)
add_subdirectory(ydb/library/security/ut)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 428284f5c8..fd309c8d8b 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -373,6 +373,8 @@ add_subdirectory(library/cpp/actors/dnsresolver)
add_subdirectory(library/cpp/actors/interconnect)
add_subdirectory(library/cpp/actors/dnscachelib)
add_subdirectory(library/cpp/actors/helpers)
+add_subdirectory(library/cpp/actors/wilson)
+add_subdirectory(library/cpp/actors/wilson/protos)
add_subdirectory(library/cpp/digest/crc32c)
add_subdirectory(contrib/libs/crcutil)
add_subdirectory(library/cpp/monlib/service/pages/tablesorter)
@@ -474,10 +476,7 @@ add_subdirectory(library/cpp/digest/argonish/internal/proxies/sse41)
add_subdirectory(library/cpp/digest/argonish/internal/proxies/ssse3)
add_subdirectory(ydb/library/pdisk_io)
add_subdirectory(ydb/library/pdisk_io/protos)
-add_subdirectory(ydb/library/wilson)
-add_subdirectory(library/cpp/actors/wilson)
add_subdirectory(ydb/library/pretty_types_print/protobuf)
-add_subdirectory(ydb/library/pretty_types_print/wilson)
add_subdirectory(ydb/public/api/protos/out)
add_subdirectory(ydb/core/mon)
add_subdirectory(library/cpp/string_utils/url)
@@ -1092,6 +1091,7 @@ add_subdirectory(ydb/library/keys/ut)
add_subdirectory(ydb/library/login/ut)
add_subdirectory(ydb/library/mkql_proto/ut)
add_subdirectory(ydb/library/naming_conventions/ut)
+add_subdirectory(ydb/library/pretty_types_print/wilson)
add_subdirectory(ydb/library/protobuf_printer/ut)
add_subdirectory(ydb/library/schlab/ut)
add_subdirectory(ydb/library/security/ut)
diff --git a/library/cpp/actors/core/events_undelivered.cpp b/library/cpp/actors/core/events_undelivered.cpp
index 23deaffd10..dfd79bf96e 100644
--- a/library/cpp/actors/core/events_undelivered.cpp
+++ b/library/cpp/actors/core/events_undelivered.cpp
@@ -44,15 +44,15 @@ namespace NActors {
const TActorId recp = OnNondeliveryHolder ? OnNondeliveryHolder->Recipient : TActorId();
if (Event)
- return new IEventHandle(recp, Sender, Event.Release(), updatedFlags, Cookie, &Recipient, TraceId.Clone());
+ return new IEventHandle(recp, Sender, Event.Release(), updatedFlags, Cookie, &Recipient, std::move(TraceId));
else
- return new IEventHandle(Type, updatedFlags, recp, Sender, Buffer, Cookie, &Recipient, TraceId.Clone());
+ return new IEventHandle(Type, updatedFlags, recp, Sender, Buffer, Cookie, &Recipient, std::move(TraceId));
}
if (Flags & FlagTrackDelivery) {
const ui32 updatedFlags = Flags & ~(FlagTrackDelivery | FlagSubscribeOnSession | FlagGenerateUnsureUndelivered);
return new IEventHandle(Sender, Recipient, new TEvents::TEvUndelivered(Type, reason, unsure), updatedFlags,
- Cookie, nullptr, TraceId.Clone());
+ Cookie, nullptr, std::move(TraceId));
}
return nullptr;
diff --git a/library/cpp/actors/interconnect/CMakeLists.darwin.txt b/library/cpp/actors/interconnect/CMakeLists.darwin.txt
index 16d1546920..9bd0c83fce 100644
--- a/library/cpp/actors/interconnect/CMakeLists.darwin.txt
+++ b/library/cpp/actors/interconnect/CMakeLists.darwin.txt
@@ -21,6 +21,7 @@ target_link_libraries(cpp-actors-interconnect PUBLIC
cpp-actors-prof
cpp-actors-protos
cpp-actors-util
+ cpp-actors-wilson
cpp-digest-crc32c
library-cpp-json
library-cpp-lwtrace
diff --git a/library/cpp/actors/interconnect/CMakeLists.linux.txt b/library/cpp/actors/interconnect/CMakeLists.linux.txt
index 464477ce1d..c0e1b39c45 100644
--- a/library/cpp/actors/interconnect/CMakeLists.linux.txt
+++ b/library/cpp/actors/interconnect/CMakeLists.linux.txt
@@ -21,6 +21,7 @@ target_link_libraries(cpp-actors-interconnect PUBLIC
cpp-actors-prof
cpp-actors-protos
cpp-actors-util
+ cpp-actors-wilson
cpp-digest-crc32c
library-cpp-json
library-cpp-lwtrace
diff --git a/library/cpp/actors/interconnect/events_local.h b/library/cpp/actors/interconnect/events_local.h
index 8a46ffd535..474b3dba8d 100644
--- a/library/cpp/actors/interconnect/events_local.h
+++ b/library/cpp/actors/interconnect/events_local.h
@@ -7,16 +7,9 @@
#include <util/network/address.h>
#include "interconnect_stream.h"
-#include "packet.h"
#include "types.h"
namespace NActors {
- struct TProgramInfo {
- ui64 PID = 0;
- ui64 StartTime = 0;
- ui64 Serial = 0;
- };
-
enum class ENetwork : ui32 {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// local messages
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index a66ba2a154..32f015af54 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -11,20 +11,18 @@
LWTRACE_USING(ACTORLIB_PROVIDER);
namespace NActors {
- DECLARE_WILSON_EVENT(EventSentToSocket);
- DECLARE_WILSON_EVENT(EventReceivedFromSocket);
-
bool TEventOutputChannel::FeedDescriptor(TTcpPacketOutTask& task, TEventHolder& event, ui64 *weightConsumed) {
- const size_t amount = sizeof(TChannelPart) + sizeof(TEventDescr);
+ const size_t descrSize = Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1);
+ const size_t amount = sizeof(TChannelPart) + descrSize;
if (task.GetVirtualFreeAmount() < amount) {
return false;
}
- NWilson::TTraceId traceId(event.Descr.TraceId);
-// if (ctx) {
-// WILSON_TRACE(*ctx, &traceId, EventSentToSocket);
-// }
- traceId.Serialize(&event.Descr.TraceId);
+ auto& span = *event.Span;
+ span.EndOk();
+ const NWilson::TTraceId traceId(span);
+ event.Span.reset();
+
LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize());
task.Orbit.Take(event.Orbit);
@@ -33,8 +31,34 @@ namespace NActors {
TChannelPart *part = static_cast<TChannelPart*>(task.GetFreeArea());
part->Channel = ChannelId | TChannelPart::LastPartFlag;
- part->Size = sizeof(TEventDescr);
- memcpy(part + 1, &event.Descr, sizeof(TEventDescr));
+ part->Size = descrSize;
+
+ void *descr = part + 1;
+ if (Params.UseExtendedTraceFmt) {
+ auto *p = static_cast<TEventDescr2*>(descr);
+ *p = {
+ event.Descr.Type,
+ event.Descr.Flags,
+ event.Descr.Recipient,
+ event.Descr.Sender,
+ event.Descr.Cookie,
+ {},
+ event.Descr.Checksum
+ };
+ traceId.Serialize(&p->TraceId);
+ } else {
+ auto *p = static_cast<TEventDescr1*>(descr);
+ *p = {
+ event.Descr.Type,
+ event.Descr.Flags,
+ event.Descr.Recipient,
+ event.Descr.Sender,
+ event.Descr.Cookie,
+ {},
+ event.Descr.Checksum
+ };
+ }
+
task.AppendBuf(part, amount);
*weightConsumed += amount;
OutputQueueSize -= part->Size;
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index cf68cd27fd..e48d294420 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -8,7 +8,7 @@
#include <util/generic/vector.h>
#include <util/generic/map.h>
#include <util/stream/walk.h>
-#include <library/cpp/actors/wilson/wilson_event.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include "interconnect_common.h"
#include "interconnect_counters.h"
@@ -55,10 +55,18 @@ namespace NActors {
~TEventOutputChannel() {
}
- std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) {
+ std::pair<ui32, TEventHolder*> Push(IEventHandle& ev, TInstant now) {
TEventHolder& event = Pool.Allocate(Queue);
- const ui32 bytes = event.Fill(ev) + sizeof(TEventDescr);
+ const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1));
OutputQueueSize += bytes;
+ event.Span.emplace(static_cast<ui8>(15) /*verbosity*/, NWilson::ERelation::ChildOf,
+ NWilson::TTraceId(ev.TraceId), now, "InInterconnectQueue");
+ if (*event.Span) {
+ auto& span = *event.Span;
+ span
+ .Attribute("OutputQueueItems", static_cast<i64>(Queue.size()))
+ .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize));
+ }
return std::make_pair(bytes, &event);
}
@@ -102,8 +110,6 @@ namespace NActors {
};
EState State = EState::INITIAL;
- static constexpr ui16 MinimumFreeSpace = sizeof(TChannelPart) + sizeof(TEventDescr);
-
protected:
ui64 OutputQueueSize = 0;
diff --git a/library/cpp/actors/interconnect/interconnect_handshake.cpp b/library/cpp/actors/interconnect/interconnect_handshake.cpp
index 9ede998d8e..78e114a574 100644
--- a/library/cpp/actors/interconnect/interconnect_handshake.cpp
+++ b/library/cpp/actors/interconnect/interconnect_handshake.cpp
@@ -496,6 +496,7 @@ namespace NActors {
request.SetRequestModernFrame(true);
request.SetRequestAuthOnly(Common->Settings.TlsAuthOnly);
+ request.SetRequestExtendedTraceFmt(true);
SendExBlock(request, "ExRequest");
@@ -526,6 +527,7 @@ namespace NActors {
Params.Encryption = success.GetStartEncryption();
Params.UseModernFrame = success.GetUseModernFrame();
Params.AuthOnly = Params.Encryption && success.GetAuthOnly();
+ Params.UseExtendedTraceFmt = success.GetUseExtendedTraceFmt();
if (success.HasServerScopeId()) {
ParsePeerScopeId(success.GetServerScopeId());
}
@@ -681,6 +683,7 @@ namespace NActors {
Params.UseModernFrame = request.GetRequestModernFrame();
Params.AuthOnly = Params.Encryption && request.GetRequestAuthOnly() && Common->Settings.TlsAuthOnly;
+ Params.UseExtendedTraceFmt = request.GetRequestExtendedTraceFmt();
if (request.HasClientScopeId()) {
ParsePeerScopeId(request.GetClientScopeId());
@@ -706,6 +709,7 @@ namespace NActors {
}
success.SetUseModernFrame(Params.UseModernFrame);
success.SetAuthOnly(Params.AuthOnly);
+ success.SetUseExtendedTraceFmt(Params.UseExtendedTraceFmt);
SendExBlock(record, "ExReply");
// extract sender actor id (self virtual id)
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
index 65bb956e58..cbb2d16e46 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_input_session.cpp
@@ -270,13 +270,43 @@ namespace NActors {
Metrics->AddInputChannelsIncomingTraffic(channel, sizeof(part) + part.Size);
- TEventDescr descr;
+ char buffer[Max(sizeof(TEventDescr1), sizeof(TEventDescr2))];
+ auto& v1 = reinterpret_cast<TEventDescr1&>(buffer);
+ auto& v2 = reinterpret_cast<TEventDescr2&>(buffer);
if (~part.Channel & TChannelPart::LastPartFlag) {
Payload.ExtractFront(part.Size, eventData);
- } else if (part.Size != sizeof(descr)) {
+ } else if (part.Size != sizeof(v1) && part.Size != sizeof(v2)) {
LOG_CRIT_IC_SESSION("ICIS11", "incorrect last part of an event");
return DestroySession(TDisconnectReason::FormatError());
- } else if (Payload.ExtractFrontPlain(&descr, sizeof(descr))) {
+ } else if (Payload.ExtractFrontPlain(buffer, part.Size)) {
+ TEventData descr;
+
+ switch (part.Size) {
+ case sizeof(TEventDescr1):
+ descr = {
+ v1.Type,
+ v1.Flags,
+ v1.Recipient,
+ v1.Sender,
+ v1.Cookie,
+ NWilson::TTraceId(), // do not accept traces with old format
+ v1.Checksum
+ };
+ break;
+
+ case sizeof(TEventDescr2):
+ descr = {
+ v2.Type,
+ v2.Flags,
+ v2.Recipient,
+ v2.Sender,
+ v2.Cookie,
+ NWilson::TTraceId(v2.TraceId),
+ v2.Checksum
+ };
+ break;
+ }
+
Metrics->IncInputChannelsIncomingEvents(channel);
ProcessEvent(*eventData, descr);
*eventData = TRope();
@@ -286,7 +316,7 @@ namespace NActors {
}
}
- void TInputSessionTCP::ProcessEvent(TRope& data, TEventDescr& descr) {
+ void TInputSessionTCP::ProcessEvent(TRope& data, TEventData& descr) {
if (!Params.UseModernFrame || descr.Checksum) {
ui32 checksum = 0;
for (const auto&& [data, size] : data) {
@@ -305,7 +335,7 @@ namespace NActors {
MakeIntrusive<TEventSerializedData>(std::move(data), bool(descr.Flags & IEventHandle::FlagExtendedFormat)),
descr.Cookie,
Params.PeerScopeId,
- NWilson::TTraceId(descr.TraceId));
+ std::move(descr.TraceId));
if (Common->EventFilter && !Common->EventFilter->CheckIncomingEvent(*ev, Common->LocalScopeId)) {
LOG_CRIT_IC_SESSION("ICIC03", "Event dropped due to scope error LocalScopeId# %s PeerScopeId# %s Type# 0x%08" PRIx32,
ScopeIdToString(Common->LocalScopeId).data(), ScopeIdToString(Params.PeerScopeId).data(), descr.Type);
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 2ded7f9f53..1602f4b8b2 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -12,8 +12,6 @@
namespace NActors {
LWTRACE_USING(ACTORLIB_PROVIDER);
- DECLARE_WILSON_EVENT(OutputQueuePush, (ui32, QueueSizeInEvents), (ui64, QueueSizeInBytes));
-
template<typename T>
T Coalesce(T&& x) {
return x;
@@ -128,7 +126,7 @@ namespace NActors {
auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
const bool wasWorking = oChannel.IsWorking();
- const auto [dataSize, event] = oChannel.Push(*ev);
+ const auto [dataSize, event] = oChannel.Push(*ev, TActivationContext::Now());
LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize);
TotalOutputQueueSize += dataSize;
@@ -142,9 +140,6 @@ namespace NActors {
++NumEventsInReadyChannels;
LWTRACK(EnqueueEvent, event->Orbit, Proxy->PeerNodeId, NumEventsInReadyChannels, GetWriteBlockedTotal(), evChannel, oChannel.GetQueueSize(), oChannel.GetBufferedAmountOfData());
- WILSON_TRACE(*TlsActivationContext, &ev->TraceId, OutputQueuePush,
- QueueSizeInEvents = oChannel.GetQueueSize(),
- QueueSizeInBytes = oChannel.GetBufferedAmountOfData());
// check for overloaded queues
ui64 sendBufferDieLimit = Proxy->Common->Settings.SendBufferDieLimitInMB * ui64(1 << 20);
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.h b/library/cpp/actors/interconnect/interconnect_tcp_session.h
index 5d4a381e1f..9933bd489e 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.h
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.h
@@ -249,7 +249,7 @@ namespace NActors {
void ReceiveData();
void ProcessHeader(size_t headerLen);
void ProcessPayload(ui64& numDataBytes);
- void ProcessEvent(TRope& data, TEventDescr& descr);
+ void ProcessEvent(TRope& data, TEventData& descr);
bool ReadMore();
void ReestablishConnection(TDisconnectReason reason);
diff --git a/library/cpp/actors/interconnect/packet.cpp b/library/cpp/actors/interconnect/packet.cpp
index e2c289ed59..9ba173e330 100644
--- a/library/cpp/actors/interconnect/packet.cpp
+++ b/library/cpp/actors/interconnect/packet.cpp
@@ -13,7 +13,6 @@ ui32 TEventHolder::Fill(IEventHandle& ev) {
Descr.Recipient = ev.Recipient;
Descr.Sender = ev.Sender;
Descr.Cookie = ev.Cookie;
- ev.TraceId.Serialize(&Descr.TraceId);
ForwardRecipient = ev.GetForwardOnNondeliveryRecipient();
EventActuallySerialized = 0;
Descr.Checksum = 0;
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index 4ba50a2b5f..3a6aadfb9f 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -7,21 +7,18 @@
#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <library/cpp/actors/util/rope.h>
#include <library/cpp/actors/prof/tag.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <library/cpp/digest/crc32c/crc32c.h>
#include <library/cpp/lwtrace/shuttle.h>
#include <util/generic/string.h>
#include <util/generic/list.h>
+#include "types.h"
+
#ifndef FORCE_EVENT_CHECKSUM
#define FORCE_EVENT_CHECKSUM 0
#endif
-using NActors::IEventBase;
-using NActors::IEventHandle;
-using NActors::TActorId;
-using NActors::TConstIoVec;
-using NActors::TEventSerializedData;
-
Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data, size_t len) {
if constexpr (NSan::MSanIsOn()) {
const char *begin = static_cast<const char*>(data);
@@ -33,14 +30,6 @@ Y_FORCE_INLINE ui32 Crc32cExtendMSanCompatible(ui32 checksum, const void *data,
return Crc32cExtend(checksum, data, len);
}
-struct TSessionParams {
- bool Encryption = {};
- bool UseModernFrame = {};
- bool AuthOnly = {};
- TString AuthCN;
- NActors::TScopeId PeerScopeId;
-};
-
struct TTcpPacketHeader_v1 {
ui32 HeaderCRC32;
ui32 PayloadCRC32;
@@ -87,21 +76,40 @@ union TTcpPacketBuf {
} v2;
};
+struct TEventData {
+ ui32 Type;
+ ui32 Flags;
+ TActorId Recipient;
+ TActorId Sender;
+ ui64 Cookie;
+ NWilson::TTraceId TraceId;
+ ui32 Checksum;
+};
+
#pragma pack(push, 1)
-struct TEventDescr {
+struct TEventDescr1 {
+ ui32 Type;
+ ui32 Flags;
+ TActorId Recipient;
+ TActorId Sender;
+ ui64 Cookie;
+ char TraceId[16]; // obsolete trace id format
+ ui32 Checksum;
+};
+
+struct TEventDescr2 {
ui32 Type;
ui32 Flags;
TActorId Recipient;
TActorId Sender;
ui64 Cookie;
- // wilson trace id is stored as a serialized entity to avoid using complex object with prohibited copy ctor
NWilson::TTraceId::TSerializedTraceId TraceId;
ui32 Checksum;
};
#pragma pack(pop)
struct TEventHolder : TNonCopyable {
- TEventDescr Descr;
+ TEventData Descr;
TActorId ForwardRecipient;
THolder<IEventBase> Event;
TIntrusivePtr<TEventSerializedData> Buffer;
@@ -109,6 +117,7 @@ struct TEventHolder : TNonCopyable {
ui32 EventSerializedSize;
ui32 EventActuallySerialized;
mutable NLWTrace::TOrbit Orbit;
+ std::optional<NWilson::TSpan> Span;
ui32 Fill(IEventHandle& ev);
@@ -123,7 +132,7 @@ struct TEventHolder : TNonCopyable {
}
void ForwardOnNondelivery(bool unsure) {
- TEventDescr& d = Descr;
+ TEventData& d = Descr;
const TActorId& r = d.Recipient;
const TActorId& s = d.Sender;
const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr;
@@ -137,6 +146,7 @@ struct TEventHolder : TNonCopyable {
Event.Reset();
Buffer.Reset();
Orbit.Reset();
+ Span.reset();
}
};
diff --git a/library/cpp/actors/interconnect/types.h b/library/cpp/actors/interconnect/types.h
index 2662c50c22..e0965d7807 100644
--- a/library/cpp/actors/interconnect/types.h
+++ b/library/cpp/actors/interconnect/types.h
@@ -1,5 +1,9 @@
#pragma once
+#include <library/cpp/actors/core/defs.h>
+#include <library/cpp/actors/core/actorid.h>
+#include <library/cpp/actors/core/event.h>
+
#include <util/generic/string.h>
namespace NActors {
@@ -40,4 +44,26 @@ namespace NActors {
static TVector<const char*> Reasons;
};
+ struct TProgramInfo {
+ ui64 PID = 0;
+ ui64 StartTime = 0;
+ ui64 Serial = 0;
+ };
+
+ struct TSessionParams {
+ bool Encryption = {};
+ bool UseModernFrame = {};
+ bool AuthOnly = {};
+ bool UseExtendedTraceFmt = {};
+ TString AuthCN;
+ NActors::TScopeId PeerScopeId;
+ };
+
} // NActors
+
+using NActors::IEventBase;
+using NActors::IEventHandle;
+using NActors::TActorId;
+using NActors::TConstIoVec;
+using NActors::TEventSerializedData;
+using NActors::TSessionParams;
diff --git a/library/cpp/actors/protos/interconnect.proto b/library/cpp/actors/protos/interconnect.proto
index 2e3b0d0d15..69721b1e06 100644
--- a/library/cpp/actors/protos/interconnect.proto
+++ b/library/cpp/actors/protos/interconnect.proto
@@ -70,8 +70,8 @@ message THandshakeRequest {
optional bool DoCheckCookie = 17;
optional bool RequestModernFrame = 18;
-
optional bool RequestAuthOnly = 19;
+ optional bool RequestExtendedTraceFmt = 20;
}
message THandshakeSuccess {
@@ -92,8 +92,8 @@ message THandshakeSuccess {
optional TScopeId ServerScopeId = 10;
optional bool UseModernFrame = 11;
-
optional bool AuthOnly = 12;
+ optional bool UseExtendedTraceFmt = 13;
}
message THandshakeReply {
diff --git a/library/cpp/actors/wilson/CMakeLists.txt b/library/cpp/actors/wilson/CMakeLists.txt
index 03d8c542ff..65b2e32d87 100644
--- a/library/cpp/actors/wilson/CMakeLists.txt
+++ b/library/cpp/actors/wilson/CMakeLists.txt
@@ -7,9 +7,13 @@
-add_library(cpp-actors-wilson INTERFACE)
-target_link_libraries(cpp-actors-wilson INTERFACE
+add_library(cpp-actors-wilson)
+target_link_libraries(cpp-actors-wilson PUBLIC
contrib-libs-cxxsupp
yutil
- cpp-string_utils-base64
+ cpp-actors-core
+ actors-wilson-protos
+)
+target_sources(cpp-actors-wilson PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/wilson_span.cpp
)
diff --git a/library/cpp/actors/wilson/protos/CMakeLists.txt b/library/cpp/actors/wilson/protos/CMakeLists.txt
new file mode 100644
index 0000000000..a7cc2e94ff
--- /dev/null
+++ b/library/cpp/actors/wilson/protos/CMakeLists.txt
@@ -0,0 +1,33 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(actors-wilson-protos)
+target_link_libraries(actors-wilson-protos PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-protobuf
+)
+target_proto_messages(actors-wilson-protos PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/common.proto
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/resource.proto
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/wilson/protos/trace.proto
+)
+target_proto_addincls(actors-wilson-protos
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(actors-wilson-protos
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
diff --git a/library/cpp/actors/wilson/protos/common.proto b/library/cpp/actors/wilson/protos/common.proto
new file mode 100644
index 0000000000..8562ee6d1e
--- /dev/null
+++ b/library/cpp/actors/wilson/protos/common.proto
@@ -0,0 +1,84 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.common.v1;
+
+// AnyValue is used to represent any type of attribute value. AnyValue may contain a
+// primitive value such as a string or integer or it may contain an arbitrary nested
+// object containing arrays, key-value lists and primitives.
+message AnyValue {
+ // The value is one of the listed fields. It is valid for all values to be unspecified
+ // in which case this AnyValue is considered to be "empty".
+ oneof value {
+ string string_value = 1;
+ bool bool_value = 2;
+ int64 int_value = 3;
+ double double_value = 4;
+ ArrayValue array_value = 5;
+ KeyValueList kvlist_value = 6;
+ bytes bytes_value = 7;
+ }
+}
+
+// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message
+// since oneof in AnyValue does not allow repeated fields.
+message ArrayValue {
+ // Array of values. The array may be empty (contain 0 elements).
+ repeated AnyValue values = 1;
+}
+
+// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message
+// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need
+// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to
+// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches
+// are semantically equivalent.
+message KeyValueList {
+ // A collection of key/value pairs of key-value pairs. The list may be empty (may
+ // contain 0 elements).
+ // The keys MUST be unique (it is not allowed to have more than one
+ // value with the same key).
+ repeated KeyValue values = 1;
+}
+
+// KeyValue is a key-value pair that is used to store Span attributes, Link
+// attributes, etc.
+message KeyValue {
+ string key = 1;
+ AnyValue value = 2;
+}
+
+// InstrumentationLibrary is a message representing the instrumentation library information
+// such as the fully qualified name and version.
+// InstrumentationLibrary is wire-compatible with InstrumentationScope for binary
+// Protobuf format.
+// This message is deprecated and will be removed on June 15, 2022.
+message InstrumentationLibrary {
+ option deprecated = true;
+
+ // An empty instrumentation library name means the name is unknown.
+ string name = 1;
+ string version = 2;
+}
+
+// InstrumentationScope is a message representing the instrumentation scope information
+// such as the fully qualified name and version.
+message InstrumentationScope {
+ // An empty instrumentation scope name means the name is unknown.
+ string name = 1;
+ string version = 2;
+ repeated KeyValue attributes = 3;
+ uint32 dropped_attributes_count = 4;
+}
diff --git a/library/cpp/actors/wilson/protos/resource.proto b/library/cpp/actors/wilson/protos/resource.proto
new file mode 100644
index 0000000000..752bf287ea
--- /dev/null
+++ b/library/cpp/actors/wilson/protos/resource.proto
@@ -0,0 +1,31 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.resource.v1;
+
+import "library/cpp/actors/wilson/protos/common.proto";
+
+// Resource information.
+message Resource {
+ // Set of attributes that describe the resource.
+ // Attribute keys MUST be unique (it is not allowed to have more than one
+ // attribute with the same key).
+ repeated opentelemetry.proto.common.v1.KeyValue attributes = 1;
+
+ // dropped_attributes_count is the number of dropped attributes. If the value is 0, then
+ // no attributes were dropped.
+ uint32 dropped_attributes_count = 2;
+}
diff --git a/library/cpp/actors/wilson/protos/trace.proto b/library/cpp/actors/wilson/protos/trace.proto
new file mode 100644
index 0000000000..0b645cf8ad
--- /dev/null
+++ b/library/cpp/actors/wilson/protos/trace.proto
@@ -0,0 +1,326 @@
+// Copyright 2019, OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package opentelemetry.proto.trace.v1;
+
+import "library/cpp/actors/wilson/protos/common.proto";
+import "library/cpp/actors/wilson/protos/resource.proto";
+
+// TracesData represents the traces data that can be stored in a persistent storage,
+// OR can be embedded by other protocols that transfer OTLP traces data but do
+// not implement the OTLP protocol.
+//
+// The main difference between this message and collector protocol is that
+// in this message there will not be any "control" or "metadata" specific to
+// OTLP protocol.
+//
+// When new fields are added into this message, the OTLP request MUST be updated
+// as well.
+message TracesData {
+ // An array of ResourceSpans.
+ // For data coming from a single resource this array will typically contain
+ // one element. Intermediary nodes that receive data from multiple origins
+ // typically batch the data before forwarding further and in that case this
+ // array will contain multiple elements.
+ repeated ResourceSpans resource_spans = 1;
+}
+
+// A collection of ScopeSpans from a Resource.
+message ResourceSpans {
+ // The resource for the spans in this message.
+ // If this field is not set then no resource info is known.
+ opentelemetry.proto.resource.v1.Resource resource = 1;
+
+ // A list of ScopeSpans that originate from a resource.
+ repeated ScopeSpans scope_spans = 2;
+
+ // A list of InstrumentationLibrarySpans that originate from a resource.
+ // This field is deprecated and will be removed after grace period expires on June 15, 2022.
+ //
+ // During the grace period the following rules SHOULD be followed:
+ //
+ // For Binary Protobufs
+ // ====================
+ // Binary Protobuf senders SHOULD NOT set instrumentation_library_spans. Instead
+ // scope_spans SHOULD be set.
+ //
+ // Binary Protobuf receivers SHOULD check if instrumentation_library_spans is set
+ // and scope_spans is not set then the value in instrumentation_library_spans
+ // SHOULD be used instead by converting InstrumentationLibrarySpans into ScopeSpans.
+ // If scope_spans is set then instrumentation_library_spans SHOULD be ignored.
+ //
+ // For JSON
+ // ========
+ // JSON senders that set instrumentation_library_spans field MAY also set
+ // scope_spans to carry the same spans, essentially double-publishing the same data.
+ // Such double-publishing MAY be controlled by a user-settable option.
+ // If double-publishing is not used then the senders SHOULD set scope_spans and
+ // SHOULD NOT set instrumentation_library_spans.
+ //
+ // JSON receivers SHOULD check if instrumentation_library_spans is set and
+ // scope_spans is not set then the value in instrumentation_library_spans
+ // SHOULD be used instead by converting InstrumentationLibrarySpans into ScopeSpans.
+ // If scope_spans is set then instrumentation_library_spans field SHOULD be ignored.
+ repeated InstrumentationLibrarySpans instrumentation_library_spans = 1000 [deprecated = true];
+
+ // This schema_url applies to the data in the "resource" field. It does not apply
+ // to the data in the "scope_spans" field which have their own schema_url field.
+ string schema_url = 3;
+}
+
+// A collection of Spans produced by an InstrumentationScope.
+message ScopeSpans {
+ // The instrumentation scope information for the spans in this message.
+ // Semantically when InstrumentationScope isn't set, it is equivalent with
+ // an empty instrumentation scope name (unknown).
+ opentelemetry.proto.common.v1.InstrumentationScope scope = 1;
+
+ // A list of Spans that originate from an instrumentation scope.
+ repeated Span spans = 2;
+
+ // This schema_url applies to all spans and span events in the "spans" field.
+ string schema_url = 3;
+}
+
+// A collection of Spans produced by an InstrumentationLibrary.
+// InstrumentationLibrarySpans is wire-compatible with ScopeSpans for binary
+// Protobuf format.
+// This message is deprecated and will be removed on June 15, 2022.
+message InstrumentationLibrarySpans {
+ option deprecated = true;
+
+ // The instrumentation library information for the spans in this message.
+ // Semantically when InstrumentationLibrary isn't set, it is equivalent with
+ // an empty instrumentation library name (unknown).
+ opentelemetry.proto.common.v1.InstrumentationLibrary instrumentation_library = 1;
+
+ // A list of Spans that originate from an instrumentation library.
+ repeated Span spans = 2;
+
+ // This schema_url applies to all spans and span events in the "spans" field.
+ string schema_url = 3;
+}
+
+// Span represents a single operation within a trace. Spans can be
+// nested to form a trace tree. Spans may also be linked to other spans
+// from the same or different trace and form graphs. Often, a trace
+// contains a root span that describes the end-to-end latency, and one
+// or more subspans for its sub-operations. A trace can also contain
+// multiple root spans, or none at all. Spans do not need to be
+// contiguous - there may be gaps or overlaps between spans in a trace.
+//
+// The next available field id is 17.
+message Span {
+ // A unique identifier for a trace. All spans from the same trace share
+ // the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes
+ // is considered invalid.
+ //
+ // This field is semantically required. Receiver should generate new
+ // random trace_id if empty or invalid trace_id was received.
+ //
+ // This field is required.
+ bytes trace_id = 1;
+
+ // A unique identifier for a span within a trace, assigned when the span
+ // is created. The ID is an 8-byte array. An ID with all zeroes is considered
+ // invalid.
+ //
+ // This field is semantically required. Receiver should generate new
+ // random span_id if empty or invalid span_id was received.
+ //
+ // This field is required.
+ bytes span_id = 2;
+
+ // trace_state conveys information about request position in multiple distributed tracing graphs.
+ // It is a trace_state in w3c-trace-context format: https://www.w3.org/TR/trace-context/#tracestate-header
+ // See also https://github.com/w3c/distributed-tracing for more details about this field.
+ string trace_state = 3;
+
+ // The `span_id` of this span's parent span. If this is a root span, then this
+ // field must be empty. The ID is an 8-byte array.
+ bytes parent_span_id = 4;
+
+ // A description of the span's operation.
+ //
+ // For example, the name can be a qualified method name or a file name
+ // and a line number where the operation is called. A best practice is to use
+ // the same display name at the same call point in an application.
+ // This makes it easier to correlate spans in different traces.
+ //
+ // This field is semantically required to be set to non-empty string.
+ // Empty value is equivalent to an unknown span name.
+ //
+ // This field is required.
+ string name = 5;
+
+ // SpanKind is the type of span. Can be used to specify additional relationships between spans
+ // in addition to a parent/child relationship.
+ enum SpanKind {
+ // Unspecified. Do NOT use as default.
+ // Implementations MAY assume SpanKind to be INTERNAL when receiving UNSPECIFIED.
+ SPAN_KIND_UNSPECIFIED = 0;
+
+ // Indicates that the span represents an internal operation within an application,
+ // as opposed to an operation happening at the boundaries. Default value.
+ SPAN_KIND_INTERNAL = 1;
+
+ // Indicates that the span covers server-side handling of an RPC or other
+ // remote network request.
+ SPAN_KIND_SERVER = 2;
+
+ // Indicates that the span describes a request to some remote service.
+ SPAN_KIND_CLIENT = 3;
+
+ // Indicates that the span describes a producer sending a message to a broker.
+ // Unlike CLIENT and SERVER, there is often no direct critical path latency relationship
+ // between producer and consumer spans. A PRODUCER span ends when the message was accepted
+ // by the broker while the logical processing of the message might span a much longer time.
+ SPAN_KIND_PRODUCER = 4;
+
+ // Indicates that the span describes consumer receiving a message from a broker.
+ // Like the PRODUCER kind, there is often no direct critical path latency relationship
+ // between producer and consumer spans.
+ SPAN_KIND_CONSUMER = 5;
+ }
+
+ // Distinguishes between spans generated in a particular context. For example,
+ // two spans with the same name may be distinguished using `CLIENT` (caller)
+ // and `SERVER` (callee) to identify queueing latency associated with the span.
+ SpanKind kind = 6;
+
+ // start_time_unix_nano is the start time of the span. On the client side, this is the time
+ // kept by the local machine where the span execution starts. On the server side, this
+ // is the time when the server's application handler starts running.
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
+ //
+ // This field is semantically required and it is expected that end_time >= start_time.
+ fixed64 start_time_unix_nano = 7;
+
+ // end_time_unix_nano is the end time of the span. On the client side, this is the time
+ // kept by the local machine where the span execution ends. On the server side, this
+ // is the time when the server application handler stops running.
+ // Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
+ //
+ // This field is semantically required and it is expected that end_time >= start_time.
+ fixed64 end_time_unix_nano = 8;
+
+ // attributes is a collection of key/value pairs. Note, global attributes
+ // like server name can be set using the resource API. Examples of attributes:
+ //
+ // "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36"
+ // "/http/server_latency": 300
+ // "abc.com/myattribute": true
+ // "abc.com/score": 10.239
+ //
+ // The OpenTelemetry API specification further restricts the allowed value types:
+ // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/common/common.md#attributes
+ // Attribute keys MUST be unique (it is not allowed to have more than one
+ // attribute with the same key).
+ repeated opentelemetry.proto.common.v1.KeyValue attributes = 9;
+
+ // dropped_attributes_count is the number of attributes that were discarded. Attributes
+ // can be discarded because their keys are too long or because there are too many
+ // attributes. If this value is 0, then no attributes were dropped.
+ uint32 dropped_attributes_count = 10;
+
+ // Event is a time-stamped annotation of the span, consisting of user-supplied
+ // text description and key-value pairs.
+ message Event {
+ // time_unix_nano is the time the event occurred.
+ fixed64 time_unix_nano = 1;
+
+ // name of the event.
+ // This field is semantically required to be set to non-empty string.
+ string name = 2;
+
+ // attributes is a collection of attribute key/value pairs on the event.
+ // Attribute keys MUST be unique (it is not allowed to have more than one
+ // attribute with the same key).
+ repeated opentelemetry.proto.common.v1.KeyValue attributes = 3;
+
+ // dropped_attributes_count is the number of dropped attributes. If the value is 0,
+ // then no attributes were dropped.
+ uint32 dropped_attributes_count = 4;
+ }
+
+ // events is a collection of Event items.
+ repeated Event events = 11;
+
+ // dropped_events_count is the number of dropped events. If the value is 0, then no
+ // events were dropped.
+ uint32 dropped_events_count = 12;
+
+ // A pointer from the current span to another span in the same trace or in a
+ // different trace. For example, this can be used in batching operations,
+ // where a single batch handler processes multiple requests from different
+ // traces or when the handler receives a request from a different project.
+ message Link {
+ // A unique identifier of a trace that this linked span is part of. The ID is a
+ // 16-byte array.
+ bytes trace_id = 1;
+
+ // A unique identifier for the linked span. The ID is an 8-byte array.
+ bytes span_id = 2;
+
+ // The trace_state associated with the link.
+ string trace_state = 3;
+
+ // attributes is a collection of attribute key/value pairs on the link.
+ // Attribute keys MUST be unique (it is not allowed to have more than one
+ // attribute with the same key).
+ repeated opentelemetry.proto.common.v1.KeyValue attributes = 4;
+
+ // dropped_attributes_count is the number of dropped attributes. If the value is 0,
+ // then no attributes were dropped.
+ uint32 dropped_attributes_count = 5;
+ }
+
+ // links is a collection of Links, which are references from this span to a span
+ // in the same or different trace.
+ repeated Link links = 13;
+
+ // dropped_links_count is the number of dropped links after the maximum size was
+ // enforced. If this value is 0, then no links were dropped.
+ uint32 dropped_links_count = 14;
+
+ // An optional final status for this span. Semantically when Status isn't set, it means
+ // span's status code is unset, i.e. assume STATUS_CODE_UNSET (code = 0).
+ Status status = 15;
+}
+
+// The Status type defines a logical error model that is suitable for different
+// programming environments, including REST APIs and RPC APIs.
+message Status {
+ reserved 1;
+
+ // A developer-facing human readable error message.
+ string message = 2;
+
+ // For the semantics of status codes see
+ // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#set-status
+ enum StatusCode {
+ // The default status.
+ STATUS_CODE_UNSET = 0;
+ // The Span has been validated by an Application developers or Operator to have
+ // completed successfully.
+ STATUS_CODE_OK = 1;
+ // The Span contains an error.
+ STATUS_CODE_ERROR = 2;
+ };
+
+ // The status code.
+ StatusCode code = 3;
+}
diff --git a/library/cpp/actors/wilson/wilson_event.h b/library/cpp/actors/wilson/wilson_event.h
index 7d89c33b51..4b6a7612c0 100644
--- a/library/cpp/actors/wilson/wilson_event.h
+++ b/library/cpp/actors/wilson/wilson_event.h
@@ -3,179 +3,19 @@
#include "wilson_trace.h"
#include <library/cpp/string_utils/base64/base64.h>
-
+#include <library/cpp/actors/core/actor.h>
#include <library/cpp/actors/core/log.h>
namespace NWilson {
-#if !defined(_win_)
-// works only for those compilers, who trait C++ as ISO IEC 14882, not their own standard
-
-#define __UNROLL_PARAMS_8(N, F, X, ...) \
- F(X, N - 8) \
- __UNROLL_PARAMS_7(N, F, ##__VA_ARGS__)
-#define __UNROLL_PARAMS_7(N, F, X, ...) \
- F(X, N - 7) \
- __UNROLL_PARAMS_6(N, F, ##__VA_ARGS__)
-#define __UNROLL_PARAMS_6(N, F, X, ...) \
- F(X, N - 6) \
- __UNROLL_PARAMS_5(N, F, ##__VA_ARGS__)
-#define __UNROLL_PARAMS_5(N, F, X, ...) \
- F(X, N - 5) \
- __UNROLL_PARAMS_4(N, F, ##__VA_ARGS__)
-#define __UNROLL_PARAMS_4(N, F, X, ...) \
- F(X, N - 4) \
- __UNROLL_PARAMS_3(N, F, ##__VA_ARGS__)
-#define __UNROLL_PARAMS_3(N, F, X, ...) \
- F(X, N - 3) \
- __UNROLL_PARAMS_2(N, F, ##__VA_ARGS__)
-#define __UNROLL_PARAMS_2(N, F, X, ...) \
- F(X, N - 2) \
- __UNROLL_PARAMS_1(N, F, ##__VA_ARGS__)
-#define __UNROLL_PARAMS_1(N, F, X) F(X, N - 1)
-#define __UNROLL_PARAMS_0(N, F)
-#define __EX(...) __VA_ARGS__
-#define __NUM_PARAMS(...) __NUM_PARAMS_SELECT_N(__VA_ARGS__, __NUM_PARAMS_SEQ)
-#define __NUM_PARAMS_SELECT_N(...) __EX(__NUM_PARAMS_SELECT(__VA_ARGS__))
-#define __NUM_PARAMS_SELECT(X, _1, _2, _3, _4, _5, _6, _7, _8, N, ...) N
-#define __NUM_PARAMS_SEQ 8, 7, 6, 5, 4, 3, 2, 1, 0, ERROR
-#define __CAT(X, Y) X##Y
-#define __UNROLL_PARAMS_N(N, F, ...) __EX(__CAT(__UNROLL_PARAMS_, N)(N, F, ##__VA_ARGS__))
-#define __UNROLL_PARAMS(F, ...) __UNROLL_PARAMS_N(__NUM_PARAMS(X, ##__VA_ARGS__), F, ##__VA_ARGS__)
-#define __EX2(F, X, INDEX) __INVOKE(F, __EX X, INDEX)
-#define __INVOKE(F, ...) F(__VA_ARGS__)
-
-#define __DECLARE_PARAM(X, INDEX) __EX2(__DECLARE_PARAM_X, X, INDEX)
-#define __DECLARE_PARAM_X(TYPE, NAME, INDEX) \
- static const struct T##NAME##Param \
- : ::NWilson::TParamBinder<INDEX, TYPE> { \
- T##NAME##Param() { \
- } \
- using ::NWilson::TParamBinder<INDEX, TYPE>::operator=; \
- } NAME;
-
-#define __TUPLE_PARAM(X, INDEX) __EX2(__TUPLE_PARAM_X, X, INDEX)
-#define __TUPLE_PARAM_X(TYPE, NAME, INDEX) TYPE,
-
-#define __OUTPUT_PARAM(X, INDEX) __EX2(__OUTPUT_PARAM_X, X, INDEX)
-#define __OUTPUT_PARAM_X(TYPE, NAME, INDEX) str << (INDEX ? ", " : "") << #NAME << "# " << std::get<INDEX>(ParamPack);
-
-#define __FILL_PARAM(P, INDEX) \
- do { \
- const auto& boundParam = (NParams::P); \
- boundParam.Apply(event.ParamPack); \
- } while (false);
-
-#define DECLARE_WILSON_EVENT(EVENT_NAME, ...) \
- namespace N##EVENT_NAME##Params { \
- __UNROLL_PARAMS(__DECLARE_PARAM, ##__VA_ARGS__) \
- \
- using TParamPack = std::tuple< \
- __UNROLL_PARAMS(__TUPLE_PARAM, ##__VA_ARGS__) char>; \
- } \
- struct T##EVENT_NAME { \
- using TParamPack = N##EVENT_NAME##Params::TParamPack; \
- TParamPack ParamPack; \
- \
- void Output(IOutputStream& str) { \
- str << #EVENT_NAME << "{"; \
- __UNROLL_PARAMS(__OUTPUT_PARAM, ##__VA_ARGS__) \
- str << "}"; \
- } \
- };
-
- template <size_t INDEX, typename T>
- class TBoundParam {
- mutable T Value;
-
- public:
- TBoundParam(T&& value)
- : Value(std::move(value))
- {
- }
-
- template <typename TParamPack>
- void Apply(TParamPack& pack) const {
- std::get<INDEX>(pack) = std::move(Value);
- }
- };
-
- template <size_t INDEX, typename T>
- struct TParamBinder {
- template <typename TValue>
- TBoundParam<INDEX, T> operator=(const TValue& value) const {
- return TBoundParam<INDEX, T>(TValue(value));
- }
-
- template <typename TValue>
- TBoundParam<INDEX, T> operator=(TValue&& value) const {
- return TBoundParam<INDEX, T>(std::move(value));
- }
- };
-
-// generate wilson event having parent TRACE_ID and span TRACE_ID to become parent of logged event
-#define WILSON_TRACE(CTX, TRACE_ID, EVENT_NAME, ...) \
- if (::NWilson::TraceEnabled(CTX)) { \
- ::NWilson::TTraceId* __traceId = (TRACE_ID); \
- if (__traceId && *__traceId) { \
- TInstant now = Now(); \
- T##EVENT_NAME event; \
- namespace NParams = N##EVENT_NAME##Params; \
- __UNROLL_PARAMS(__FILL_PARAM, ##__VA_ARGS__) \
- ::NWilson::TraceEvent((CTX), __traceId, event, now); \
- } \
- }
- inline ui32 GetNodeId(const NActors::TActorSystem& actorSystem) {
- return actorSystem.NodeId;
+ // stub for NBS
+ template<typename TActorSystem>
+ inline bool TraceEnabled(const TActorSystem&) {
+ return false;
}
- inline ui32 GetNodeId(const NActors::TActivationContext& ac) {
- return GetNodeId(*ac.ExecutorThread.ActorSystem);
- }
-
- constexpr ui32 WilsonComponentId = 430; // kikimrservices: wilson
-
- template <typename TActorSystem>
- bool TraceEnabled(const TActorSystem& ctx) {
- const auto* loggerSettings = ctx.LoggerSettings();
- return loggerSettings && loggerSettings->Satisfies(NActors::NLog::PRI_DEBUG, WilsonComponentId);
- }
-
- template <typename TActorSystem, typename TEvent>
- void TraceEvent(const TActorSystem& actorSystem, TTraceId* traceId, TEvent&& event, TInstant timestamp) {
- // ensure that we are not using obsolete TraceId
- traceId->CheckConsistency();
-
- // store parent id (for logging) and generate child trace id
- TTraceId parentTraceId(std::move(*traceId));
- *traceId = parentTraceId.Span();
-
- // create encoded string buffer containing timestamp
- const ui64 timestampValue = timestamp.GetValue();
- const size_t base64size = Base64EncodeBufSize(sizeof(timestampValue));
- char base64[base64size];
- char* end = Base64Encode(base64, reinterpret_cast<const ui8*>(&timestampValue), sizeof(timestampValue));
-
- // cut trailing padding character to save some space
- Y_VERIFY(end > base64 && end[-1] == '=');
- --end;
-
- // generate log record
- TString finalMessage;
- TStringOutput s(finalMessage);
- s << GetNodeId(actorSystem) << " " << TStringBuf(base64, end) << " ";
- traceId->Output(s, parentTraceId);
- s << " ";
- event.Output(s);
-
- // output wilson event FIXME: special facility for wilson events w/binary serialization
- NActors::MemLogAdapter(actorSystem, NActors::NLog::PRI_DEBUG, WilsonComponentId, std::move(finalMessage));
- }
-
-#else
-
-#define DECLARE_WILSON_EVENT(...)
-#define WILSON_TRACE(...)
-#endif
+ template<typename TActorSystem, typename TEvent>
+ inline void TraceEvent(const TActorSystem&, TTraceId*, TEvent&&, TInstant)
+ {}
} // NWilson
diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp
new file mode 100644
index 0000000000..6b6ea03ccf
--- /dev/null
+++ b/library/cpp/actors/wilson/wilson_span.cpp
@@ -0,0 +1,64 @@
+#include "wilson_span.h"
+#include <library/cpp/actors/core/log.h>
+#include <google/protobuf/text_format.h>
+
+namespace NWilson {
+
+ using namespace NActors;
+
+ void SerializeValue(TAttributeValue value, NCommonProto::AnyValue *pb) {
+ switch (value.index()) {
+ case 0:
+ pb->set_string_value(std::get<0>(std::move(value)));
+ break;
+
+ case 1:
+ pb->set_bool_value(std::get<1>(value));
+ break;
+
+ case 2:
+ pb->set_int_value(std::get<2>(value));
+ break;
+
+ case 3:
+ pb->set_double_value(std::get<3>(std::move(value)));
+ break;
+
+ case 4: {
+ auto *array = pb->mutable_array_value();
+ for (auto&& item : std::get<4>(std::move(value))) {
+ SerializeValue(std::move(item), array->add_values());
+ }
+ break;
+ }
+
+ case 5: {
+ auto *kv = pb->mutable_kvlist_value();
+ for (auto&& [key, value] : std::get<5>(std::move(value))) {
+ SerializeKeyValue(std::move(key), std::move(value), kv->add_values());
+ }
+ break;
+ }
+
+ case 6:
+ pb->set_bytes_value(std::get<6>(std::move(value)));
+ break;
+ }
+ }
+
+ void SerializeKeyValue(TString key, TAttributeValue value, NCommonProto::KeyValue *pb) {
+ pb->set_key(std::move(key));
+ SerializeValue(std::move(value), pb->mutable_value());
+ }
+
+ void TSpan::Send() {
+ if (TlsActivationContext) {
+ NProtoBuf::TextFormat::Printer p;
+ p.SetSingleLineMode(true);
+ TString s;
+ p.PrintToString(Data->Span, &s);
+ LOG_DEBUG_S(*TlsActivationContext, 430 /* WILSON */, s);
+ }
+ }
+
+} // NWilson
diff --git a/library/cpp/actors/wilson/wilson_span.h b/library/cpp/actors/wilson/wilson_span.h
new file mode 100644
index 0000000000..c2de2f0b68
--- /dev/null
+++ b/library/cpp/actors/wilson/wilson_span.h
@@ -0,0 +1,160 @@
+#pragma once
+
+#include <library/cpp/actors/wilson/protos/trace.pb.h>
+#include <util/generic/hash.h>
+#include <util/datetime/cputimer.h>
+
+#include "wilson_trace.h"
+
+namespace NWilson {
+
+ enum class ERelation {
+ FollowsFrom,
+ ChildOf,
+ };
+
+ namespace NTraceProto = opentelemetry::proto::trace::v1;
+ namespace NCommonProto = opentelemetry::proto::common::v1;
+
+ struct TArrayValue;
+ struct TKeyValueList;
+ struct TBytes;
+
+ using TAttributeValue = std::variant<
+ TString,
+ bool,
+ i64,
+ double,
+ TArrayValue,
+ TKeyValueList,
+ TBytes
+ >;
+
+ struct TArrayValue : std::vector<TAttributeValue> {};
+ struct TKeyValueList : THashMap<TString, TAttributeValue> {};
+ struct TBytes : TString {};
+
+ void SerializeKeyValue(TString key, TAttributeValue value, NCommonProto::KeyValue *pb);
+
+ class TSpan {
+ struct TData {
+ const TInstant StartTime;
+ const ui64 StartCycles;
+ const TTraceId TraceId;
+ NTraceProto::Span Span;
+
+ TData(TInstant startTime, ui64 startCycles, TTraceId traceId)
+ : StartTime(startTime)
+ , StartCycles(startCycles)
+ , TraceId(std::move(traceId))
+ {}
+ };
+
+ std::unique_ptr<TData> Data;
+
+ public:
+ TSpan() = default;
+ TSpan(const TSpan&) = delete;
+ TSpan(TSpan&&) = default;
+
+ TSpan(ui8 verbosity, ERelation /*relation*/, TTraceId parentId, TInstant now, std::optional<TString> name)
+ : Data(parentId ? std::make_unique<TData>(now, GetCycleCount(), parentId.Span(verbosity)) : nullptr)
+ {
+ if (*this) {
+ if (!parentId.IsRoot()) {
+ Data->Span.set_parent_span_id(parentId.GetSpanIdPtr(), parentId.GetSpanIdSize());
+ }
+ Data->Span.set_start_time_unix_nano(now.NanoSeconds());
+
+ if (name) {
+ Name(std::move(*name));
+ }
+ }
+ }
+
+ TSpan& operator =(const TSpan&) = delete;
+ TSpan& operator =(TSpan&&) = default;
+
+ operator bool() const {
+ return static_cast<bool>(Data);
+ }
+
+ TSpan& Name(TString name) {
+ if (*this) {
+ Data->Span.set_name(std::move(name));
+ }
+ return *this;
+ }
+
+ TSpan& Attribute(TString name, TAttributeValue value) {
+ if (*this) {
+ SerializeKeyValue(std::move(name), std::move(value), Data->Span.add_attributes());
+ }
+ return *this;
+ }
+
+ TSpan& Event(TString name, TKeyValueList attributes) {
+ if (*this) {
+ auto *event = Data->Span.add_events();
+ event->set_time_unix_nano(TimeUnixNano());
+ event->set_name(std::move(name));
+ for (auto&& [key, value] : attributes) {
+ SerializeKeyValue(std::move(key), std::move(value), event->add_attributes());
+ }
+ }
+ return *this;
+ }
+
+ TSpan& Link(const TTraceId& traceId, TKeyValueList attributes) {
+ if (*this) {
+ auto *link = Data->Span.add_links();
+ link->set_trace_id(traceId.GetTraceIdPtr(), traceId.GetTraceIdSize());
+ link->set_span_id(traceId.GetSpanIdPtr(), traceId.GetSpanIdSize());
+ for (auto&& [key, value] : attributes) {
+ SerializeKeyValue(std::move(key), std::move(value), link->add_attributes());
+ }
+ }
+ return *this;
+ }
+
+ void EndOk() {
+ if (*this) {
+ auto *status = Data->Span.mutable_status();
+ status->set_code(NTraceProto::Status::STATUS_CODE_OK);
+ }
+ End();
+ }
+
+ void EndError(TString error) {
+ if (*this) {
+ auto *status = Data->Span.mutable_status();
+ status->set_code(NTraceProto::Status::STATUS_CODE_ERROR);
+ status->set_message(std::move(error));
+ }
+ End();
+ }
+
+ void End() {
+ if (*this) {
+ Data->Span.set_end_time_unix_nano(TimeUnixNano());
+ Data->Span.set_trace_id(Data->TraceId.GetTraceIdPtr(), Data->TraceId.GetTraceIdSize());
+ Data->Span.set_span_id(Data->TraceId.GetSpanIdPtr(), Data->TraceId.GetSpanIdSize());
+ Send();
+ Data.reset(); // tracing finished
+ }
+ }
+
+ operator TTraceId() const {
+ return Data ? TTraceId(Data->TraceId) : TTraceId();
+ }
+
+ private:
+ void Send();
+
+ ui64 TimeUnixNano() const {
+ const TInstant now = Data->StartTime + CyclesToDuration(GetCycleCount() - Data->StartCycles);
+ return now.NanoSeconds();
+ }
+ };
+
+} // NWilson
diff --git a/library/cpp/actors/wilson/wilson_trace.h b/library/cpp/actors/wilson/wilson_trace.h
index cfbf93059b..9937c1f807 100644
--- a/library/cpp/actors/wilson/wilson_trace.h
+++ b/library/cpp/actors/wilson/wilson_trace.h
@@ -4,158 +4,160 @@
#include <util/stream/output.h>
#include <util/random/random.h>
+#include <util/random/fast.h>
#include <util/string/printf.h>
+#include <array>
+
namespace NWilson {
class TTraceId {
- ui64 TraceId; // Random id of topmost client request
- ui64 SpanId; // Span id of part of request currently being executed
+ using TTrace = std::array<ui64, 2>;
+
+ TTrace TraceId; // Random id of topmost client request
+ union {
+ struct {
+ ui64 SpanId : 48; // Span id of part of request currently being executed
+ ui64 Verbosity : 4;
+ ui64 TimeToLive : 12;
+ };
+ ui64 Raw;
+ };
private:
- TTraceId(ui64 traceId, ui64 spanId)
+ TTraceId(TTrace traceId, ui64 spanId, ui8 verbosity, ui32 timeToLive)
: TraceId(traceId)
- , SpanId(spanId)
{
+ SpanId = spanId;
+ Verbosity = verbosity;
+ TimeToLive = timeToLive;
}
- static ui64 GenerateTraceId() {
- ui64 traceId = 0;
- while (!traceId) {
- traceId = RandomNumber<ui64>();
+ static TTrace GenerateTraceId() {
+ for (;;) {
+ TTrace res;
+ ui32 *p = reinterpret_cast<ui32*>(res.data());
+
+ TReallyFastRng32 rng(RandomNumber<ui64>());
+ p[0] = rng();
+ p[1] = rng();
+ p[2] = rng();
+ p[3] = rng();
+
+ if (res[0] || res[1]) {
+ return res;
+ }
}
- return traceId;
}
static ui64 GenerateSpanId() {
- return RandomNumber<ui64>();
+ for (;;) {
+ if (const ui64 res = RandomNumber<ui64>(); res) { // SpanId can't be zero
+ return res;
+ }
+ }
}
public:
- using TSerializedTraceId = char[2 * sizeof(ui64)];
+ using TSerializedTraceId = char[sizeof(TTrace) + sizeof(ui64)];
public:
- TTraceId()
- : TraceId(0)
- , SpanId(0)
- {
- }
+ TTraceId(ui64) // NBS stub
+ : TTraceId()
+ {}
- explicit TTraceId(ui64 traceId)
- : TraceId(traceId)
- , SpanId(0)
- {
+ TTraceId() {
+ TraceId.fill(0);
+ Raw = 0;
}
- TTraceId(const TSerializedTraceId& in)
- : TraceId(reinterpret_cast<const ui64*>(in)[0])
- , SpanId(reinterpret_cast<const ui64*>(in)[1])
+ explicit TTraceId(TTrace traceId)
+ : TraceId(traceId)
{
+ Raw = 0;
}
// allow move semantic
TTraceId(TTraceId&& other)
: TraceId(other.TraceId)
- , SpanId(other.SpanId)
+ , Raw(other.Raw)
{
- other.TraceId = 0;
- other.SpanId = 1; // explicitly mark invalid
+ other.TraceId.fill(0);
+ }
+
+ // explicit copy
+ explicit TTraceId(const TTraceId& other)
+ : TraceId(other.TraceId)
+ , Raw(other.Raw)
+ {}
+
+ TTraceId(const TSerializedTraceId& in) {
+ auto p = reinterpret_cast<const ui64*>(in);
+ TraceId = {p[0], p[1]};
+ Raw = p[2];
+ }
+
+ void Serialize(TSerializedTraceId* out) const {
+ auto p = reinterpret_cast<ui64*>(*out);
+ p[0] = TraceId[0];
+ p[1] = TraceId[1];
+ p[2] = Raw;
}
TTraceId& operator=(TTraceId&& other) {
TraceId = other.TraceId;
- SpanId = other.SpanId;
- other.TraceId = 0;
- other.SpanId = 1; // explicitly mark invalid
+ other.TraceId.fill(0);
+ Raw = other.Raw;
return *this;
}
// do not allow implicit copy of trace id
- TTraceId(const TTraceId& other) = delete;
TTraceId& operator=(const TTraceId& other) = delete;
- static TTraceId NewTraceId() {
- return TTraceId(GenerateTraceId(), 0);
- }
-
- // create separate branch from this point
- TTraceId SeparateBranch() const {
- return Clone();
+ static TTraceId NewTraceId(ui8 verbosity, ui32 timeToLive) {
+ return TTraceId(GenerateTraceId(), 0, verbosity, timeToLive);
}
- TTraceId Clone() const {
- return TTraceId(TraceId, SpanId);
+ static TTraceId NewTraceId() { // NBS stub
+ return TTraceId();
}
- TTraceId Span() const {
- return *this ? TTraceId(TraceId, GenerateSpanId()) : TTraceId();
+ TTraceId Span(ui8 verbosity) const {
+ return *this && TimeToLive && verbosity <= Verbosity
+ ? TTraceId(TraceId, GenerateSpanId(), Verbosity, TimeToLive - 1)
+ : TTraceId();
}
- ui64 GetTraceId() const {
- return TraceId;
+ TTraceId Span() const { // compatibility stub
+ return {};
}
// Check if request tracing is enabled
explicit operator bool() const {
- return TraceId != 0;
- }
-
- // Output trace id into a string stream
- void Output(IOutputStream& s, const TTraceId& parentTraceId) const {
- union {
- ui8 buffer[3 * sizeof(ui64)];
- struct {
- ui64 traceId;
- ui64 spanId;
- ui64 parentSpanId;
- } x;
- };
-
- x.traceId = TraceId;
- x.spanId = SpanId;
- x.parentSpanId = parentTraceId.SpanId;
-
- const size_t base64size = Base64EncodeBufSize(sizeof(x));
- char base64[base64size];
- char* end = Base64Encode(base64, buffer, sizeof(x));
- s << TStringBuf(base64, end);
+ return TraceId[0] || TraceId[1];
}
- // output just span id into stream
- void OutputSpanId(IOutputStream& s) const {
- const size_t base64size = Base64EncodeBufSize(sizeof(SpanId));
- char base64[base64size];
- char* end = Base64Encode(base64, reinterpret_cast<const ui8*>(&SpanId), sizeof(SpanId));
-
- // cut trailing padding character
- Y_VERIFY(end > base64 && end[-1] == '=');
- --end;
-
- s << TStringBuf(base64, end);
- }
-
- void CheckConsistency() {
- // if TraceId is zero, then SpanId must be zero too
- Y_VERIFY_DEBUG(*this || !SpanId);
+ bool IsRoot() const {
+ return !SpanId;
}
friend bool operator==(const TTraceId& x, const TTraceId& y) {
- return x.TraceId == y.TraceId && x.SpanId == y.SpanId;
+ return x.TraceId == y.TraceId && x.Raw == y.Raw;
}
- TString ToString() const {
- return Sprintf("%" PRIu64 ":%" PRIu64, TraceId, SpanId);
+ ui8 GetVerbosity() const {
+ return Verbosity;
}
- bool IsFromSameTree(const TTraceId& other) const {
- return TraceId == other.TraceId;
- }
+ const void *GetTraceIdPtr() const { return TraceId.data(); }
+ static constexpr size_t GetTraceIdSize() { return sizeof(TTrace); }
+ const void *GetSpanIdPtr() const { return &Raw; }
+ static constexpr size_t GetSpanIdSize() { return sizeof(ui64); }
- void Serialize(TSerializedTraceId* out) const {
- ui64* p = reinterpret_cast<ui64*>(*out);
- p[0] = TraceId;
- p[1] = SpanId;
- }
+ // for compatibility with NBS
+ TTraceId Clone() const { return NWilson::TTraceId(*this); }
+ ui64 GetTraceId() const { return 0; }
+ void OutputSpanId(IOutputStream&) const {}
};
}
diff --git a/ydb/core/actorlib_impl/test_interconnect_ut.cpp b/ydb/core/actorlib_impl/test_interconnect_ut.cpp
index 0d9d3535ef..75bc226027 100644
--- a/ydb/core/actorlib_impl/test_interconnect_ut.cpp
+++ b/ydb/core/actorlib_impl/test_interconnect_ut.cpp
@@ -618,7 +618,7 @@ Y_UNIT_TEST_SUITE(TInterconnectTest) {
const_cast<TString::value_type*>(blob.data())[i] =
TString::value_type(i % 256);
- auto sentTraceId = NWilson::TTraceId::NewTraceId();
+ auto sentTraceId = NWilson::TTraceId::NewTraceId(15, 4095);
runtime.Send(new IEventHandle(edge, edge,
new TEvents::TEvBlob(blob),
@@ -630,7 +630,6 @@ Y_UNIT_TEST_SUITE(TInterconnectTest) {
UNIT_ASSERT_EQUAL(handle->Cookie, 13);
UNIT_ASSERT_EQUAL(event->Blob, blob);
UNIT_ASSERT_EQUAL((bool)handle->TraceId, true);
- UNIT_ASSERT(handle->TraceId.IsFromSameTree(sentTraceId));
}
Y_UNIT_TEST(TestAddressResolve) {
diff --git a/ydb/core/base/CMakeLists.txt b/ydb/core/base/CMakeLists.txt
index 1346c75550..e125480825 100644
--- a/ydb/core/base/CMakeLists.txt
+++ b/ydb/core/base/CMakeLists.txt
@@ -15,6 +15,7 @@ target_link_libraries(ydb-core-base PUBLIC
cpp-actors-helpers
cpp-actors-interconnect
cpp-actors-protos
+ cpp-actors-wilson
cpp-deprecated-enum_codegen
library-cpp-logger
library-cpp-lwtrace
@@ -30,8 +31,6 @@ target_link_libraries(ydb-core-base PUBLIC
ydb-library-login
ydb-library-pdisk_io
library-pretty_types_print-protobuf
- library-pretty_types_print-wilson
- ydb-library-wilson
api-protos-out
library-yql-minikql
library-cpp-resource
@@ -89,6 +88,7 @@ target_link_libraries(ydb-core-base.global PUBLIC
cpp-actors-helpers
cpp-actors-interconnect
cpp-actors-protos
+ cpp-actors-wilson
cpp-deprecated-enum_codegen
library-cpp-logger
library-cpp-lwtrace
@@ -104,8 +104,6 @@ target_link_libraries(ydb-core-base.global PUBLIC
ydb-library-login
ydb-library-pdisk_io
library-pretty_types_print-protobuf
- library-pretty_types_print-wilson
- ydb-library-wilson
api-protos-out
library-yql-minikql
library-cpp-resource
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index 58011ce1bf..c9e6504c45 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -14,8 +14,7 @@
#include <ydb/core/protos/blobstorage_config.pb.h>
#include <ydb/core/util/yverify_stream.h>
-#include <ydb/library/wilson/wilson_event.h>
-
+#include <library/cpp/actors/wilson/wilson_trace.h>
#include <library/cpp/lwtrace/shuttle.h>
#include <util/stream/str.h>
diff --git a/ydb/core/blobstorage/backpressure/defs.h b/ydb/core/blobstorage/backpressure/defs.h
index d510a3545e..d179e1ba76 100644
--- a/ydb/core/blobstorage/backpressure/defs.h
+++ b/ydb/core/blobstorage/backpressure/defs.h
@@ -7,7 +7,6 @@
#include <ydb/core/blobstorage/vdisk/common/vdisk_costmodel.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
#include <ydb/core/blobstorage/base/blobstorage_events.h>
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
#include <ydb/core/protos/blobstorage.pb.h>
#include <ydb/core/base/interconnect_channels.h>
@@ -18,4 +17,5 @@
#include <library/cpp/actors/core/mailbox.h>
#include <library/cpp/actors/core/mon.h>
#include <library/cpp/containers/intrusive_rb_tree/rb_tree.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <google/protobuf/message.h>
diff --git a/ydb/core/blobstorage/backpressure/queue.cpp b/ydb/core/blobstorage/backpressure/queue.cpp
index e1574116a0..6e29552d81 100644
--- a/ydb/core/blobstorage/backpressure/queue.cpp
+++ b/ydb/core/blobstorage/backpressure/queue.cpp
@@ -115,7 +115,7 @@ void TBlobStorageQueue::SetItemQueue(TItem& item, EItemQueue newQueue) {
}
}
-void TBlobStorageQueue::SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, IActor *actor) {
+void TBlobStorageQueue::SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, ui32 vdiskOrderNumber) {
const TInstant now = ctx.Now();
const bool sendMeCostSettings = now >= CostSettingsUpdate;
@@ -198,10 +198,11 @@ void TBlobStorageQueue::SendToVDisk(const TActorContext& ctx, const TActorId& re
++*QueueItemsSent;
// send item
- WILSON_TRACE_FROM_ACTOR(ctx, *actor, &item.TraceId, EvBlobStorageQueueForward,
- InQueueWaitingItems = GetItemsWaiting(), InQueueWaitingBytes = GetBytesWaiting());
+ item.Span.Event("SendToVDisk", {{
+ {"VDiskOrderNumber", vdiskOrderNumber}
+ }});
item.Event.SendToVDisk(ctx, remoteVDisk, item.QueueCookie, item.MsgId, item.SequenceId, sendMeCostSettings,
- item.TraceId.Clone(), ClientId, item.ProcessingTimer);
+ item.Span, ClientId, item.ProcessingTimer);
// update counters as far as item got sent
++NextMsgId;
@@ -217,6 +218,7 @@ void TBlobStorageQueue::ReplyWithError(TItem& item, NKikimrProto::EReplyStatus s
<< " cookie# " << item.Event.GetCookie()
<< " processingTime# " << processingTime);
+ item.Span.EndError(TStringBuilder() << NKikimrProto::EReplyStatus_Name(status) << ": " << errorReason);
ctx.Send(item.Event.GetSender(), item.Event.MakeErrorReply(status, errorReason, QueueDeserializedItems,
QueueDeserializedBytes), 0, item.Event.GetCookie());
@@ -247,6 +249,7 @@ bool TBlobStorageQueue::OnResponse(ui64 msgId, ui64 sequenceId, ui64 cookie, TAc
it->Event.GetByteSize(), !relevant);
InFlightLookup.erase(lookupIt);
+ it->Span.EndOk();
EraseItem(Queues.InFlight, it);
// unpause execution when InFlight queue gets empty
@@ -323,6 +326,7 @@ void TBlobStorageQueue::OnConnect() {
TBlobStorageQueue::TItemList::iterator TBlobStorageQueue::EraseItem(TItemList& queue, TItemList::iterator it) {
SetItemQueue(*it, EItemQueue::NotSet);
+ it->Span.EndError("EraseItem called");
TItemList::iterator nextIter = std::next(it);
if (Queues.Unused.size() < MaxUnusedItems) {
Queues.Unused.splice(Queues.Unused.end(), queue, it);
diff --git a/ydb/core/blobstorage/backpressure/queue.h b/ydb/core/blobstorage/backpressure/queue.h
index 996975c9c3..d59c035a58 100644
--- a/ydb/core/blobstorage/backpressure/queue.h
+++ b/ydb/core/blobstorage/backpressure/queue.h
@@ -43,7 +43,7 @@ class TBlobStorageQueue {
{
EItemQueue Queue;
TCostModel::TMessageCostEssence CostEssence;
- NWilson::TTraceId TraceId;
+ NWilson::TSpan Span;
TEventHolder Event;
ui64 MsgId;
ui64 SequenceId;
@@ -54,15 +54,16 @@ class TBlobStorageQueue {
THPTimer ProcessingTimer;
TTrackableList<TItem>::iterator Iterator;
- template<typename TPtr>
- TItem(TPtr& event, TInstant deadline,
+ template<typename TEvent>
+ TItem(TAutoPtr<TEventHandle<TEvent>>& event, TInstant deadline,
const NMonitoring::TDynamicCounters::TCounterPtr& serItems,
const NMonitoring::TDynamicCounters::TCounterPtr& serBytes,
const TBSProxyContextPtr& bspctx, ui32 interconnectChannel,
- bool local)
+ bool local, TInstant now)
: Queue(EItemQueue::NotSet)
, CostEssence(*event->Get())
- , TraceId(std::move(event->TraceId))
+ , Span(9 /*verbosity*/, NWilson::ERelation::ChildOf, std::move(event->TraceId), now, TStringBuilder()
+ << "Backpressure(" << TypeName<TEvent>() << ")")
, Event(event, serItems, serBytes, bspctx, interconnectChannel, local)
, MsgId(Max<ui64>())
, SequenceId(0)
@@ -183,7 +184,7 @@ public:
void SetItemQueue(TItem& item, EItemQueue newQueue);
- void SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, IActor *actor);
+ void SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, ui32 vdiskOrderNumber);
void ReplyWithError(TItem& item, NKikimrProto::EReplyStatus status, const TString& errorReason, const TActorContext& ctx);
bool Expecting(ui64 msgId, ui64 sequenceId) const;
@@ -196,13 +197,13 @@ public:
void OnConnect();
template<typename TPtr>
- void Enqueue(const TActorContext &ctx, TPtr& event, TInstant deadline, bool local) {
+ void Enqueue(const TActorContext &ctx, TPtr& event, TInstant deadline, bool local, TInstant now) {
Y_UNUSED(ctx);
TItemList::iterator newIt;
if (Queues.Unused.empty()) {
newIt = Queues.Waiting.emplace(Queues.Waiting.end(), event, deadline,
- QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local);
+ QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local, now);
++*QueueSize;
} else {
newIt = Queues.Unused.begin();
@@ -211,7 +212,7 @@ public:
TItem& item = *newIt;
item.~TItem();
new(&item) TItem(event, deadline, QueueSerializedItems, QueueSerializedBytes, BSProxyCtx,
- InterconnectChannel, local);
+ InterconnectChannel, local, now);
}
newIt->Iterator = newIt;
diff --git a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
index acdd648a53..ed38409713 100644
--- a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
+++ b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
@@ -36,6 +36,7 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu
const TVDiskIdShort VDiskIdShort;
TActorId RemoteVDisk;
TVDiskID VDiskId;
+ ui32 VDiskOrderNumber;
NKikimrBlobStorage::EVDiskQueueId QueueId;
const TDuration QueueWatchdogTimeout;
ui64 CheckReadinessCookie = 1;
@@ -108,6 +109,7 @@ private:
Y_VERIFY(info.Type.GetErasure() == GType.GetErasure());
VDiskId = info.CreateVDiskID(VDiskIdShort);
RemoteVDisk = info.GetActorId(VDiskIdShort);
+ VDiskOrderNumber = info.GetOrderNumber(VDiskIdShort);
LogPrefix = Sprintf("[%s TargetVDisk# %s Queue# %s]", SelfId().ToString().data(), VDiskId.ToString().data(), QueueName.data());
RecentGroup = info.Group;
}
@@ -123,7 +125,7 @@ private:
void Pump(const TActorContext &ctx) {
// if in 'Running' state, then send messages to VDisk
if (IsReady()) {
- Queue.SendToVDisk(ctx, RemoteVDisk, this);
+ Queue.SendToVDisk(ctx, RemoteVDisk, VDiskOrderNumber);
}
}
@@ -197,11 +199,7 @@ private:
<< " cookie# " << ev->Cookie);
if (IsReady()) {
- // trace wilson event if tracing is enabled for this request
- WILSON_TRACE_FROM_ACTOR(ctx, *this, &ev->TraceId, EvBlobStorageQueuePut,
- InQueueWaitingItems = Queue.GetItemsWaiting(), InQueueWaitingBytes = Queue.GetBytesWaiting());
-
- Queue.Enqueue(ctx, ev, deadline, RemoteVDisk.NodeId() == SelfId().NodeId());
+ Queue.Enqueue(ctx, ev, deadline, RemoteVDisk.NodeId() == SelfId().NodeId(), TActivationContext::Now());
Pump(ctx);
UpdateRequestTrackingStats(ctx);
} else {
diff --git a/ydb/core/blobstorage/base/CMakeLists.txt b/ydb/core/blobstorage/base/CMakeLists.txt
index 19c7abdfc7..2874ba8173 100644
--- a/ydb/core/blobstorage/base/CMakeLists.txt
+++ b/ydb/core/blobstorage/base/CMakeLists.txt
@@ -11,9 +11,9 @@ add_library(core-blobstorage-base)
target_link_libraries(core-blobstorage-base PUBLIC
contrib-libs-cxxsupp
yutil
+ cpp-actors-wilson
library-cpp-lwtrace
ydb-core-protos
- ydb-library-wilson
)
target_sources(core-blobstorage-base PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/base/blobstorage_vdiskid.cpp
diff --git a/ydb/core/blobstorage/base/wilson_events.h b/ydb/core/blobstorage/base/wilson_events.h
deleted file mode 100644
index 7ed9528463..0000000000
--- a/ydb/core/blobstorage/base/wilson_events.h
+++ /dev/null
@@ -1,112 +0,0 @@
-#pragma once
-
-
-#include "blobstorage_vdiskid.h"
-#include <ydb/library/wilson/wilson_event.h>
-
-#define WILSON_TRACE_FROM_ACTOR(CTX, ACTOR, TRACE_ID, EVENT_TYPE, ...) \
- WILSON_TRACE(CTX, TRACE_ID, EVENT_TYPE, \
- ActivityType = static_cast<NKikimrServices::TActivity::EType>((ACTOR).GetActivityType()), \
- ActorId = (ACTOR).SelfId(), \
- ##__VA_ARGS__);
-
-#define DECLARE_ACTOR_EVENT(EVENT_TYPE, ...) \
- DECLARE_WILSON_EVENT(EVENT_TYPE, \
- (::NKikimrServices::TActivity::EType, ActivityType), \
- (::NActors::TActorId, ActorId), \
- ##__VA_ARGS__ \
- )
-
-namespace NKikimr {
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // DSPROXY
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- DECLARE_ACTOR_EVENT(MultiGetReceived);
- DECLARE_ACTOR_EVENT(EvGetSent);
- DECLARE_ACTOR_EVENT(EvGetReceived);
- DECLARE_ACTOR_EVENT(EvVGetSent);
- DECLARE_ACTOR_EVENT(EvVGetReceived);
- DECLARE_ACTOR_EVENT(EvVGetResultSent);
- DECLARE_ACTOR_EVENT(EvVGetResultReceived, (::NWilson::TTraceId, MergedNode));
- DECLARE_ACTOR_EVENT(EvGetResultSent, (::NKikimrProto::EReplyStatus, ReplyStatus), (ui32, ResponseSize));
- DECLARE_ACTOR_EVENT(EvGetResultReceived, (::NWilson::TTraceId, MergedNode));
- DECLARE_ACTOR_EVENT(MultiGetResultSent);
-
- DECLARE_ACTOR_EVENT(RangeGetReceived);
- DECLARE_ACTOR_EVENT(RangeGetResultSent, (::NKikimrProto::EReplyStatus, ReplyStatus), (ui32, ResponseSize));
-
- DECLARE_ACTOR_EVENT(EvPutReceived, (ui32, Size), (TLogoBlobID, LogoBlobId));
- DECLARE_ACTOR_EVENT(EvVPutSent);
- DECLARE_ACTOR_EVENT(EvVPutResultReceived, (::NWilson::TTraceId, MergedNode));
- DECLARE_ACTOR_EVENT(EvPutResultSent, (::NKikimrProto::EReplyStatus, ReplyStatus));
-
- DECLARE_ACTOR_EVENT(EvDiscoverReceived, (ui32, GroupId), (TLogoBlobID, From), (TLogoBlobID, To));
- DECLARE_ACTOR_EVENT(EvDiscoverResultSent);
-
- DECLARE_ACTOR_EVENT(EvVGetBlockSent);
- DECLARE_ACTOR_EVENT(EvVGetBlockResultReceived, (::NWilson::TTraceId, MergedNode));
-
- DECLARE_ACTOR_EVENT(ReadBatcherStart);
- DECLARE_ACTOR_EVENT(ReadBatcherFinish, (::NWilson::TTraceId, MergedNode));
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // VDISK
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- DECLARE_ACTOR_EVENT(EvVPutReceived, (TVDiskID, VDiskId), (ui32, PDiskId), (ui32, VDiskSlotId));
- DECLARE_ACTOR_EVENT(EvPutIntoEmergQueue);
- DECLARE_ACTOR_EVENT(EvVPutResultSent);
- DECLARE_ACTOR_EVENT(EvVMultiPutResultSent);
-
- DECLARE_ACTOR_EVENT(EvChunkReadSent, (ui32, ChunkIdx), (ui32, Offset), (ui32, Size), (void*, YardCookie));
-
- DECLARE_ACTOR_EVENT(EvChunkReadResultReceived, (void*, YardCookie), (::NWilson::TTraceId, MergedNode));
-
- DECLARE_ACTOR_EVENT(EvHullWriteHugeBlobSent);
- DECLARE_ACTOR_EVENT(EvHullLogHugeBlobReceived);
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // BS_QUEUE
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- DECLARE_ACTOR_EVENT(EvBlobStorageQueuePut, (ui64, InQueueWaitingItems), (ui64, InQueueWaitingBytes));
- DECLARE_ACTOR_EVENT(EvBlobStorageQueueForward, (ui64, InQueueWaitingItems), (ui64, InQueueWaitingBytes));
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // SKELETON FRONT
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- DECLARE_ACTOR_EVENT(EvSkeletonFrontEnqueue);
- DECLARE_ACTOR_EVENT(EvSkeletonFrontProceed);
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // YARD
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- DECLARE_WILSON_EVENT(EvChunkReadReceived, (ui32, ChunkIdx), (ui32, Offset), (ui32, Size));
- DECLARE_WILSON_EVENT(AsyncReadScheduled, (ui64, DiskOffset), (ui32, Size));
-
- DECLARE_WILSON_EVENT(EvChunkWriteReceived, (ui32, ChunkIdx), (ui32, Offset), (ui32, Size));
-
- DECLARE_WILSON_EVENT(EvLogReceived);
- DECLARE_WILSON_EVENT(EnqueueLogWrite);
- DECLARE_WILSON_EVENT(RouteLogWrite);
-
- DECLARE_WILSON_EVENT(BlockPwrite, (ui64, DiskOffset), (ui32, Size));
- DECLARE_WILSON_EVENT(BlockPread, (ui64, DiskOffset), (ui32, Size));
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // LIBAIO
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- DECLARE_WILSON_EVENT(AsyncIoInQueue);
- DECLARE_WILSON_EVENT(AsyncIoFinished);
-
-} // NKikimr
-
-template<>
-inline void Out<NKikimrServices::TActivity::EType>(IOutputStream& os, NKikimrServices::TActivity::EType status) {
- os << NKikimrServices::TActivity::EType_Name(status);
-}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h
index f206d7ad21..451e7fa315 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy.h
@@ -13,9 +13,9 @@
#include <ydb/core/blobstorage/base/batched_vec.h>
#include <ydb/core/blobstorage/base/blobstorage_events.h>
#include <ydb/core/blobstorage/base/transparent.h>
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <ydb/core/blobstorage/backpressure/queue_backpressure_client.h>
#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/group_stat.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
@@ -140,13 +140,13 @@ public:
TBlobStorageGroupRequestActor(TIntrusivePtr<TBlobStorageGroupInfo> info, TIntrusivePtr<TGroupQueues> groupQueues,
TIntrusivePtr<TBlobStorageGroupProxyMon> mon, const TActorId& source, ui64 cookie, NWilson::TTraceId traceId,
NKikimrServices::EServiceKikimr logComponent, bool logAccEnabled, TMaybe<TGroupStat::EKind> latencyQueueKind,
- TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, ui32 restartCounter)
+ TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, ui32 restartCounter, TString name)
: Info(std::move(info))
, GroupQueues(std::move(groupQueues))
, Mon(std::move(mon))
, PoolCounters(storagePoolCounters)
, LogCtx(logComponent, logAccEnabled)
- , TraceId(std::move(traceId))
+ , Span(8 /*verbosity*/, NWilson::ERelation::ChildOf, std::move(traceId), now, std::move(name))
, RestartCounter(restartCounter)
, Source(source)
, Cookie(cookie)
@@ -155,6 +155,9 @@ public:
, RacingDomains(&Info->GetTopology())
{
TDerived::ActiveCounter(Mon)->Inc();
+ Span
+ .Attribute("GroupId", Info->GroupID)
+ .Attribute("RestartCounter", RestartCounter);
}
template<typename T>
@@ -258,7 +261,7 @@ public:
Y_VERIFY_DEBUG(RestartCounter < 100);
auto q = self.RestartQuery(RestartCounter + 1);
++*Mon->NodeMon->RestartHisto[Min<size_t>(Mon->NodeMon->RestartHisto.size() - 1, RestartCounter)];
- TActivationContext::Send(new IEventHandle(nodeWardenId, Source, q.release(), 0, Cookie, &proxyId, std::move(TraceId)));
+ TActivationContext::Send(new IEventHandle(nodeWardenId, Source, q.release(), 0, Cookie, &proxyId, Span));
PassAway();
return true;
}
@@ -314,11 +317,11 @@ public:
}
template<typename T>
- void SendToQueue(std::unique_ptr<T> event, ui64 cookie, NWilson::TTraceId traceId, bool timeStatsEnabled = false) {
+ void SendToQueue(std::unique_ptr<T> event, ui64 cookie, bool timeStatsEnabled = false) {
if constexpr (!std::is_same_v<T, TEvBlobStorage::TEvVStatus>) {
event->MessageRelevanceTracker = MessageRelevanceTracker;
}
- const TActorId queueId = GroupQueues->Send(*this, Info->GetTopology(), std::move(event), cookie, std::move(traceId),
+ const TActorId queueId = GroupQueues->Send(*this, Info->GetTopology(), std::move(event), cookie, Span,
timeStatsEnabled);
++RequestsInFlight;
}
@@ -332,13 +335,12 @@ public:
void SendToQueues(TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &vGets, bool timeStatsEnabled) {
for (auto& request : vGets) {
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetSent);
Y_VERIFY(request->Record.HasCookie());
ui64 messageCookie = request->Record.GetCookie();
CountEvent(*request);
const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000;
request->Record.MutableTimestamps()->SetSentByDSProxyUs(GetCycleCountFast() / cyclesPerUs);
- SendToQueue(std::move(request), messageCookie, TraceId.SeparateBranch(), timeStatsEnabled);
+ SendToQueue(std::move(request), messageCookie, timeStatsEnabled);
}
}
@@ -370,7 +372,6 @@ public:
template <typename TEvent>
void SendToQueues(TDeque<std::unique_ptr<TEvent>> &events, bool timeStatsEnabled) {
for (auto& request : events) {
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVPutSent);
Y_VERIFY(request->Record.HasCookie());
ui64 messageCookie = request->Record.GetCookie();
CountEvent(*request);
@@ -380,18 +381,18 @@ public:
TVDiskID vDiskId = VDiskIDFromVDiskID(request->Record.GetVDiskID());
LWTRACK(DSProxyPutVPutIsSent, request->Orbit, Info->GetFailDomainOrderNumber(vDiskId),
Info->GroupID, id.Channel(), id.PartId(), id.ToString(), id.BlobSize());
- SendToQueue(std::move(request), messageCookie, TraceId.SeparateBranch(), timeStatsEnabled);
+ SendToQueue(std::move(request), messageCookie, timeStatsEnabled);
}
}
void SendResponseAndDie(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats, TActorId source,
- ui64 cookie, NWilson::TTraceId traceId) {
- SendResponse(std::move(ev), timeStats, source, cookie, std::move(traceId));
+ ui64 cookie) {
+ SendResponse(std::move(ev), timeStats, source, cookie);
PassAway();
}
void SendResponseAndDie(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats = nullptr) {
- SendResponseAndDie(std::move(ev), timeStats, Source, Cookie, std::move(TraceId));
+ SendResponseAndDie(std::move(ev), timeStats, Source, Cookie);
}
TActorId GetProxyActorId() const {
@@ -406,15 +407,21 @@ public:
TActorBootstrapped<TDerived>::PassAway();
}
- void SendResponse(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats, TActorId source, ui64 cookie,
- NWilson::TTraceId traceId) {
+ void SendResponse(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats, TActorId source, ui64 cookie) {
const TInstant now = TActivationContext::Now();
+ NKikimrProto::EReplyStatus status;
+ TString errorReason;
+
switch (ev->Type()) {
#define XX(T) \
- case TEvBlobStorage::Ev##T##Result: \
- Mon->RespStat##T->Account(static_cast<TEvBlobStorage::TEv##T##Result&>(*ev).Status); \
- break;
+ case TEvBlobStorage::Ev##T##Result: { \
+ auto& msg = static_cast<TEvBlobStorage::TEv##T##Result&>(*ev); \
+ status = msg.Status; \
+ errorReason = msg.ErrorReason; \
+ Mon->RespStat##T->Account(status); \
+ break; \
+ }
XX(Put)
XX(Get)
@@ -450,12 +457,18 @@ public:
static_cast<TEvBlobStorage::TEvGetResult&>(*ev).Sent = now;
}
+ if (status == NKikimrProto::OK) {
+ Span.EndOk();
+ } else {
+ Span.EndError(std::move(errorReason));
+ }
+
// send the reply to original request sender
- Derived().Send(source, ev.release(), 0, cookie, std::move(traceId));
+ Derived().Send(source, ev.release(), 0, cookie, Span);
};
void SendResponse(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats = nullptr) {
- SendResponse(std::move(ev), timeStats, Source, Cookie, std::move(TraceId));
+ SendResponse(std::move(ev), timeStats, Source, Cookie);
}
static double GetStartTime(const NKikimrBlobStorage::TTimestamps& timestamps) {
@@ -489,7 +502,7 @@ protected:
TIntrusivePtr<TBlobStorageGroupProxyMon> Mon;
TIntrusivePtr<TStoragePoolCounters> PoolCounters;
TLogContext LogCtx;
- NWilson::TTraceId TraceId;
+ NWilson::TSpan Span;
TStackVec<std::pair<TDiskResponsivenessTracker::TDiskId, TDuration>, 16> Responsiveness;
TString ErrorReason;
TMaybe<TStoragePoolCounters::EHandleClass> RequestHandleClass;
@@ -565,7 +578,6 @@ IActor* CreateBlobStorageGroupIndexRestoreGetRequest(const TIntrusivePtr<TBlobSt
ui64 cookie, NWilson::TTraceId traceId, TMaybe<TGroupStat::EKind> latencyQueueKind,
TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters);
-
IActor* CreateBlobStorageGroupDiscoverRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvDiscover *ev,
@@ -584,22 +596,22 @@ IActor* CreateBlobStorageGroupMirror3of4DiscoverRequest(const TIntrusivePtr<TBlo
IActor* CreateBlobStorageGroupCollectGarbageRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev,
- ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters);
+ ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters);
IActor* CreateBlobStorageGroupMultiCollectRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev,
- ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters);
+ ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters);
IActor* CreateBlobStorageGroupBlockRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvBlock *ev,
- ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters);
+ ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters);
IActor* CreateBlobStorageGroupStatusRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvStatus *ev,
- ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters);
+ ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters);
IActor* CreateBlobStorageGroupEjectedProxy(ui32 groupId, TIntrusivePtr<TDsProxyNodeMon> &nodeMon);
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp
index dba0a2fd43..cf742ad23e 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp
@@ -88,7 +88,7 @@ class TBlobStorageGroupBlockRequest : public TBlobStorageGroupRequestActor<TBlob
Y_FAIL("unexpected newStatus# %s", NKikimrProto::EReplyStatus_Name(newStatus).data());
}
for (const TVDiskID& vdiskId : queryStatus) {
- SendToQueue(std::make_unique<TEvBlobStorage::TEvVStatus>(vdiskId), 0, NWilson::TTraceId());
+ SendToQueue(std::make_unique<TEvBlobStorage::TEvVStatus>(vdiskId), 0);
}
for (const TVDiskID& vdiskId : resend) {
SendBlockRequest(vdiskId);
@@ -113,7 +113,7 @@ class TBlobStorageGroupBlockRequest : public TBlobStorageGroupRequestActor<TBlob
<< " node# " << Info->GetActorId(vdiskId).NodeId());
auto msg = std::make_unique<TEvBlobStorage::TEvVBlock>(TabletId, Generation, vdiskId, Deadline, IssuerGuid);
- SendToQueue(std::move(msg), cookie, NWilson::TTraceId()); // FIXME: wilson
+ SendToQueue(std::move(msg), cookie);
}
std::unique_ptr<IEventBase> RestartQuery(ui32 counter) {
@@ -135,10 +135,11 @@ public:
TBlobStorageGroupBlockRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvBlock *ev,
- ui64 cookie, TInstant now,
+ ui64 cookie, NWilson::TTraceId traceId, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
- : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, NWilson::TTraceId(),
- NKikimrServices::BS_PROXY_BLOCK, false, {}, now, storagePoolCounters, ev->RestartCounter)
+ : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
+ NKikimrServices::BS_PROXY_BLOCK, false, {}, now, storagePoolCounters, ev->RestartCounter,
+ "DSProxy.Block")
, TabletId(ev->TabletId)
, Generation(ev->Generation)
, Deadline(ev->Deadline)
@@ -177,8 +178,8 @@ public:
IActor* CreateBlobStorageGroupBlockRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvBlock *ev,
- ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
- return new TBlobStorageGroupBlockRequest(info, state, source, mon, ev, cookie, now, storagePoolCounters);
+ ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
+ return new TBlobStorageGroupBlockRequest(info, state, source, mon, ev, cookie, std::move(traceId), now, storagePoolCounters);
}
} // NKikimr
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp
index d12c819a74..492d57bb0b 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp
@@ -91,7 +91,7 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc
Y_FAIL("unexpected newStatus# %s", NKikimrProto::EReplyStatus_Name(newStatus).data());
}
for (const TVDiskID& vdiskId : queryStatus) {
- SendToQueue(std::make_unique<TEvBlobStorage::TEvVStatus>(vdiskId), 0, NWilson::TTraceId());
+ SendToQueue(std::make_unique<TEvBlobStorage::TEvVStatus>(vdiskId), 0);
RequestsSent++;
}
for (const TVDiskID& vdiskId : resend) {
@@ -118,7 +118,7 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc
const ui64 cookie = TVDiskIdShort(vdiskId).GetRaw();
auto msg = std::make_unique<TEvBlobStorage::TEvVCollectGarbage>(TabletId, RecordGeneration, PerGenerationCounter,
Channel, Collect, CollectGeneration, CollectStep, Hard, Keep.get(), DoNotKeep.get(), vdiskId, Deadline);
- SendToQueue(std::move(msg), cookie, NWilson::TTraceId()); // FIXME: wilson
+ SendToQueue(std::move(msg), cookie);
RequestsSent++;
}
@@ -142,9 +142,10 @@ public:
TBlobStorageGroupCollectGarbageRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev, ui64 cookie,
- TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
- : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, NWilson::TTraceId(),
- NKikimrServices::BS_PROXY_COLLECT, false, {}, now, storagePoolCounters, ev->RestartCounter)
+ NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
+ : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
+ NKikimrServices::BS_PROXY_COLLECT, false, {}, now, storagePoolCounters, ev->RestartCounter,
+ "DSProxy.CollectGarbage")
, TabletId(ev->TabletId)
, RecordGeneration(ev->RecordGeneration)
, PerGenerationCounter(ev->PerGenerationCounter)
@@ -204,8 +205,9 @@ public:
IActor* CreateBlobStorageGroupCollectGarbageRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev,
- ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
- return new TBlobStorageGroupCollectGarbageRequest(info, state, source, mon, ev, cookie, now, storagePoolCounters);
+ ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
+ return new TBlobStorageGroupCollectGarbageRequest(info, state, source, mon, ev, cookie, std::move(traceId), now,
+ storagePoolCounters);
}
} // NKikimr
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
index 1f810a385d..dad2ecd745 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
@@ -3,7 +3,6 @@
#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_partlayout.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
-#include <ydb/core/blobstorage/base/wilson_events.h>
namespace NKikimr {
@@ -300,7 +299,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB
const TDuration duration = TActivationContext::Now() - StartTime;
Mon->CountDiscoverResponseTime(duration);
const bool success = result->Status == NKikimrProto::OK;
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvDiscoverResultSent);
LWPROBE(DSProxyRequestDuration, TEvBlobStorage::EvDiscover, 0, duration.SecondsFloat() * 1000.0,
TabletId, Info->GroupID, TLogoBlobID::MaxChannel, "", success);
SendResponseAndDie(std::move(result));
@@ -317,7 +315,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB
void Handle(TEvBlobStorage::TEvVGetBlockResult::TPtr &ev) {
ProcessReplyFromQueue(ev);
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetBlockResultReceived, MergedNode = std::move(ev->TraceId));
TotalRecieved++;
NKikimrBlobStorage::TEvVGetBlockResult &record = ev->Get()->Record;
@@ -362,8 +359,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB
ProcessReplyFromQueue(ev);
CountEvent(*ev->Get());
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetResultReceived, MergedNode = std::move(ev->TraceId));
-
TotalRecieved++;
NKikimrBlobStorage::TEvVGetResult &record = ev->Get()->Record;
Y_VERIFY(record.HasStatus());
@@ -381,8 +376,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB
ProcessReplyFromQueue(ev);
CountEvent(*ev->Get());
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetResultReceived, MergedNode = std::move(ev->TraceId));
-
TotalRecieved++;
NKikimrBlobStorage::TEvVGetResult &record = ev->Get()->Record;
Y_VERIFY(record.HasStatus());
@@ -609,8 +602,7 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB
getRequest->IsInternal = true;
getRequest->TabletId = TabletId;
getRequest->AcquireBlockedGeneration = true;
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetSent);
- bool isSent = SendToBSProxy(SelfId(), Info->GroupID, getRequest.release(), 0, TraceId.SeparateBranch());
+ bool isSent = SendToBSProxy(SelfId(), Info->GroupID, getRequest.release(), 0, Span);
Y_VERIFY(isSent);
TotalSent++;
@@ -722,7 +714,7 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB
<< " msg# " << msg->ToString()
<< " cookie# " << cookie);
CountEvent(*msg);
- SendToQueue(std::move(msg), cookie, NWilson::TTraceId()); // FIXME: wilson
+ SendToQueue(std::move(msg), cookie);
TotalSent++;
curVDisk.IsMoreRequested = true;
@@ -743,8 +735,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB
}
void Handle(TEvBlobStorage::TEvGetResult::TPtr &ev) {
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetResultReceived, MergedNode = std::move(ev->TraceId));
-
TotalRecieved++;
TEvBlobStorage::TEvGetResult *msg = ev->Get();
const NKikimrProto::EReplyStatus status = msg->Status;
@@ -897,7 +887,8 @@ public:
ui64 cookie, NWilson::TTraceId traceId, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
- NKikimrServices::BS_PROXY_DISCOVER, true, {}, now, storagePoolCounters, ev->RestartCounter)
+ NKikimrServices::BS_PROXY_DISCOVER, true, {}, now, storagePoolCounters, ev->RestartCounter,
+ "DSProxy.Discover")
, TabletId(ev->TabletId)
, MinGeneration(ev->MinGeneration)
, ReadBody(ev->ReadBody)
@@ -921,13 +912,10 @@ public:
<< " FromLeader# " << (FromLeader ? "true" : "false")
<< " RestartCounter# " << RestartCounter);
- const ui32 groupId = Info->GroupID;
const TLogoBlobID from = TLogoBlobID(TabletId, Max<ui32>(), Max<ui32>(), 0,
TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxChannel, TLogoBlobID::MaxPartId);
const TLogoBlobID to = TLogoBlobID(TabletId, MinGeneration, 0, 0, 0, 0, 1);
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvDiscoverReceived, GroupId = groupId, From = from, To = to);
-
for (const auto& vdisk : Info->GetVDisks()) {
auto vd = Info->GetVDiskId(vdisk.OrderNumber);
if (!IsGetBlockDone) {
@@ -937,8 +925,7 @@ public:
<< " vDiskId# " << vd
<< " cookie# " << cookie
<< " node# " << Info->GetActorId(vd).NodeId());
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetBlockSent);
- SendToQueue(std::move(getBlock), cookie, TraceId.SeparateBranch());
+ SendToQueue(std::move(getBlock), cookie);
TotalSent++;
}
@@ -953,9 +940,8 @@ public:
<< " node# " << Info->GetActorId(vd).NodeId()
<< " msg# " << msg->ToString()
<< " cookie# " << cookie);
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetSent);
CountEvent(*msg);
- SendToQueue(std::move(msg), cookie, TraceId.SeparateBranch());
+ SendToQueue(std::move(msg), cookie);
TotalSent++;
TVDiskInfo &curVDisk = VDiskInfo[TVDiskIdShort(vd).GetRaw()];
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp
index 7c10b1e25a..590a92bdf0 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp
@@ -462,7 +462,7 @@ public:
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(std::move(info), std::move(state), std::move(mon), source, cookie,
std::move(traceId), NKikimrServices::BS_PROXY_DISCOVER, false, {}, now, storagePoolCounters,
- ev->RestartCounter)
+ ev->RestartCounter, "DSProxy.Discover(mirror-3-dc)")
, TabletId(ev->TabletId)
, MinGeneration(ev->MinGeneration)
, StartTime(now)
@@ -505,7 +505,7 @@ public:
A_LOG_DEBUG_S("DSPDM06", "sending TEvVGetBlock# " << query->ToString());
- SendToQueue(std::move(query), 0, NWilson::TTraceId());
+ SendToQueue(std::move(query), 0);
++RequestsInFlight;
}
}
@@ -523,7 +523,7 @@ public:
A_LOG_DEBUG_S("DSPDM07", "sending TEvVGet# " << msg->ToString());
CountEvent(*msg);
- SendToQueue(std::move(msg), 0, NWilson::TTraceId());
+ SendToQueue(std::move(msg), 0);
++RequestsInFlight;
}
Msgs.clear();
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp
index 22412d0081..5fdc6c30d9 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp
@@ -36,7 +36,7 @@ public:
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(std::move(info), std::move(state), std::move(mon), source, cookie,
std::move(traceId), NKikimrServices::BS_PROXY_DISCOVER, false, {}, now, storagePoolCounters,
- ev->RestartCounter)
+ ev->RestartCounter, "DSProxy.Discover(mirror-3of4)")
, TabletId(ev->TabletId)
, MinGeneration(ev->MinGeneration)
, StartTime(now)
@@ -156,7 +156,7 @@ public:
const TLogoBlobID to = TLogoBlobID(TabletId, MinGeneration, 0, 0, 0, 0);
SendToQueue(TEvBlobStorage::TEvVGet::CreateRangeIndexQuery(state.VDiskId, Deadline, HandleClass,
TEvBlobStorage::TEvVGet::EFlags::None, Nothing(), state.From, to, MaxBlobsAtOnce, nullptr,
- ForceBlockedGeneration), 0, {} /*traceId*/);
+ ForceBlockedGeneration), 0);
const EDiskState prev = std::exchange(state.State, EDiskState::READ_PENDING);
Y_VERIFY(prev == EDiskState::IDLE);
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp
index 9b08b41861..e4043c3675 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp
@@ -3,7 +3,6 @@
#include "root_cause.h"
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <library/cpp/digest/crc32c/crc32c.h>
#include <util/generic/set.h>
@@ -231,9 +230,6 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
}
DiskCounters[orderNumber].Received++;
- // generate wilson event with merging into trunk
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetResultReceived, MergedNode = std::move(ev->TraceId));
-
TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets;
TAutoPtr<TEvBlobStorage::TEvGetResult> getResult;
ResponsesReceived++;
@@ -351,8 +347,6 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
void HandleVPutResult(typename TPutEventResult::TPtr &ev) {
Y_VERIFY(ev->Get()->Record.HasStatus());
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVPutResultReceived, MergedNode = std::move(ev->TraceId));
-
const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000;
ev->Get()->Record.MutableTimestamps()->SetReceivedByDSProxyUs(GetCycleCountFast() / cyclesPerUs);
const auto &record = ev->Get()->Record;
@@ -451,15 +445,12 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
}
void SendReplyAndDie(TAutoPtr<TEvBlobStorage::TEvGetResult> &evResult) {
- const NKikimrProto::EReplyStatus status = evResult->Status;
const TInstant now = TActivationContext::Now();
const TDuration duration = (now > StartTime) ? (now - StartTime) : TDuration::MilliSeconds(0);
Mon->CountGetResponseTime(Info->GetDeviceType(), GetImpl.GetHandleClass(), evResult->PayloadSizeBytes(), duration);
*Mon->ActiveGetCapacity -= ReportedBytes;
ReportedBytes = 0;
bool success = evResult->Status == NKikimrProto::OK;
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetResultSent, ReplyStatus = status,
- ResponseSize = GetImpl.GetReplyBytes());
ui64 requestSize = 0;
ui64 tabletId = 0;
ui32 channel = 0;
@@ -502,7 +493,7 @@ public:
TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters, bool isVMultiPutMode)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_GET, ev->IsVerboseNoDataEnabled || ev->CollectDebugInfo,
- latencyQueueKind, now, storagePoolCounters, ev->RestartCounter)
+ latencyQueueKind, now, storagePoolCounters, ev->RestartCounter, "DSProxy.Get")
, GetImpl(info, state, ev, std::move(nodeLayout), LogCtx.RequestPrefix)
, Orbit(std::move(ev->Orbit))
, Deadline(ev->Deadline)
@@ -537,7 +528,6 @@ public:
<< " RestartCounter# " << RestartCounter);
LWTRACK(DSProxyGetBootstrap, Orbit);
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetReceived);
TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets;
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h
index 4908ae9bda..815ec33c28 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h
@@ -5,7 +5,6 @@
#include "dsproxy_cookies.h"
#include "dsproxy_mon.h"
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <util/generic/set.h>
namespace NKikimr {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
index 25eee78a6e..324f7d1d1f 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
@@ -219,7 +219,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
if (rate) {
const ui64 num = RandomNumber<ui64>(1000000); // in range [0, 1000000)
if (num < rate) {
- ev->TraceId = NWilson::TTraceId::NewTraceId();
+ ev->TraceId = NWilson::TTraceId::NewTraceId(15, 4095);
}
}
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp
index e1a634e559..1b4868cf26 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_indexrestoreget.cpp
@@ -3,7 +3,6 @@
#include "dsproxy_quorum_tracker.h"
#include "dsproxy_blob_tracker.h"
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <util/generic/set.h>
namespace NKikimr {
@@ -261,7 +260,7 @@ public:
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_INDEXRESTOREGET, false, latencyQueueKind, now, storagePoolCounters,
- ev->RestartCounter)
+ ev->RestartCounter, "DSProxy.IndexRestoreGet")
, QuerySize(ev->QuerySize)
, Queries(ev->Queries.Release())
, Deadline(ev->Deadline)
@@ -285,8 +284,6 @@ public:
}
void Bootstrap() {
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetReceived);
-
auto makeQueriesList = [this] {
TStringStream str;
str << "{";
@@ -314,7 +311,7 @@ public:
if (vget) {
const ui64 cookie = TVDiskIdShort(vd).GetRaw();
CountEvent(*vget);
- SendToQueue(std::move(vget), cookie, NWilson::TTraceId()); // FIXME: wilson
+ SendToQueue(std::move(vget), cookie);
vget.reset();
++VGetsInFlight;
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp
index 54a74eba58..dfa437be49 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp
@@ -93,9 +93,10 @@ public:
TBlobStorageGroupMultiCollectRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev, ui64 cookie,
- TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
- : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, NWilson::TTraceId(),
- NKikimrServices::BS_PROXY_MULTICOLLECT, false, {}, now, storagePoolCounters, 0)
+ NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
+ : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
+ NKikimrServices::BS_PROXY_MULTICOLLECT, false, {}, now, storagePoolCounters, 0,
+ "DSProxy.MultiCollect")
, TabletId(ev->TabletId)
, RecordGeneration(ev->RecordGeneration)
, PerGenerationCounter(ev->PerGenerationCounter)
@@ -154,7 +155,7 @@ public:
R_LOG_DEBUG_S("BPMC3", "SendRequest idx# " << idx
<< " isLast# " << isLast
<< " ev# " << ev->ToString());
- SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie);
+ SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span);
if (isLast) {
CollectRequestsInFlight++;
@@ -205,8 +206,8 @@ public:
IActor* CreateBlobStorageGroupMultiCollectRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvCollectGarbage *ev,
- ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
- return new TBlobStorageGroupMultiCollectRequest(info, state, source, mon, ev, cookie, now,
+ ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
+ return new TBlobStorageGroupMultiCollectRequest(info, state, source, mon, ev, cookie, std::move(traceId), now,
storagePoolCounters);
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp
index 02d7849855..dd4ab92d58 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp
@@ -1,7 +1,6 @@
#include "dsproxy.h"
#include "dsproxy_mon.h"
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <ydb/core/blobstorage/vdisk/query/query_spacetracker.h>
#include <util/generic/set.h>
@@ -39,8 +38,6 @@ class TBlobStorageGroupMultiGetRequest : public TBlobStorageGroupRequestActor<TB
void Handle(TEvBlobStorage::TEvGetResult::TPtr &ev) {
RequestsInFlight--;
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetResultReceived, MergedNode = std::move(ev->TraceId));
-
const TEvBlobStorage::TEvGetResult &res = *ev->Get();
if (res.Status != NKikimrProto::OK) {
R_LOG_ERROR_S("BPMG1", "Handle TEvGetResult status# " << NKikimrProto::EReplyStatus_Name(res.Status));
@@ -72,7 +69,6 @@ class TBlobStorageGroupMultiGetRequest : public TBlobStorageGroupRequestActor<TB
}
ev->ErrorReason = ErrorReason;
Mon->CountGetResponseTime(Info->GetDeviceType(), GetHandleClass, ev->PayloadSizeBytes(), TActivationContext::Now() - StartTime);
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, MultiGetResultSent);
Y_VERIFY(status != NKikimrProto::OK);
SendResponseAndDie(std::move(ev));
}
@@ -96,7 +92,8 @@ public:
NWilson::TTraceId traceId, TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now,
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
- NKikimrServices::BS_PROXY_MULTIGET, false, latencyQueueKind, now, storagePoolCounters, 0)
+ NKikimrServices::BS_PROXY_MULTIGET, false, latencyQueueKind, now, storagePoolCounters, 0,
+ "DSProxy.MultiGet")
, QuerySize(ev->QuerySize)
, Queries(ev->Queries.Release())
, Deadline(ev->Deadline)
@@ -125,22 +122,18 @@ public:
void SendRequests() {
for (; RequestsInFlight < MaxRequestsInFlight && !PendingGets.empty(); ++RequestsInFlight, PendingGets.pop_front()) {
auto& [ev, cookie] = PendingGets.front();
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetSent);
- SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, TraceId.SeparateBranch());
+ SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span);
}
if (!RequestsInFlight && PendingGets.empty()) {
auto ev = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, 0, Info->GroupID);
ev->ResponseSz = QuerySize;
ev->Responses = std::move(Responses);
Mon->CountGetResponseTime(Info->GetDeviceType(), GetHandleClass, ev->PayloadSizeBytes(), TActivationContext::Now() - StartTime);
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, MultiGetResultSent);
SendResponseAndDie(std::move(ev));
}
}
void Bootstrap() {
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, MultiGetReceived);
-
auto dumpQuery = [this] {
TStringStream str;
str << "{";
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
index fcced0b6f8..b246214495 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
@@ -93,7 +93,7 @@ public:
const TActorId &proxyId, bool useVPatch = false)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_PATCH, false, {}, now, storagePoolCounters,
- ev->RestartCounter)
+ ev->RestartCounter, "DSProxy.Patch")
, ProxyActorId(proxyId)
, OriginalGroupId(ev->OriginalGroupId)
, OriginalId(ev->OriginalId)
@@ -138,7 +138,6 @@ public:
void Handle(TEvBlobStorage::TEvGetResult::TPtr &ev) {
TEvBlobStorage::TEvGetResult *result = ev->Get();
Orbit = std::move(result->Orbit);
- TraceId = std::move(ev->TraceId);
ui32 patchedIdHash = PatchedId.Hash();
bool incorrectCookie = ev->Cookie != patchedIdHash;
@@ -173,13 +172,12 @@ public:
std::unique_ptr<TEvBlobStorage::TEvPut> put = std::make_unique<TEvBlobStorage::TEvPut>(PatchedId, Buffer, Deadline,
NKikimrBlobStorage::AsyncBlob, TEvBlobStorage::TEvPut::TacticDefault);
put->Orbit = std::move(Orbit);
- Send(ProxyActorId, put.release(), 0, OriginalId.Hash(), std::move(TraceId));
+ Send(ProxyActorId, put.release(), 0, OriginalId.Hash(), Span);
}
void Handle(TEvBlobStorage::TEvPutResult::TPtr &ev) {
TEvBlobStorage::TEvPutResult *result = ev->Get();
Orbit = std::move(result->Orbit);
- TraceId = std::move(ev->TraceId);
StatusFlags = result->StatusFlags;
ApproximateFreeSpaceShare = result->ApproximateFreeSpaceShare;
@@ -224,7 +222,6 @@ public:
NKikimrBlobStorage::TEvVMovedPatchResult &record = result->Record;
PullOutStatusFlagsAndFressSpace(record);
Orbit = std::move(result->Orbit);
- TraceId = std::move(ev->TraceId);
ui64 expectedCookie = ((ui64)OriginalId.Hash() << 32) | PatchedId.Hash();
bool incorrectCookie = ev->Cookie != expectedCookie;
@@ -531,9 +528,9 @@ public:
NKikimrBlobStorage::AsyncRead);
get->Orbit = std::move(Orbit);
if (OriginalGroupId == Info->GroupID) {
- Send(ProxyActorId, get.release(), 0, PatchedId.Hash(), std::move(TraceId));
+ Send(ProxyActorId, get.release(), 0, PatchedId.Hash(), Span);
} else {
- SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash(), std::move(TraceId));
+ SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash(), Span);
}
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
index d9e0e2b320..df51c9a78e 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
@@ -5,7 +5,6 @@
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <util/generic/ymath.h>
#include <util/system/datetime.h>
@@ -142,8 +141,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
"BPP01", "received " << ev->Get()->ToString() << " from# " << VDiskIDFromVDiskID(ev->Get()->Record.GetVDiskID()));
ProcessReplyFromQueue(ev);
- // generate wilson event about request completion
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVPutResultReceived, MergedNode = std::move(ev->TraceId));
ResponsesReceived++;
const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000;
@@ -220,8 +217,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
void Handle(TEvBlobStorage::TEvVMultiPutResult::TPtr &ev) {
ProcessReplyFromQueue(ev);
- // generate wilson event about request completion
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVPutResultReceived, MergedNode = std::move(ev->TraceId));
ResponsesReceived++;
const ui64 cyclesPerUs = NHPTimer::GetCyclesPerSecond() / 1000000;
@@ -363,7 +358,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
const TDuration duration = TActivationContext::Now() - StartTime;
TLogoBlobID blobId = putResult->Id;
TLogoBlobID origBlobId = TLogoBlobID(blobId, 0);
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &ItemsInfo[blobIdx].TraceId, EvPutResultSent, ReplyStatus = status);
Mon->CountPutPesponseTime(Info->GetDeviceType(), HandleClass, ItemsInfo[blobIdx].BufferSize, duration);
*Mon->ActivePutCapacity -= ReportedBytes;
Y_VERIFY(PutImpl.GetHandoffPartsSent() <= Info->Type.TotalPartCount() * MaxHandoffNodes * ItemsInfo.size());
@@ -384,8 +378,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
SendResponse(std::move(putResult), TimeStatsEnabled ? &TimeStats : nullptr);
} else {
SendResponse(std::move(putResult), TimeStatsEnabled ? &TimeStats : nullptr,
- ItemsInfo[blobIdx].Recipient, ItemsInfo[blobIdx].Cookie,
- std::move(ItemsInfo[blobIdx].TraceId));
+ ItemsInfo[blobIdx].Recipient, ItemsInfo[blobIdx].Cookie); // FIXME about traces
ItemsInfo[blobIdx].Replied = true;
}
}
@@ -452,7 +445,7 @@ public:
bool enableRequestMod3x3ForMinLatecy)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_PUT, false, latencyQueueKind, now, storagePoolCounters,
- ev->RestartCounter)
+ ev->RestartCounter, "DSProxy.Put")
, PutImpl(info, state, ev, mon, enableRequestMod3x3ForMinLatecy)
, WaitingVDiskResponseCount(info->GetTotalVDisksNum())
, Deadline(ev->Deadline)
@@ -497,7 +490,7 @@ public:
bool enableRequestMod3x3ForMinLatecy)
: TBlobStorageGroupRequestActor(info, state, mon, TActorId(), 0, NWilson::TTraceId(),
NKikimrServices::BS_PROXY_PUT, false, latencyQueueKind, now, storagePoolCounters,
- MaxRestartCounter(events))
+ MaxRestartCounter(events), "DSProxy.Put")
, PutImpl(info, state, events, mon, handleClass, tactic, enableRequestMod3x3ForMinLatecy)
, WaitingVDiskResponseCount(info->GetTotalVDisksNum())
, IsManyPuts(true)
@@ -564,10 +557,6 @@ public:
Timer.Reset();
- // TODO: how correct rewrite this?
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvPutReceived, Size = RequestBytes,
- LogoBlobId = ItemsInfo[0].BlobId);
-
double wilsonSec = Timer.PassedReset();
const ui32 totalParts = Info->Type.TotalPartCount();
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
index 34f675032b..b8b74c1cde 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
@@ -10,7 +10,6 @@
#include "dsproxy_strategy_put_m3dc.h"
#include "dsproxy_strategy_put_m3of4.h"
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <util/generic/set.h>
namespace NKikimr {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
index e234f8c18e..319e7e11ca 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
@@ -4,7 +4,6 @@
#include "dsproxy_blob_tracker.h"
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <library/cpp/pop_count/popcount.h>
@@ -51,7 +50,6 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob
size += resp.Buffer.size();
}
Mon->CountRangeResponseTime(TActivationContext::Now() - StartTime);
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, RangeGetResultSent, ReplyStatus = reply->Status, ResponseSize = size);
SendResponseAndDie(std::move(reply));
}
@@ -65,9 +63,8 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob
msg->Record.SetSuppressBarrierCheck(true);
// trace message and send it to queue
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetSent);
CountEvent(*msg);
- SendToQueue(std::move(msg), 0, TraceId.SeparateBranch());
+ SendToQueue(std::move(msg), 0);
// add pending count
++NumVGetsPending;
@@ -88,8 +85,6 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob
<< " VDiskId# " << vdisk
<< " TEvVGetResult# " << ev->Get()->ToString());
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvVGetResultReceived, MergedNode = std::move(ev->TraceId));
-
Y_VERIFY(NumVGetsPending > 0);
--NumVGetsPending;
@@ -252,8 +247,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob
A_LOG_DEBUG_S("DSR08", "sending TEvGet# " << get->ToString());
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetSent);
- SendToBSProxy(SelfId(), Info->GroupID, get.release(), 0, TraceId.SeparateBranch());
+ SendToBSProxy(SelfId(), Info->GroupID, get.release(), 0, Span);
// switch state
Become(&TThis::StateGet);
@@ -271,8 +265,6 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob
}
void Handle(TEvBlobStorage::TEvGetResult::TPtr &ev) {
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, EvGetResultReceived, MergedNode = std::move(ev->TraceId));
-
TEvBlobStorage::TEvGetResult &getResult = *ev->Get();
NKikimrProto::EReplyStatus status = getResult.Status;
if (status != NKikimrProto::OK) {
@@ -346,7 +338,7 @@ public:
TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_RANGE, false, {}, now, storagePoolCounters,
- ev->RestartCounter)
+ ev->RestartCounter, "DSProxy.Range")
, TabletId(ev->TabletId)
, From(ev->From)
, To(ev->To)
@@ -370,8 +362,6 @@ public:
<< " ForceBlockedGeneration# " << ForceBlockedGeneration
<< " RestartCounter# " << RestartCounter);
- WILSON_TRACE_FROM_ACTOR(*TlsActivationContext, *this, &TraceId, RangeGetReceived);
-
// ensure we are querying ranges for the same tablet
Y_VERIFY(TabletId == From.TabletID());
Y_VERIFY(TabletId == To.TabletID());
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
index 49986464a2..3ef82b2c92 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
@@ -144,9 +144,8 @@ namespace NKikimr {
void TBlobStorageGroupProxy::HandleNormal(TEvBlobStorage::TEvBlock::TPtr &ev) {
EnsureMonitoring(ev->Get()->IsMonitored);
Mon->EventBlock->Inc();
- const TActorId reqID = Register(
- CreateBlobStorageGroupBlockRequest(Info, Sessions->GroupQueues, ev->Sender, Mon,
- ev->Get(), ev->Cookie, TActivationContext::Now(), StoragePoolCounters));
+ const TActorId reqID = Register(CreateBlobStorageGroupBlockRequest(Info, Sessions->GroupQueues, ev->Sender, Mon,
+ ev->Get(), ev->Cookie, std::move(ev->TraceId), TActivationContext::Now(), StoragePoolCounters));
ActiveRequests.insert(reqID);
}
@@ -206,15 +205,15 @@ namespace NKikimr {
if (!ev->Get()->IsMultiCollectAllowed || ev->Get()->PerGenerationCounterStepSize() == 1) {
Mon->EventCollectGarbage->Inc();
- const TActorId reqID = Register(
- CreateBlobStorageGroupCollectGarbageRequest(Info, Sessions->GroupQueues, ev->Sender, Mon,
- ev->Get(), ev->Cookie, TActivationContext::Now(), StoragePoolCounters));
+ const TActorId reqID = Register(CreateBlobStorageGroupCollectGarbageRequest(Info, Sessions->GroupQueues,
+ ev->Sender, Mon, ev->Get(), ev->Cookie, std::move(ev->TraceId), TActivationContext::Now(),
+ StoragePoolCounters));
ActiveRequests.insert(reqID);
} else {
Mon->EventMultiCollect->Inc();
- const TActorId reqID = Register(
- CreateBlobStorageGroupMultiCollectRequest(Info, Sessions->GroupQueues, ev->Sender, Mon,
- ev->Get(), ev->Cookie, TActivationContext::Now(), StoragePoolCounters));
+ const TActorId reqID = Register(CreateBlobStorageGroupMultiCollectRequest(Info, Sessions->GroupQueues,
+ ev->Sender, Mon, ev->Get(), ev->Cookie, std::move(ev->TraceId), TActivationContext::Now(),
+ StoragePoolCounters));
ActiveRequests.insert(reqID);
}
}
@@ -227,9 +226,8 @@ namespace NKikimr {
}
EnsureMonitoring(true);
Mon->EventStatus->Inc();
- const TActorId reqID = Register(
- CreateBlobStorageGroupStatusRequest(Info, Sessions->GroupQueues, ev->Sender, Mon,
- ev->Get(), ev->Cookie, TActivationContext::Now(), StoragePoolCounters));
+ const TActorId reqID = Register(CreateBlobStorageGroupStatusRequest(Info, Sessions->GroupQueues, ev->Sender, Mon,
+ ev->Get(), ev->Cookie, std::move(ev->TraceId), TActivationContext::Now(), StoragePoolCounters));
ActiveRequests.insert(reqID);
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp
index 2b4ec35fd5..979cd28767 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_status.cpp
@@ -79,10 +79,10 @@ public:
TBlobStorageGroupStatusRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvStatus *ev,
- ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
- : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, NWilson::TTraceId(),
+ ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters)
+ : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_STATUS, false, {}, now, storagePoolCounters,
- ev->RestartCounter)
+ ev->RestartCounter, "DSProxy.Status")
, Deadline(ev->Deadline)
, Requests(0)
, Responses(0)
@@ -105,7 +105,7 @@ public:
<< " node# " << Info->GetActorId(vd).NodeId());
auto msg = std::make_unique<TEvBlobStorage::TEvVStatus>(vd);
- SendToQueue(std::move(msg), cookie, NWilson::TTraceId()); // FIXME: wilson
+ SendToQueue(std::move(msg), cookie);
++Requests;
}
@@ -129,8 +129,8 @@ public:
IActor* CreateBlobStorageGroupStatusRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvStatus *ev,
- ui64 cookie, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
- return new TBlobStorageGroupStatusRequest(info, state, source, mon, ev, cookie, now, storagePoolCounters);
+ ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters) {
+ return new TBlobStorageGroupStatusRequest(info, state, source, mon, ev, cookie, std::move(traceId), now, storagePoolCounters);
}
} // NKikimr
diff --git a/ydb/core/blobstorage/dsproxy/root_cause.h b/ydb/core/blobstorage/dsproxy/root_cause.h
index 5126190cc8..0a1049e8ee 100644
--- a/ydb/core/blobstorage/dsproxy/root_cause.h
+++ b/ydb/core/blobstorage/dsproxy/root_cause.h
@@ -3,7 +3,6 @@
#include "defs.h"
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <ydb/core/base/appdata.h>
namespace NKikimr {
diff --git a/ydb/core/blobstorage/pdisk/CMakeLists.txt b/ydb/core/blobstorage/pdisk/CMakeLists.txt
index 2ac16b5fcd..80196bdbd4 100644
--- a/ydb/core/blobstorage/pdisk/CMakeLists.txt
+++ b/ydb/core/blobstorage/pdisk/CMakeLists.txt
@@ -15,6 +15,7 @@ target_link_libraries(core-blobstorage-pdisk PUBLIC
cpp-actors-core
cpp-actors-protos
cpp-actors-util
+ cpp-actors-wilson
cpp-containers-stack_vector
library-cpp-lwtrace
monlib-dynamic_counters-percentile
@@ -32,7 +33,6 @@ target_link_libraries(core-blobstorage-pdisk PUBLIC
ydb-library-schlab
library-schlab-mon
library-schlab-schine
- ydb-library-wilson
tools-enum_parser-enum_serialization_runtime
)
target_sources(core-blobstorage-pdisk PRIVATE
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp
index d92b19c6cc..5ff29d8685 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp
@@ -16,7 +16,6 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/counters.h>
#include <ydb/core/blobstorage/base/html.h>
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <ydb/core/blobstorage/base/blobstorage_events.h>
#include <ydb/core/blobstorage/crypto/secured_block.h>
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h
index 650b2e6fc5..7469ec76d8 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice.h
@@ -10,7 +10,6 @@
#include <ydb/library/pdisk_io/aio.h>
#include <ydb/library/pdisk_io/drivedata.h>
#include <ydb/library/pdisk_io/sector_map.h>
-#include <ydb/library/wilson/wilson_event.h>
namespace NActors {
class TActorSystem;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
index a86f62bef1..d80dae0868 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_async.cpp
@@ -9,7 +9,6 @@
#include "blobstorage_pdisk_util_idlecounter.h"
#include <ydb/core/base/appdata.h>
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
#include <ydb/core/protos/services.pb.h>
#include <ydb/core/util/cache.h>
@@ -943,9 +942,6 @@ protected:
REQUEST_VALGRIND_CHECK_MEM_IS_ADDRESSABLE(data, size);
}
- if (ActorSystem) {
- WILSON_TRACE(*ActorSystem, traceId, BlockPread, DiskOffset = offset, Size = size);
- }
IAsyncIoOperation* op = IoContext->CreateAsyncIoOperation(completionAction, reqId, traceId);
IoContext->PreparePRead(op, data, size, offset);
Submit(op);
@@ -963,9 +959,6 @@ protected:
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(data, size);
}
- if (ActorSystem) {
- WILSON_TRACE(*ActorSystem, traceId, BlockPwrite, DiskOffset = offset, Size = size);
- }
IAsyncIoOperation* op = IoContext->CreateAsyncIoOperation(completionAction, reqId, traceId);
IoContext->PreparePWrite(op, const_cast<void*>(data), size, offset);
Submit(op);
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h
index 148856c3d5..393c7f021c 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion.h
@@ -1,11 +1,11 @@
#pragma once
#include <ydb/library/pdisk_io/aio.h>
-#include <ydb/library/wilson/wilson_event.h>
#include <util/system/hp_timer.h>
#include <util/generic/string.h>
#include <library/cpp/lwtrace/shuttle.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
namespace NKikimr::NPDisk {
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
index d00fc0f220..0707ce828f 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
@@ -973,7 +973,6 @@ TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece(TIntrusivePtr<TChunkRead> &
ui64 readOffset = Format.Offset(read->ChunkIdx, read->FirstSector, currentSectorOffset);
// TODO: Get this from the drive
- WILSON_TRACE(*ActorSystem, &read->TraceId, AsyncReadScheduled, DiskOffset = readOffset, Size = bytesToRead);
THolder<TCompletionChunkReadPart> completion(new TCompletionChunkReadPart(this, read, bytesToRead,
payloadBytesToRead, payloadOffset, read->FinalCompletion, isTheLastPart, Cfg->UseT1ha0HashInFooter));
completion->CostNs = DriveModel.TimeForSizeNs(bytesToRead, read->ChunkIdx, TDriveModel::OP_TYPE_READ);
@@ -2297,7 +2296,6 @@ void TPDisk::PrepareLogError(TLogWrite *logWrite, TStringStream& err, NKikimrPro
logWrite->Result.Reset(new NPDisk::TEvLogResult(status,
GetStatusFlags(logWrite->Owner, logWrite->OwnerGroupType), err.Str()));
logWrite->Result->Results.push_back(NPDisk::TEvLogResult::TRecord(logWrite->Lsn, logWrite->Cookie));
- WILSON_TRACE(*ActorSystem, &logWrite->TraceId, EnqueueLogWrite);
}
NKikimrProto::EReplyStatus TPDisk::CheckOwnerAndRound(TRequestBase* req, TStringStream& err) {
@@ -2533,7 +2531,6 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) {
log->SetOwnerGroupType(ownerData.IsStaticGroupOwner());
ownerData.SetLastSeenLsn(log->Lsn);
ownerData.WriteThroughput.Increment(log->Data.size(), ActorSystem->Timestamp());
- WILSON_TRACE(*ActorSystem, &log->TraceId, EnqueueLogWrite);
break;
}
case ERequestType::RequestYardInit:
@@ -2832,7 +2829,6 @@ void TPDisk::RouteRequest(TRequestBase *request) {
if (log->Signature.HasCommitRecord()) {
JointCommits.push_back(log);
}
- WILSON_TRACE(*ActorSystem, &log->TraceId, RouteLogWrite);
log = batch;
}
break;
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
index a9b910cb0c..c8487df8a6 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
@@ -21,7 +21,6 @@
#include <ydb/core/node_whiteboard/node_whiteboard.h>
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <ydb/core/control/immediate_control_board_wrapper.h>
#include <ydb/library/schlab/schine/scheduler.h>
#include <ydb/library/schlab/schine/job_kind.h>
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h
index af5402a4f8..737ad571c7 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h
@@ -2,7 +2,6 @@
#include "defs.h"
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include "blobstorage_pdisk.h"
#include "blobstorage_pdisk_gate.h"
@@ -217,7 +216,6 @@ public:
if (ev.Data.size() > (1 << 20)) {
Mon->WriteHugeLog.CountRequest();
}
- // WILSON_TRACE(*ActorSystem, &traceId, EvLogReceived); // TODO
return NewRequest(new TLogWrite(ev, sender, AtomicGet(*EstimatedLogChunkIdx), reqId, std::move(traceId)), &burstMs);
}
@@ -229,7 +227,6 @@ public:
Mon->QueueRequests->Inc();
*Mon->QueueBytes += ev.Size;
Mon->GetReadCounter(ev.PriorityClass)->CountRequest(ev.Size);
- WILSON_TRACE(*ActorSystem, &traceId, EvChunkReadReceived, ChunkIdx = ev.ChunkIdx, Offset = ev.Offset, Size = ev.Size);
auto read = new TChunkRead(ev, sender, reqId, std::move(traceId));
read->SelfPointer = read;
return NewRequest(read, &burstMs);
@@ -245,7 +242,6 @@ public:
ev.Validate();
*Mon->QueueBytes += size;
Mon->GetWriteCounter(ev.PriorityClass)->CountRequest(size);
- WILSON_TRACE(*ActorSystem, &traceId, EvChunkWriteReceived, ChunkIdx = ev.ChunkIdx, Offset = ev.Offset, Size = size);
return NewRequest(new TChunkWrite(ev, sender, reqId, std::move(traceId)), &burstMs);
}
};
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.cpp
index e6e279c4a5..d7731c4e8b 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.cpp
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_tools.cpp
@@ -14,7 +14,6 @@
#include "blobstorage_pdisk_util_countedqueueoneone.h"
#include "blobstorage_pdisk_writer.h"
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
#include <ydb/core/node_whiteboard/node_whiteboard.h>
#include <ydb/core/util/queue_oneone_inplace.h>
diff --git a/ydb/core/blobstorage/vdisk/common/blobstorage_status.cpp b/ydb/core/blobstorage/vdisk/common/blobstorage_status.cpp
index 4d770a4037..f26ac7bc31 100644
--- a/ydb/core/blobstorage/vdisk/common/blobstorage_status.cpp
+++ b/ydb/core/blobstorage/vdisk/common/blobstorage_status.cpp
@@ -36,7 +36,7 @@ namespace NKikimr {
SetRacingGroupInfo(record, Result->Record, GroupInfo);
LOG_DEBUG(ctx, BS_VDISK_OTHER, VDISKP(VCtx->VDiskLogPrefix, "TEvVStatusResult Request# {%s} Response# {%s}",
SingleLineProto(record).data(), SingleLineProto(Result->Record).data()));
- SendVDiskResponse(ctx, Ev->Sender, Result.release(), *this, Ev->Cookie, Ev->GetChannel());
+ SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie, Ev->GetChannel());
Die(ctx);
return;
}
@@ -71,7 +71,7 @@ namespace NKikimr {
ctx.Send(NotifyId, new TEvents::TEvActorDied());
LOG_DEBUG(ctx, BS_VDISK_GET,
VDISKP(VCtx->VDiskLogPrefix, "TEvVStatusResult"));
- SendVDiskResponse(ctx, Ev->Sender, Result.release(), *this, Ev->Cookie, Ev->GetChannel());
+ SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie, Ev->GetChannel());
Die(ctx);
}
}
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp b/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp
index 384931d305..6555ca4631 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp
@@ -1,22 +1,19 @@
#include "vdisk_response.h"
#include "vdisk_events.h"
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <ydb/core/base/interconnect_channels.h>
#include <util/system/datetime.h>
namespace NKikimr {
-void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, const IActor& actor,
- ui64 cookie) {
+void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie) {
ui32 channel = TInterconnectChannels::IC_BLOBSTORAGE;
if (TEvVResultBase *base = dynamic_cast<TEvVResultBase *>(ev)) {
channel = base->GetChannelToSend();
}
- SendVDiskResponse(ctx, recipient, ev, actor, cookie, channel);
+ SendVDiskResponse(ctx, recipient, ev, cookie, channel);
}
-void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, const IActor& actor,
- ui64 cookie, ui32 channel) {
+void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, ui32 channel) {
NWilson::TTraceId traceId;
switch (const ui32 type = ev->Type()) {
@@ -24,7 +21,6 @@ void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEve
case TEvBlobStorage::T::EventType: { \
TEvBlobStorage::T *event = static_cast<TEvBlobStorage::T *>(ev); \
traceId = std::move(event->TraceId); \
- WILSON_TRACE_FROM_ACTOR(ctx, actor, &traceId, EV); \
const double usPerCycle = 1000000.0 / NHPTimer::GetCyclesPerSecond(); \
event->Record.MutableTimestamps()->SetSentByVDiskUs(GetCycleCountFast() * usPerCycle); \
break; \
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_response.h b/ydb/core/blobstorage/vdisk/common/vdisk_response.h
index 1ac0452b8e..6ecc700ac7 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_response.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_response.h
@@ -3,10 +3,8 @@
namespace NKikimr {
-void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, const IActor& actor,
- ui64 cookie);
+void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie);
-void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, const IActor& actor,
- ui64 cookie, ui32 channel);
+void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, ui32 channel);
}//NKikimr
diff --git a/ydb/core/blobstorage/vdisk/query/query_barrier.cpp b/ydb/core/blobstorage/vdisk/query/query_barrier.cpp
index c469bb8f40..dc63806674 100644
--- a/ydb/core/blobstorage/vdisk/query/query_barrier.cpp
+++ b/ydb/core/blobstorage/vdisk/query/query_barrier.cpp
@@ -33,7 +33,7 @@ namespace NKikimr {
LOG_DEBUG(ctx, BS_VDISK_GC,
VDISKP(HullCtx->VCtx->VDiskLogPrefix,
"TEvVGetBarrierResult: %s", Result->ToString().data()));
- SendVDiskResponse(ctx, Ev->Sender, Result.release(), *this, Ev->Cookie);
+ SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie);
ctx.Send(ParentId, new TEvents::TEvActorDied);
Die(ctx);
}
diff --git a/ydb/core/blobstorage/vdisk/query/query_base.h b/ydb/core/blobstorage/vdisk/query/query_base.h
index 092b93964f..54391bea8d 100644
--- a/ydb/core/blobstorage/vdisk/query/query_base.h
+++ b/ydb/core/blobstorage/vdisk/query/query_base.h
@@ -5,7 +5,6 @@
#include "query_spacetracker.h"
#include <ydb/core/blobstorage/vdisk/hulldb/hull_ds_all_snap.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_response.h>
-#include <ydb/core/blobstorage/base/wilson_events.h>
namespace NKikimr {
@@ -110,7 +109,7 @@ namespace NKikimr {
ctx.Send(ReplSchedulerId, new TEvBlobStorage::TEvEnrichNotYet(BatcherCtx->OrigEv, std::move(Result)));
} else {
// send reply event to sender
- SendVDiskResponse(ctx, BatcherCtx->OrigEv->Sender, Result.release(), *self, BatcherCtx->OrigEv->Cookie);
+ SendVDiskResponse(ctx, BatcherCtx->OrigEv->Sender, Result.release(), BatcherCtx->OrigEv->Cookie);
}
ctx.Send(ParentId, new TEvents::TEvActorDied);
diff --git a/ydb/core/blobstorage/vdisk/query/query_dumpdb.h b/ydb/core/blobstorage/vdisk/query/query_dumpdb.h
index bba8a7fbdc..620903bfcc 100644
--- a/ydb/core/blobstorage/vdisk/query/query_dumpdb.h
+++ b/ydb/core/blobstorage/vdisk/query/query_dumpdb.h
@@ -47,7 +47,7 @@ namespace NKikimr {
// send result
Result->SetResult(str.Str());
- SendVDiskResponse(ctx, Ev->Sender, Result.release(), *this, Ev->Cookie);
+ SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie);
TThis::Die(ctx);
}
diff --git a/ydb/core/blobstorage/vdisk/query/query_extr.cpp b/ydb/core/blobstorage/vdisk/query/query_extr.cpp
index 196b6d2054..6315bd47f2 100644
--- a/ydb/core/blobstorage/vdisk/query/query_extr.cpp
+++ b/ydb/core/blobstorage/vdisk/query/query_extr.cpp
@@ -305,7 +305,6 @@ namespace NKikimr {
}
void HandleReadCompletion(TEvents::TEvCompleted::TPtr& ev, const TActorContext &ctx) {
- WILSON_TRACE_FROM_ACTOR(ctx, *this, &Result->TraceId, ReadBatcherFinish, MergedNode = std::move(ev->TraceId));
ActiveActors.Erase(ev->Sender);
Finish(ctx);
}
@@ -346,9 +345,8 @@ namespace NKikimr {
SendResponseAndDie(ctx, this);
} else {
ui8 priority = PDiskPriority();
- WILSON_TRACE_FROM_ACTOR(ctx, *this, &Result->TraceId, ReadBatcherStart);
- std::unique_ptr<IActor> a(Batcher.CreateAsyncDataReader(ctx.SelfID, priority, Result->TraceId.SeparateBranch(),
- IsRepl()));
+ std::unique_ptr<IActor> a(Batcher.CreateAsyncDataReader(ctx.SelfID, priority, std::move(Result->TraceId),
+ IsRepl(), TActivationContext::Now()));
if (a) {
auto aid = ctx.Register(a.release());
ActiveActors.Insert(aid);
diff --git a/ydb/core/blobstorage/vdisk/query/query_public.cpp b/ydb/core/blobstorage/vdisk/query/query_public.cpp
index 25761b685e..db91316a88 100644
--- a/ydb/core/blobstorage/vdisk/query/query_public.cpp
+++ b/ydb/core/blobstorage/vdisk/query/query_public.cpp
@@ -152,14 +152,13 @@ namespace NKikimr {
const TVDiskContextPtr &vctx,
const TActorContext &ctx,
TEvBlobStorage::TEvVDbStat::TPtr &ev,
- std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result,
- const IActor& actor)
+ std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result)
{
result->SetError();
LOG_DEBUG(ctx, NKikimrServices::BS_VDISK_OTHER,
VDISKP(vctx->VDiskLogPrefix,
"TEvVDbStatResult: %s", result->ToString().data()));
- SendVDiskResponse(ctx, ev->Sender, result.release(), actor, ev->Cookie);
+ SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie);
}
template <class TKey, class TMemRec>
@@ -169,8 +168,7 @@ namespace NKikimr {
TLevelIndexSnapshot<TKey, TMemRec> &&levelSnap,
const TActorId &parentId,
TEvBlobStorage::TEvVDbStat::TPtr &ev,
- std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result,
- const IActor& actor)
+ std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result)
{
const NKikimrBlobStorage::TEvVDbStat &record = ev->Get()->Record;
switch (record.GetAction()) {
@@ -183,7 +181,7 @@ namespace NKikimr {
return new TStatActor(hullCtx, parentId, std::move(levelSnap), ev, std::move(result));
}
default: {
- DbStatError(hullCtx->VCtx, ctx, ev, std::move(result), actor);
+ DbStatError(hullCtx->VCtx, ctx, ev, std::move(result));
return nullptr;
}
}
@@ -201,29 +199,27 @@ namespace NKikimr {
THullDsSnap &&fullSnap,
const TActorId &parentId,
TEvBlobStorage::TEvVDbStat::TPtr &ev,
- std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result,
- const IActor& actor) {
+ std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result) {
const NKikimrBlobStorage::TEvVDbStat &record = ev->Get()->Record;
switch (record.GetType()) {
- case NKikimrBlobStorage::StatLogoBlobs: {
- return RunDbStatAction(hullCtx, ctx, std::move(fullSnap.LogoBlobsSnap), parentId, ev, std::move(result), actor);
- }
- case NKikimrBlobStorage::StatBlocks: {
- return RunDbStatAction(hullCtx, ctx, std::move(fullSnap.BlocksSnap), parentId, ev, std::move(result), actor);
- }
- case NKikimrBlobStorage::StatBarriers: {
- return RunDbStatAction(hullCtx, ctx, std::move(fullSnap.BarriersSnap), parentId, ev, std::move(result), actor);
- }
- case NKikimrBlobStorage::StatTabletType: {
+ case NKikimrBlobStorage::StatLogoBlobs:
+ return RunDbStatAction(hullCtx, ctx, std::move(fullSnap.LogoBlobsSnap), parentId, ev, std::move(result));
+
+ case NKikimrBlobStorage::StatBlocks:
+ return RunDbStatAction(hullCtx, ctx, std::move(fullSnap.BlocksSnap), parentId, ev, std::move(result));
+
+ case NKikimrBlobStorage::StatBarriers:
+ return RunDbStatAction(hullCtx, ctx, std::move(fullSnap.BarriersSnap), parentId, ev, std::move(result));
+
+ case NKikimrBlobStorage::StatTabletType:
return CreateTabletStatActor(hullCtx, parentId, std::move(fullSnap), ev, std::move(result));
- }
- case NKikimrBlobStorage::StatHugeType: {
+
+ case NKikimrBlobStorage::StatHugeType:
return CreateHugeStatActor(hullCtx, hugeBlobCtx, parentId, std::move(fullSnap), ev, std::move(result));
- }
- default: {
- DbStatError(hullCtx->VCtx, ctx, ev, std::move(result), actor);
+
+ default:
+ DbStatError(hullCtx->VCtx, ctx, ev, std::move(result));
return nullptr;
- }
}
}
diff --git a/ydb/core/blobstorage/vdisk/query/query_public.h b/ydb/core/blobstorage/vdisk/query/query_public.h
index 38e498ad71..6367ef107e 100644
--- a/ydb/core/blobstorage/vdisk/query/query_public.h
+++ b/ydb/core/blobstorage/vdisk/query/query_public.h
@@ -83,8 +83,7 @@ namespace NKikimr {
THullDsSnap &&fullSnap,
const TActorId &parentId,
TEvBlobStorage::TEvVDbStat::TPtr &ev,
- std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result,
- const IActor& actor);
+ std::unique_ptr<TEvBlobStorage::TEvVDbStatResult> result);
IActor *CreateMonStreamActor(THullDsSnap&& fullSnap, TEvBlobStorage::TEvMonStreamQuery::TPtr& ev);
diff --git a/ydb/core/blobstorage/vdisk/query/query_readactor.cpp b/ydb/core/blobstorage/vdisk/query/query_readactor.cpp
index dfe5cff3c0..9a27de1b3f 100644
--- a/ydb/core/blobstorage/vdisk/query/query_readactor.cpp
+++ b/ydb/core/blobstorage/vdisk/query/query_readactor.cpp
@@ -1,6 +1,6 @@
#include "query_readbatch.h"
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <ydb/core/blobstorage/base/vdisk_priorities.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <util/generic/algorithm.h>
using namespace NKikimrServices;
@@ -17,7 +17,7 @@ namespace NKikimr {
const TActorId NotifyID;
std::shared_ptr<TReadBatcherResult> Result;
const ui8 Priority;
- NWilson::TTraceId TraceId;
+ NWilson::TSpan Span;
const bool IsRepl;
ui32 Counter = 0;
@@ -37,10 +37,6 @@ namespace NKikimr {
// cookie for this request
void *cookie = &*it;
- // generate wilson event with query details
- WILSON_TRACE_FROM_ACTOR(ctx, *this, &TraceId, EvChunkReadSent, ChunkIdx = it->Part.ChunkIdx,
- Offset = it->Part.Offset, Size = it->Part.Size, YardCookie = cookie);
-
// create request
std::unique_ptr<NPDisk::TEvChunkRead> msg(new NPDisk::TEvChunkRead(Ctx->PDiskCtx->Dsk->Owner,
Ctx->PDiskCtx->Dsk->OwnerRound, it->Part.ChunkIdx, it->Part.Offset, it->Part.Size,
@@ -51,7 +47,7 @@ namespace NKikimr {
// send request
TReplQuoter::QuoteMessage(quoter, std::make_unique<IEventHandle>(Ctx->PDiskCtx->PDiskId, SelfId(),
- msg.release(), 0, 0, nullptr, TraceId.SeparateBranch()), it->Part.Size);
+ msg.release(), 0, 0, nullptr, Span), it->Part.Size);
Counter++;
}
@@ -62,7 +58,7 @@ namespace NKikimr {
VDISKP(Ctx->VCtx->VDiskLogPrefix, "GLUEREAD FINISHED(%p): actualReadN# %" PRIu32
" origReadN# %" PRIu32, this, ui32(Result->GlueReads.size()),
ui32(Result->DiskDataItemPtrs.size())));
- ctx.Send(NotifyID, new TEvents::TEvCompleted(), 0, 0, std::move(TraceId));
+ ctx.Send(NotifyID, new TEvents::TEvCompleted);
Die(ctx);
}
@@ -87,8 +83,6 @@ namespace NKikimr {
}
NPDisk::TEvChunkReadResult *msg = ev->Get();
- WILSON_TRACE_FROM_ACTOR(ctx, *this, &TraceId, EvChunkReadResultReceived, YardCookie = msg->Cookie,
- MergedNode = std::move(ev->TraceId));
TGlueRead *glueRead = static_cast<TGlueRead *>(msg->Cookie);
glueRead->Data = std::move(msg->Data);
@@ -122,13 +116,14 @@ namespace NKikimr {
std::shared_ptr<TReadBatcherResult> result,
ui8 priority,
NWilson::TTraceId traceId,
- bool isRepl)
+ bool isRepl,
+ TInstant now)
: TActorBootstrapped<TTReadBatcherActor>()
, Ctx(ctx)
, NotifyID(notifyID)
, Result(std::move(result))
, Priority(priority)
- , TraceId(std::move(traceId))
+ , Span(12, NWilson::ERelation::ChildOf, std::move(traceId), now, "VDisk.TReadBatcherActor")
, IsRepl(isRepl)
{}
};
@@ -139,9 +134,10 @@ namespace NKikimr {
std::shared_ptr<TReadBatcherResult> result,
ui8 priority,
NWilson::TTraceId traceId,
- bool isRepl)
+ bool isRepl,
+ TInstant now)
{
- return new TTReadBatcherActor(ctx, notifyID, result, priority, std::move(traceId), isRepl);
+ return new TTReadBatcherActor(ctx, notifyID, result, priority, std::move(traceId), isRepl, now);
}
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/query/query_readactor.h b/ydb/core/blobstorage/vdisk/query/query_readactor.h
index 614625ee8b..daee28aaf6 100644
--- a/ydb/core/blobstorage/vdisk/query/query_readactor.h
+++ b/ydb/core/blobstorage/vdisk/query/query_readactor.h
@@ -11,7 +11,8 @@ namespace NKikimr {
std::shared_ptr<TReadBatcherResult> result,
ui8 priority,
NWilson::TTraceId traceId,
- bool isRepl);
+ bool isRepl,
+ TInstant now);
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp
index a883191f15..566af60d43 100644
--- a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp
+++ b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp
@@ -1,6 +1,5 @@
#include "query_readbatch.h"
#include "query_readactor.h"
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <ydb/core/blobstorage/base/vdisk_priorities.h>
#include <util/generic/algorithm.h>
@@ -218,10 +217,8 @@ namespace NKikimr {
return true;
}
- IActor *TReadBatcher::CreateAsyncDataReader(const TActorId &notifyID,
- ui8 priority,
- NWilson::TTraceId traceId,
- bool isRepl) {
+ IActor *TReadBatcher::CreateAsyncDataReader(const TActorId &notifyID, ui8 priority, NWilson::TTraceId traceId,
+ bool isRepl, TInstant now) {
if (Result->DiskDataItemPtrs.empty())
return nullptr;
else {
@@ -242,7 +239,7 @@ namespace NKikimr {
PDiskReadBytes += size;
}
// start reader
- return CreateReadBatcherActor(Ctx, notifyID, Result, priority, std::move(traceId), isRepl);
+ return CreateReadBatcherActor(Ctx, notifyID, Result, priority, std::move(traceId), isRepl, now);
}
}
diff --git a/ydb/core/blobstorage/vdisk/query/query_readbatch.h b/ydb/core/blobstorage/vdisk/query/query_readbatch.h
index b0592a9a20..fba173b809 100644
--- a/ydb/core/blobstorage/vdisk/query/query_readbatch.h
+++ b/ydb/core/blobstorage/vdisk/query/query_readbatch.h
@@ -329,7 +329,8 @@ namespace NKikimr {
// creates an actor for efficient async data reads, returns nullptr
// if no read required
- IActor *CreateAsyncDataReader(const TActorId &notifyID, ui8 priority, NWilson::TTraceId traceId, bool isRepl);
+ IActor *CreateAsyncDataReader(const TActorId &notifyID, ui8 priority, NWilson::TTraceId traceId, bool isRepl,
+ TInstant now);
const TReadBatcherResult &GetResult() const { return *Result; }
ui64 GetPDiskReadBytes() const { return PDiskReadBytes; }
diff --git a/ydb/core/blobstorage/vdisk/query/query_statdb.h b/ydb/core/blobstorage/vdisk/query/query_statdb.h
index 67aa2aeec5..b8a57ebd89 100644
--- a/ydb/core/blobstorage/vdisk/query/query_statdb.h
+++ b/ydb/core/blobstorage/vdisk/query/query_statdb.h
@@ -28,7 +28,7 @@ namespace NKikimr {
const bool prettyPrint = Ev->Get()->Record.GetPrettyPrint();
CalculateStat(str, prettyPrint);
Result->SetResult(str.Str());
- SendVDiskResponse(ctx, Ev->Sender, Result.release(), *this, Ev->Cookie);
+ SendVDiskResponse(ctx, Ev->Sender, Result.release(), Ev->Cookie);
ctx.Send(ParentId, new TEvents::TEvActorDied);
TThis::Die(ctx);
}
diff --git a/ydb/core/blobstorage/vdisk/query/query_stathuge.cpp b/ydb/core/blobstorage/vdisk/query/query_stathuge.cpp
index e4f14dba80..08c30752ab 100644
--- a/ydb/core/blobstorage/vdisk/query/query_stathuge.cpp
+++ b/ydb/core/blobstorage/vdisk/query/query_stathuge.cpp
@@ -22,7 +22,7 @@ namespace NKikimr {
CalculateUsedHugeChunks(str, prettyPrint);
Result->SetResult(str.Str());
- SendVDiskResponse(ctx, Ev->Sender, Result.release(), *this, 0);
+ SendVDiskResponse(ctx, Ev->Sender, Result.release(), 0);
ctx.Send(ParentId, new TEvents::TEvActorDied);
TThis::Die(ctx);
}
diff --git a/ydb/core/blobstorage/vdisk/query/query_stattablet.cpp b/ydb/core/blobstorage/vdisk/query/query_stattablet.cpp
index 16e25fb1a6..5e4817b059 100644
--- a/ydb/core/blobstorage/vdisk/query/query_stattablet.cpp
+++ b/ydb/core/blobstorage/vdisk/query/query_stattablet.cpp
@@ -24,7 +24,7 @@ namespace NKikimr {
ProcessLogoBlobs(str, tabletId, prettyPrint);
Result->SetResult(str.Str());
- SendVDiskResponse(ctx, Ev->Sender, Result.release(), *this, 0);
+ SendVDiskResponse(ctx, Ev->Sender, Result.release(), 0);
ctx.Send(ParentId, new TEvents::TEvActorDied);
TThis::Die(ctx);
}
diff --git a/ydb/core/blobstorage/vdisk/repl/query_donor.h b/ydb/core/blobstorage/vdisk/repl/query_donor.h
index 7479d607c9..aa8ead2005 100644
--- a/ydb/core/blobstorage/vdisk/repl/query_donor.h
+++ b/ydb/core/blobstorage/vdisk/repl/query_donor.h
@@ -91,7 +91,7 @@ namespace NKikimr {
void PassAway() override {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::BS_VDISK_GET, SelfId() << " finished query");
Send(ParentId, new TEvents::TEvActorDied);
- SendVDiskResponse(TActivationContext::AsActorContext(), Sender, Result.release(), *this, Cookie);
+ SendVDiskResponse(TActivationContext::AsActorContext(), Sender, Result.release(), Cookie);
TActorBootstrapped::PassAway();
}
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
index 728a196965..54b755bf77 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
@@ -15,7 +15,6 @@
#include "skeleton_events.h"
#include "skeleton_capturevdisklayout.h"
#include "skeleton_compactionstate.h"
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_iter.h>
#include <ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.h>
#include <ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h>
@@ -139,7 +138,7 @@ namespace NKikimr {
template <class TOrigEv>
void SendReply(const TActorContext &ctx, std::unique_ptr<IEventBase> result, TOrigEv &orig, EServiceKikimr logService) {
Y_UNUSED(logService);
- SendVDiskResponse(ctx, orig->Sender, result.release(), *this, orig->Cookie);
+ SendVDiskResponse(ctx, orig->Sender, result.release(), orig->Cookie);
}
////////////////////////////////////////////////////////////////////////
@@ -147,7 +146,7 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////
void Handle(TEvBlobStorage::TEvVMovedPatch::TPtr &ev, const TActorContext &ctx) {
- const bool postpone = OverloadHandler->PostponeEvent(ev, ctx, this);
+ const bool postpone = OverloadHandler->PostponeEvent(ev);
if (!postpone) {
PrivateHandle(ev, ctx);
}
@@ -187,7 +186,7 @@ namespace NKikimr {
}
void Handle(TEvBlobStorage::TEvVPatchStart::TPtr &ev, const TActorContext &ctx) {
- const bool postpone = OverloadHandler->PostponeEvent(ev, ctx, this);
+ const bool postpone = OverloadHandler->PostponeEvent(ev);
if (!postpone) {
PrivateHandle(ev, ctx);
}
@@ -267,7 +266,7 @@ namespace NKikimr {
}
void Handle(TEvBlobStorage::TEvVMultiPut::TPtr &ev, const TActorContext &ctx) {
- const bool postpone = OverloadHandler->PostponeEvent(ev, ctx, this);
+ const bool postpone = OverloadHandler->PostponeEvent(ev);
if (!postpone) {
PrivateHandle(ev, ctx);
}
@@ -348,13 +347,11 @@ namespace NKikimr {
return logMsg;
}
- std::unique_ptr<TEvHullWriteHugeBlob> CreateHullWriteHugeBlob(const TActorContext &ctx, NActors::TActorId sender,
- ui64 cookie, NWilson::TTraceId &traceId, bool ignoreBlock,
- NKikimrBlobStorage::EPutHandleClass handleClass, TVPutInfo &info,
+ std::unique_ptr<TEvHullWriteHugeBlob> CreateHullWriteHugeBlob(NActors::TActorId sender,
+ ui64 cookie, bool ignoreBlock, NKikimrBlobStorage::EPutHandleClass handleClass, TVPutInfo &info,
std::unique_ptr<TEvBlobStorage::TEvVPutResult> res)
{
Y_VERIFY_DEBUG(info.HullStatus.Status == NKikimrProto::OK);
- WILSON_TRACE_FROM_ACTOR(ctx, *this, &traceId, EvHullWriteHugeBlobSent);
info.Buffer = TDiskBlob::Create(info.BlobId.BlobSize(), info.BlobId.PartId(), Db->GType.TotalPartCount(),
std::move(info.Buffer), *Arena);
UpdatePDiskWriteBytes(info.Buffer.GetSize());
@@ -417,8 +414,6 @@ namespace NKikimr {
}
void PrivateHandle(TEvBlobStorage::TEvVMultiPut::TPtr &ev, const TActorContext &ctx) {
- WILSON_TRACE_FROM_ACTOR(ctx, *this, &ev->TraceId, EvVPutReceived, VDiskId = SelfVDiskId,
- PDiskId = Config->BaseInfo.PDiskId, VDiskSlotId = Config->BaseInfo.VDiskSlotId);
IFaceMonGroup->MultiPutMsgs()++;
IFaceMonGroup->PutTotalBytes() += ev->GetSize();
@@ -522,6 +517,7 @@ namespace NKikimr {
std::unique_ptr<NPDisk::TEvMultiLog> evLogs = std::make_unique<NPDisk::TEvMultiLog>();
ui64 cookie = ev->Cookie;
+ const NWilson::TTraceId traceId(ev->TraceId);
IActor* vMultiPutActor = CreateSkeletonVMultiPutActor(SelfId(), statuses, oosStatus, ev,
SkeletonFrontIDPtr, IFaceMonGroup->MultiPutResMsgsPtr(), Db->GetVDiskIncarnationGuid());
@@ -553,7 +549,6 @@ namespace NKikimr {
if (info.IsHugeBlob) {
// pass the work to huge blob writer
- NWilson::TTraceId traceId;
TInstant deadline = (record.HasMsgQoS() && record.GetMsgQoS().HasDeadlineSeconds()) ?
TInstant::Seconds(record.GetMsgQoS().GetDeadlineSeconds()) :
TInstant::Max();
@@ -565,9 +560,9 @@ namespace NKikimr {
VCtx->Histograms.GetHistogram(handleClass), info.Buffer.GetSize(),
NWilson::TTraceId(), Db->GetVDiskIncarnationGuid(), errorReason));
if (info.Buffer) {
- auto hugeWrite = CreateHullWriteHugeBlob(ctx, vMultiPutActorId, cookie, traceId, ignoreBlock,
- handleClass, info, std::move(result));
- ctx.Send(Db->HugeKeeperID, hugeWrite.release());
+ auto hugeWrite = CreateHullWriteHugeBlob(vMultiPutActorId, cookie, ignoreBlock, handleClass,
+ info, std::move(result));
+ ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, NWilson::TTraceId(traceId));
} else {
ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, info.BlobId, info.Ingress, TDiskPart(),
ignoreBlock, vMultiPutActorId, cookie, std::move(result)));
@@ -614,16 +609,13 @@ namespace NKikimr {
}
void Handle(TEvBlobStorage::TEvVPut::TPtr &ev, const TActorContext &ctx) {
- const bool postpone = OverloadHandler->PostponeEvent(ev, ctx, this);
+ const bool postpone = OverloadHandler->PostponeEvent(ev);
if (!postpone) {
PrivateHandle(ev, ctx);
}
}
void PrivateHandle(TEvBlobStorage::TEvVPut::TPtr &ev, const TActorContext &ctx) {
- WILSON_TRACE_FROM_ACTOR(ctx, *this, &ev->TraceId, EvVPutReceived, VDiskId = SelfVDiskId,
- PDiskId = Config->BaseInfo.PDiskId, VDiskSlotId = Config->BaseInfo.VDiskSlotId);
-
IFaceMonGroup->PutMsgs()++;
IFaceMonGroup->PutTotalBytes() += ev->GetSize();
TInstant now = TAppData::TimeProvider->Now();
@@ -698,9 +690,9 @@ namespace NKikimr {
} else if (info.Buffer) {
// pass the work to huge blob writer
NKikimrBlobStorage::EPutHandleClass handleClass = record.GetHandleClass();
- auto hugeWrite = CreateHullWriteHugeBlob(ctx, ev->Sender, ev->Cookie, ev->TraceId, ignoreBlock,
- handleClass, info, std::move(result));
- ctx.Send(Db->HugeKeeperID, hugeWrite.release());
+ auto hugeWrite = CreateHullWriteHugeBlob(ev->Sender, ev->Cookie, ignoreBlock, handleClass, info,
+ std::move(result));
+ ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, std::move(ev->TraceId));
} else {
ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, info.BlobId, info.Ingress, TDiskPart(),
ignoreBlock, ev->Sender, ev->Cookie, std::move(result)));
@@ -710,8 +702,6 @@ namespace NKikimr {
void Handle(TEvHullLogHugeBlob::TPtr &ev, const TActorContext &ctx) {
TEvHullLogHugeBlob *msg = ev->Get();
- WILSON_TRACE_FROM_ACTOR(ctx, *this, &msg->Result->TraceId, EvHullLogHugeBlobReceived);
-
// update hull write duration
msg->Result->MarkHugeWriteTime();
auto status = Hull->CheckLogoBlob(ctx, msg->LogoBlobID, msg->IgnoreBlock);
@@ -727,7 +717,7 @@ namespace NKikimr {
Hull->PostponeReplyUntilCommitted(msg->Result.release(), msg->OrigClient, msg->OrigCookie,
status.Lsn);
} else {
- SendVDiskResponse(ctx, msg->OrigClient, msg->Result.release(), *this, msg->OrigCookie);
+ SendVDiskResponse(ctx, msg->OrigClient, msg->Result.release(), msg->OrigCookie);
}
return;
@@ -824,12 +814,10 @@ namespace NKikimr {
using namespace NErrBuilder;
std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status, errorReason, ev, now, SkeletonFrontIDPtr, SelfVDiskId,
Db->GetVDiskIncarnationGuid(), GInfo));
- SendVDiskResponse(ctx, ev->Sender, res.release(), *this, ev->Cookie);
+ SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie);
}
void Handle(TEvBlobStorage::TEvVGet::TPtr &ev, const TActorContext &ctx) {
- WILSON_TRACE_FROM_ACTOR(ctx, *this, &ev->TraceId, EvVGetReceived);
-
IFaceMonGroup->GetMsgs()++;
TInstant now = TAppData::TimeProvider->Now();
NKikimrBlobStorage::TEvVGet &record = ev->Get()->Record;
@@ -990,7 +978,7 @@ namespace NKikimr {
LOG_DEBUG_S(ctx, BS_VDISK_BLOCK, VCtx->VDiskLogPrefix
<< "TEvVGetBlockResult: " << result->ToString()
<< " Marker# BSVS17");
- SendVDiskResponse(ctx, ev->Sender, result.release(), *this, ev->Cookie);
+ SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie);
}
////////////////////////////////////////////////////////////////////////
@@ -1064,7 +1052,7 @@ namespace NKikimr {
using namespace NErrBuilder;
std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status, errorReason, ev, now, SkeletonFrontIDPtr, SelfVDiskId,
Db->GetVDiskIncarnationGuid(), GInfo));
- SendVDiskResponse(ctx, ev->Sender, res.release(), *this, ev->Cookie);
+ SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie);
}
void Handle(TEvBlobStorage::TEvVGetBarrier::TPtr &ev, const TActorContext &ctx) {
@@ -1127,7 +1115,7 @@ namespace NKikimr {
using namespace NErrBuilder;
std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status, errorReason, ev, now, SkeletonFrontIDPtr, SelfVDiskId,
Db->GetVDiskIncarnationGuid(), GInfo));
- SendVDiskResponse(ctx, ev->Sender, res.release(), *this, ev->Cookie);
+ SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie);
}
void Handle(TEvBlobStorage::TEvVDbStat::TPtr &ev, const TActorContext &ctx) {
@@ -1144,7 +1132,7 @@ namespace NKikimr {
IFaceMonGroup->DbStatResMsgsPtr(), nullptr, std::move(ev->TraceId));
THullDsSnap fullSnap = Hull->GetIndexSnapshot();
IActor *actor = CreateDbStatActor(HullCtx, HugeBlobCtx, ctx, std::move(fullSnap),
- ctx.SelfID, ev, std::move(result), *this);
+ ctx.SelfID, ev, std::move(result));
if (actor) {
auto aid = ctx.Register(actor);
ActiveActors.Insert(aid);
@@ -1181,7 +1169,7 @@ namespace NKikimr {
void Reply(const NKikimrProto::EReplyStatus status, const TString& /*errorReason*/, TEvBlobStorage::TEvVCompact::TPtr &ev,
const TActorContext &ctx, const TInstant &/*now*/) {
auto result = std::make_unique<TEvBlobStorage::TEvVCompactResult>(status, SelfVDiskId);
- SendVDiskResponse(ctx, ev->Sender, result.release(), *this, ev->Cookie);
+ SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie);
}
void ReplyError(NKikimrProto::EReplyStatus status, const TString& errorReason, TEvBlobStorage::TEvVCompact::TPtr &ev,
@@ -1239,7 +1227,7 @@ namespace NKikimr {
void Handle(TEvHullCompactResult::TPtr &ev, const TActorContext &ctx) {
Y_VERIFY(VDiskCompactionState);
- VDiskCompactionState->Compacted(ctx, *this, ev->Get()->RequestId, ev->Get()->Type);
+ VDiskCompactionState->Compacted(ctx, ev->Get()->RequestId, ev->Get()->Type);
}
////////////////////////////////////////////////////////////////////////
@@ -1248,7 +1236,7 @@ namespace NKikimr {
void Reply(const NKikimrProto::EReplyStatus status, const TString& /*errorReason*/,
TEvBlobStorage::TEvVBaldSyncLog::TPtr &ev, const TActorContext &ctx, const TInstant &/*now*/) {
auto result = std::make_unique<TEvBlobStorage::TEvVBaldSyncLogResult>(status, SelfVDiskId);
- SendVDiskResponse(ctx, ev->Sender, result.release(), *this, ev->Cookie);
+ SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie);
}
void ReplyError(NKikimrProto::EReplyStatus status, const TString& errorReason, TEvBlobStorage::TEvVBaldSyncLog::TPtr &ev,
@@ -1282,7 +1270,7 @@ namespace NKikimr {
using namespace NErrBuilder;
std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status, errorReason, ev, now, SkeletonFrontIDPtr, SelfVDiskId,
Db->GetVDiskIncarnationGuid(), GInfo));
- SendVDiskResponse(ctx, ev->Sender, res.release(), *this, ev->Cookie);
+ SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie);
}
void Handle(TEvBlobStorage::TEvVSync::TPtr &ev, const TActorContext &ctx) {
@@ -1297,7 +1285,7 @@ namespace NKikimr {
using namespace NErrBuilder;
std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status, errorReason, ev, now, SkeletonFrontIDPtr, SelfVDiskId,
Db->GetVDiskIncarnationGuid(), GInfo));
- SendVDiskResponse(ctx, ev->Sender, res.release(), *this, ev->Cookie);
+ SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie);
}
// FIXME: check for RACE in other handlers!!!
@@ -1336,7 +1324,7 @@ namespace NKikimr {
}
void Handle(TEvLocalSyncData::TPtr &ev, const TActorContext &ctx) {
- const bool postpone = OverloadHandler->PostponeEvent(ev, ctx, this);
+ const bool postpone = OverloadHandler->PostponeEvent(ev);
if (!postpone) {
PrivateHandle(ev, ctx);
}
@@ -1391,7 +1379,7 @@ namespace NKikimr {
}
void Handle(TEvAnubisOsirisPut::TPtr &ev, const TActorContext &ctx) {
- const bool postpone = OverloadHandler->PostponeEvent(ev, ctx, this);
+ const bool postpone = OverloadHandler->PostponeEvent(ev);
if (!postpone) {
PrivateHandle(ev, ctx);
}
@@ -1461,7 +1449,7 @@ namespace NKikimr {
using namespace NErrBuilder;
std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status, errorReason, ev, now, SkeletonFrontIDPtr, SelfVDiskId,
Db->GetVDiskIncarnationGuid(), GInfo));
- SendVDiskResponse(ctx, ev->Sender, res.release(), *this, ev->Cookie);
+ SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie);
}
void Handle(TEvBlobStorage::TEvVSyncFull::TPtr &ev, const TActorContext &ctx) {
@@ -1488,7 +1476,7 @@ namespace NKikimr {
std::unique_ptr<ILoggedRec> loggedRec(LoggedRecsVault.Extract(loggedRecId));
Db->LsnMngr->ConfirmLsnForHull(loggedRec->Seg, loggedRec->ConfirmSyncLogAlso);
- loggedRec->Replay(*Hull, ctx, *this);
+ loggedRec->Replay(*Hull, ctx);
}
if (VDiskCompactionState && !results.empty()) {
VDiskCompactionState->Logged(ctx, results.back().Lsn);
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
index 9ed7de23f7..7f21fefcdc 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
@@ -3,7 +3,6 @@
#include "blobstorage_skeletonerr.h"
#include "blobstorage_skeleton.h"
#include <ydb/core/blobstorage/base/blobstorage_events.h>
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <ydb/core/blobstorage/base/utility.h>
#include <ydb/core/blobstorage/base/html.h>
@@ -99,21 +98,13 @@ namespace NKikimr {
NKikimrBlobStorage::EVDiskQueueId ExtQueueId;
NBackpressure::TQueueClientId ClientId;
TActorId ActorId;
+ NWilson::TSpan Span;
- TRecord()
- : Ev()
- , ReceivedTime()
- , Deadline()
- , ByteSize(0)
- , MsgId()
- , Cost(0)
- , ExtQueueId(NKikimrBlobStorage::EVDiskQueueId::Unknown)
- , ClientId()
- {}
+ TRecord() = default;
TRecord(std::unique_ptr<IEventHandle> ev, TInstant now, ui32 recByteSize, const NBackpressure::TMessageId &msgId,
ui64 cost, TInstant deadline, NKikimrBlobStorage::EVDiskQueueId extQueueId,
- const NBackpressure::TQueueClientId& clientId)
+ const NBackpressure::TQueueClientId& clientId, TString name)
: Ev(std::move(ev))
, ReceivedTime(now)
, Deadline(deadline)
@@ -123,7 +114,10 @@ namespace NKikimr {
, ExtQueueId(extQueueId)
, ClientId(clientId)
, ActorId(Ev->Sender)
- {}
+ , Span(9 /*verbosity*/, NWilson::ERelation::FollowsFrom, std::move(Ev->TraceId), now, "VDisk.PutInQueue")
+ {
+ Span.Attribute("QueueName", std::move(name));
+ }
};
using TMyQueueBackpressure = NBackpressure::TQueueBackpressure<NBackpressure::TQueueClientId>;
@@ -224,10 +218,9 @@ namespace NKikimr {
++*SkeletonFrontDelayedCount;
*SkeletonFrontDelayedBytes += recByteSize;
- WILSON_TRACE_FROM_ACTOR(ctx, front, &converted->TraceId, EvSkeletonFrontEnqueue);
-
TInstant now = TAppData::TimeProvider->Now();
- Queue->Push(TRecord(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId, clientId));
+ Queue->Push(TRecord(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId,
+ clientId, Name));
}
}
@@ -256,13 +249,15 @@ namespace NKikimr {
TDuration inQueue = now - rec->ReceivedTime;
ApplyToRecord(*rec->Ev, TUpdateInQueueTime(inQueue));
+ // trace end of in-queue span
+ rec->Span.EndOk();
+
if (forceError) {
front.GetExtQueue(rec->ExtQueueId).DroppedWithError(ctx, rec, now, front);
} else if (now >= rec->Deadline) {
++Deadlines;
front.GetExtQueue(rec->ExtQueueId).DeadlineHappened(ctx, rec, now, front);
} else {
- WILSON_TRACE_FROM_ACTOR(ctx, front, &rec->Ev->TraceId, EvSkeletonFrontProceed);
ctx.ExecutorThread.Send(rec->Ev.release());
++InFlightCount;
@@ -1298,7 +1293,7 @@ namespace NKikimr {
TInstant now) {
using namespace NErrBuilder;
auto res = ErroneousResult(VCtx, status, errorReason, ev, now, nullptr, SelfVDiskId, VDiskIncarnationGuid, GInfo);
- SendVDiskResponse(ctx, ev->Sender, res.release(), *this, ev->Cookie);
+ SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie);
}
void Reply(TEvBlobStorage::TEvVCheckReadiness::TPtr &ev, const TActorContext &ctx,
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfull.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfull.cpp
index 3d4f78d008..8733f30643 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfull.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfull.cpp
@@ -125,7 +125,7 @@ namespace NKikimr {
Result->Record.SetBlockTabletFrom(KeyBlock.TabletId);
KeyBarrier.Serialize(*Result->Record.MutableBarrierFrom());
// send reply
- SendVDiskResponse(ctx, Recipient, Result.release(), *this, 0);
+ SendVDiskResponse(ctx, Recipient, Result.release(), 0);
// notify parent about death
ctx.Send(ParentId, new TEvents::TEvActorDied);
Die(ctx);
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp
index 11f708e717..8452a57134 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp
@@ -59,7 +59,7 @@ namespace NKikimr {
auto result = std::make_unique<TEvBlobStorage::TEvVSyncFullResult>(NKikimrProto::ERROR, SelfVDiskId,
Record.GetCookie(), Now, IFaceMonGroup->SyncFullResMsgsPtr(), nullptr, std::move(Ev->TraceId),
Ev->GetChannel());
- SendVDiskResponse(ctx, recipient, result.release(), *this, cookie);
+ SendVDiskResponse(ctx, recipient, result.release(), cookie);
Die(ctx);
return;
}
@@ -74,7 +74,7 @@ namespace NKikimr {
auto result = std::make_unique<TEvBlobStorage::TEvVSyncFullResult>(NKikimrProto::NODATA, SelfVDiskId,
TSyncState(Db->GetVDiskIncarnationGuid(), DbBirthLsn), Record.GetCookie(), Now,
IFaceMonGroup->SyncFullResMsgsPtr(), nullptr, std::move(Ev->TraceId), Ev->GetChannel());
- SendVDiskResponse(ctx, recipient, result.release(), *this, cookie);
+ SendVDiskResponse(ctx, recipient, result.release(), cookie);
Die(ctx);
return;
}
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.cpp
index daaa498926..cc7967978f 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.cpp
@@ -53,7 +53,6 @@ namespace NKikimr {
void TVDiskCompactionState::Compacted(
const TActorContext &ctx,
- const IActor& actor,
i64 reqId,
EHullDbType dbType) {
auto it = Requests.find(reqId);
@@ -68,7 +67,7 @@ namespace NKikimr {
}
if (req.AllDone()) {
- SendVDiskResponse(ctx, req.ClientId, req.Reply.release(), actor, req.ClientCookie);
+ SendVDiskResponse(ctx, req.ClientId, req.Reply.release(), req.ClientCookie);
// delete req from Request, we handled it
Requests.erase(it);
}
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.h b/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.h
index 1a0a39677b..2b9777609a 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.h
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_compactionstate.h
@@ -24,7 +24,7 @@ namespace NKikimr {
// setup input compaction request
void Setup(const TActorContext &ctx, std::optional<ui64> lsn, TCompactionReq cState);
// when hull db reports compaction finish we change state by calling this function
- void Compacted(const TActorContext &ctx, const IActor& actor, i64 reqId, EHullDbType dbType);
+ void Compacted(const TActorContext &ctx, i64 reqId, EHullDbType dbType);
// when data is flushed to recovery log run compaction
void Logged(const TActorContext &ctx, ui64 lsn) {
if (Triggered && lsn >= LsnToCommit) {
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp
index ce4615fb5d..474714864b 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp
@@ -35,7 +35,7 @@ namespace NKikimr {
, RecipientCookie(recipientCookie)
{}
- void TLoggedRecVPut::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) {
+ void TLoggedRecVPut::Replay(THull &hull, const TActorContext &ctx) {
TLogoBlobID genId(Id, 0);
hull.AddLogoBlob(ctx, genId, Id.PartId(), Ingress, Buffer, Seg.Point());
@@ -44,7 +44,7 @@ namespace NKikimr {
<< " msg# " << Result->ToString()
<< " Marker# BSVSLR01");
- SendVDiskResponse(ctx, Recipient, Result.release(), actor, RecipientCookie);
+ SendVDiskResponse(ctx, Recipient, Result.release(), RecipientCookie);
}
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -68,8 +68,7 @@ namespace NKikimr {
, RecipientCookie(recipientCookie)
{}
- void TLoggedRecVMultiPutItem::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) {
- Y_UNUSED(actor);
+ void TLoggedRecVMultiPutItem::Replay(THull &hull, const TActorContext &ctx) {
TLogoBlobID genId(Id, 0);
hull.AddLogoBlob(ctx, genId, Id.PartId(), Ingress, Buffer, Seg.Point());
@@ -95,7 +94,7 @@ namespace NKikimr {
, Ev(ev)
{}
- void TLoggedRecVPutHuge::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) {
+ void TLoggedRecVPutHuge::Replay(THull &hull, const TActorContext &ctx) {
TEvHullLogHugeBlob *msg = Ev->Get();
TLogoBlobID genId(msg->LogoBlobID, 0);
@@ -108,7 +107,7 @@ namespace NKikimr {
LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_PUT, hull.GetHullCtx()->VCtx->VDiskLogPrefix
<< "TEvVPut: realtime# false result# " << msg->Result->ToString()
<< " Marker# BSVSLR03");
- SendVDiskResponse(ctx, msg->OrigClient, msg->Result.release(), actor, msg->OrigCookie);
+ SendVDiskResponse(ctx, msg->OrigClient, msg->Result.release(), msg->OrigCookie);
}
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -132,9 +131,9 @@ namespace NKikimr {
, RecipientCookie(recipientCookie)
{}
- void TLoggedRecVBlock::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) {
- auto replySender = [&ctx, &actor] (const TActorId &id, ui64 cookie, IEventBase *msg) {
- SendVDiskResponse(ctx, id, msg, actor, cookie);
+ void TLoggedRecVBlock::Replay(THull &hull, const TActorContext &ctx) {
+ auto replySender = [&ctx] (const TActorId &id, ui64 cookie, IEventBase *msg) {
+ SendVDiskResponse(ctx, id, msg, cookie);
};
hull.AddBlockCmd(ctx, TabletId, Gen, IssuerGuid, Seg.Point(), replySender);
@@ -142,7 +141,7 @@ namespace NKikimr {
LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_BLOCK, hull.GetHullCtx()->VCtx->VDiskLogPrefix
<< "TEvVBlock: result# " << Result->ToString()
<< " Marker# BSVSLR04");
- SendVDiskResponse(ctx, Recipient, Result.release(), actor, RecipientCookie);
+ SendVDiskResponse(ctx, Recipient, Result.release(), RecipientCookie);
}
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -160,14 +159,14 @@ namespace NKikimr {
, OrigEv(origEv)
{}
- void TLoggedRecVCollectGarbage::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) {
+ void TLoggedRecVCollectGarbage::Replay(THull &hull, const TActorContext &ctx) {
NKikimrBlobStorage::TEvVCollectGarbage &record = OrigEv->Get()->Record;
hull.AddGCCmd(ctx, record, Ingress, Seg);
LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_GC, hull.GetHullCtx()->VCtx->VDiskLogPrefix
<< "TEvVCollectGarbage: result# " << Result->ToString()
<< " Marker# BSVSLR05");
- SendVDiskResponse(ctx, OrigEv->Sender, Result.release(), actor, OrigEv->Cookie);
+ SendVDiskResponse(ctx, OrigEv->Sender, Result.release(), OrigEv->Cookie);
}
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -183,9 +182,9 @@ namespace NKikimr {
, OrigEv(origEv)
{}
- void TLoggedRecLocalSyncData::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) {
- auto replySender = [&ctx, &actor] (const TActorId &id, ui64 cookie, IEventBase *msg) {
- SendVDiskResponse(ctx, id, msg, actor, cookie);
+ void TLoggedRecLocalSyncData::Replay(THull &hull, const TActorContext &ctx) {
+ auto replySender = [&ctx] (const TActorId &id, ui64 cookie, IEventBase *msg) {
+ SendVDiskResponse(ctx, id, msg, cookie);
};
#ifdef UNPACK_LOCALSYNCDATA
@@ -193,7 +192,7 @@ namespace NKikimr {
#else
hull.AddSyncDataCmd(ctx, OrigEv->Get()->Data, Seg, replySender);
#endif
- SendVDiskResponse(ctx, OrigEv->Sender, Result.release(), actor, OrigEv->Cookie);
+ SendVDiskResponse(ctx, OrigEv->Sender, Result.release(), OrigEv->Cookie);
}
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -211,9 +210,9 @@ namespace NKikimr {
, OrigEv(origEv)
{}
- void TLoggedRecAnubisOsirisPut::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) {
+ void TLoggedRecAnubisOsirisPut::Replay(THull &hull, const TActorContext &ctx) {
hull.AddAnubisOsirisLogoBlob(ctx, Insert.Id, Insert.Ingress, Seg);
- SendVDiskResponse(ctx, OrigEv->Sender, Result.release(), actor, OrigEv->Cookie);
+ SendVDiskResponse(ctx, OrigEv->Sender, Result.release(), OrigEv->Cookie);
}
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -227,8 +226,7 @@ namespace NKikimr {
, OrigEv(origEv)
{}
- void TLoggedRecPhantoms::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) {
- Y_UNUSED(actor);
+ void TLoggedRecPhantoms::Replay(THull &hull, const TActorContext &ctx) {
TEvDetectedPhantomBlob *msg = OrigEv->Get();
hull.CollectPhantoms(ctx, msg->Phantoms, Seg);
}
@@ -248,9 +246,9 @@ namespace NKikimr {
, RecipientCookie(recipientCookie)
{}
- void TLoggedRecDelLogoBlobDataSyncLog::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) {
+ void TLoggedRecDelLogoBlobDataSyncLog::Replay(THull &hull, const TActorContext &ctx) {
Y_UNUSED(hull);
- SendVDiskResponse(ctx, Recipient, Result.release(), actor, RecipientCookie);
+ SendVDiskResponse(ctx, Recipient, Result.release(), RecipientCookie);
}
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -264,9 +262,9 @@ namespace NKikimr {
, OrigEv(ev)
{}
- void TLoggedRecAddBulkSst::Replay(THull &hull, const TActorContext &ctx, const IActor& actor) {
+ void TLoggedRecAddBulkSst::Replay(THull &hull, const TActorContext &ctx) {
hull.AddBulkSst(ctx, OrigEv->Get()->Essence, Seg);
- SendVDiskResponse(ctx, OrigEv->Sender, new TEvAddBulkSstResult, actor, OrigEv->Cookie);
+ SendVDiskResponse(ctx, OrigEv->Sender, new TEvAddBulkSstResult, OrigEv->Cookie);
}
///////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h
index 50a7a06d2a..34c3c4a5df 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h
@@ -36,7 +36,7 @@ namespace NKikimr {
ILoggedRec(TLsnSeg seg, bool confirmSyncLogAlso);
virtual ~ILoggedRec() = default;
// a method that replays changes that has been written to the recovery log
- virtual void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) = 0;
+ virtual void Replay(THull &hull, const TActorContext &ctx) = 0;
const TLsnSeg Seg;
const bool ConfirmSyncLogAlso;
@@ -50,7 +50,7 @@ namespace NKikimr {
TLoggedRecVPut(TLsnSeg seg, bool confirmSyncLogAlso, const TLogoBlobID &id, const TIngress &ingress,
TRope &&buffer, std::unique_ptr<TEvBlobStorage::TEvVPutResult> result, const TActorId &recipient,
ui64 recipientCookie);
- void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override;
+ void Replay(THull &hull, const TActorContext &ctx) override;
private:
TLogoBlobID Id;
@@ -69,7 +69,7 @@ namespace NKikimr {
TLoggedRecVMultiPutItem(TLsnSeg seg, bool confirmSyncLogAlso, const TLogoBlobID &id, const TIngress &ingress,
TRope &&buffer, std::unique_ptr<TEvVMultiPutItemResult> result, const TActorId &recipient,
ui64 recipientCookie);
- void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override;
+ void Replay(THull &hull, const TActorContext &ctx) override;
private:
TLogoBlobID Id;
@@ -87,7 +87,7 @@ namespace NKikimr {
public:
TLoggedRecVPutHuge(TLsnSeg seg, bool confirmSyncLogAlso, const TActorId &hugeKeeperId,
TEvHullLogHugeBlob::TPtr ev);
- void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override;
+ void Replay(THull &hull, const TActorContext &ctx) override;
private:
const TActorId HugeKeeperId;
@@ -101,7 +101,7 @@ namespace NKikimr {
public:
TLoggedRecVBlock(TLsnSeg seg, bool confirmSyncLogAlso, ui64 tabletId, ui32 gen, ui64 issuerGuid,
std::unique_ptr<TEvBlobStorage::TEvVBlockResult> result, const TActorId &recipient, ui64 recipientCookie);
- void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override;
+ void Replay(THull &hull, const TActorContext &ctx) override;
private:
ui64 TabletId;
@@ -120,7 +120,7 @@ namespace NKikimr {
TLoggedRecVCollectGarbage(TLsnSeg seg, bool confirmSyncLogAlso, TBarrierIngress ingress,
std::unique_ptr<TEvBlobStorage::TEvVCollectGarbageResult> result,
TEvBlobStorage::TEvVCollectGarbage::TPtr origEv);
- void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override;
+ void Replay(THull &hull, const TActorContext &ctx) override;
private:
TBarrierIngress Ingress;
@@ -135,7 +135,7 @@ namespace NKikimr {
public:
TLoggedRecLocalSyncData(TLsnSeg seg, bool confirmSyncLogAlso, std::unique_ptr<TEvLocalSyncDataResult> result,
TEvLocalSyncData::TPtr origEv);
- void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override;
+ void Replay(THull &hull, const TActorContext &ctx) override;
private:
std::unique_ptr<TEvLocalSyncDataResult> Result;
@@ -150,7 +150,7 @@ namespace NKikimr {
TLoggedRecAnubisOsirisPut(TLsnSeg seg, bool confirmSyncLogAlso,
const TEvAnubisOsirisPut::THullDbInsert &insert, std::unique_ptr<TEvAnubisOsirisPutResult> result,
TEvAnubisOsirisPut::TPtr origEv);
- void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override;
+ void Replay(THull &hull, const TActorContext &ctx) override;
private:
TEvAnubisOsirisPut::THullDbInsert Insert;
@@ -164,7 +164,7 @@ namespace NKikimr {
class TLoggedRecPhantoms : public ILoggedRec {
public:
TLoggedRecPhantoms(TLsnSeg seg, bool confirmSyncLogAlso, TEvDetectedPhantomBlob::TPtr origEv);
- void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override;
+ void Replay(THull &hull, const TActorContext &ctx) override;
private:
TEvDetectedPhantomBlob::TPtr OrigEv;
@@ -178,7 +178,7 @@ namespace NKikimr {
TLoggedRecDelLogoBlobDataSyncLog(TLsnSeg seg, bool confirmSyncLogAlso,
std::unique_ptr<TEvDelLogoBlobDataSyncLogResult> result, const TActorId &recipient,
ui64 recipientCookie);
- void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override;
+ void Replay(THull &hull, const TActorContext &ctx) override;
private:
std::unique_ptr<TEvDelLogoBlobDataSyncLogResult> Result;
@@ -193,7 +193,7 @@ namespace NKikimr {
class TLoggedRecAddBulkSst : public ILoggedRec {
public:
TLoggedRecAddBulkSst(TLsnSeg seg, bool confirmSyncLogAlso, TEvAddBulkSst::TPtr ev);
- void Replay(THull &hull, const TActorContext &ctx, const IActor& actor) override;
+ void Replay(THull &hull, const TActorContext &ctx) override;
private:
TEvAddBulkSst::TPtr OrigEv;
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp
index 45968b8d54..fbe53c1968 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp
@@ -1,5 +1,4 @@
#include "skeleton_overload_handler.h"
-#include <ydb/core/blobstorage/base/wilson_events.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_pdiskctx.h>
#include <ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hullsatisfactionrank.h>
#include <ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h>
@@ -210,30 +209,6 @@ namespace NKikimr {
return proceedFurther;
}
- bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVMovedPatch::TPtr &ev, const TActorContext &ctx, IActor *skeleton) {
- return PostponeEventPrivate(ev, ctx, skeleton);
- }
-
- bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVPatchStart::TPtr &ev, const TActorContext &ctx, IActor *skeleton) {
- return PostponeEventPrivate(ev, ctx, skeleton);
- }
-
- bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVPut::TPtr &ev, const TActorContext &ctx, IActor *skeleton) {
- return PostponeEventPrivate(ev, ctx, skeleton);
- }
-
- bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVMultiPut::TPtr &ev, const TActorContext &ctx, IActor *skeleton) {
- return PostponeEventPrivate(ev, ctx, skeleton);
- }
-
- bool TOverloadHandler::PostponeEvent(TEvLocalSyncData::TPtr &ev, const TActorContext &ctx, IActor *skeleton) {
- return PostponeEventPrivate(ev, ctx, skeleton);
- }
-
- bool TOverloadHandler::PostponeEvent(TEvAnubisOsirisPut::TPtr &ev, const TActorContext &ctx, IActor *skeleton) {
- return PostponeEventPrivate(ev, ctx, skeleton);
- }
-
void TOverloadHandler::ToWhiteboard(const TOverloadHandler *this_, NKikimrWhiteboard::TVDiskSatisfactionRank &v) {
if (this_) {
this_->DynamicPDiskWeightsManager->ToWhiteboard(v);
@@ -258,9 +233,8 @@ namespace NKikimr {
}
template <class TEv>
- inline bool TOverloadHandler::PostponeEventPrivate(TEv &ev, const TActorContext &ctx, IActor *skeleton) {
+ inline bool TOverloadHandler::PostponeEvent(TAutoPtr<TEventHandle<TEv>> &ev) {
if (DynamicPDiskWeightsManager->StopPuts() || !EmergencyQueue->Empty()) {
- WILSON_TRACE_FROM_ACTOR(ctx, *skeleton, &ev->TraceId, EvPutIntoEmergQueue);
EmergencyQueue->Push(ev);
return true;
} else {
@@ -268,4 +242,11 @@ namespace NKikimr {
}
}
+ template bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVMovedPatch::TPtr &ev);
+ template bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVPatchStart::TPtr &ev);
+ template bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVPut::TPtr &ev);
+ template bool TOverloadHandler::PostponeEvent(TEvBlobStorage::TEvVMultiPut::TPtr &ev);
+ template bool TOverloadHandler::PostponeEvent(TEvLocalSyncData::TPtr &ev);
+ template bool TOverloadHandler::PostponeEvent(TEvAnubisOsirisPut::TPtr &ev);
+
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.h b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.h
index 00707bb803..a20d375ef5 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.h
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.h
@@ -78,12 +78,8 @@ namespace NKikimr {
bool ProcessPostponedEvents(const TActorContext &ctx, int batchSize, bool actualizeLevels);
// Postpone event in case of overload
- bool PostponeEvent(TEvBlobStorage::TEvVMovedPatch::TPtr &ev, const TActorContext &ctx, IActor *skeleton);
- bool PostponeEvent(TEvBlobStorage::TEvVPatchStart::TPtr &ev, const TActorContext &ctx, IActor *skeleton);
- bool PostponeEvent(TEvBlobStorage::TEvVPut::TPtr &ev, const TActorContext &ctx, IActor *skeleton);
- bool PostponeEvent(TEvBlobStorage::TEvVMultiPut::TPtr &ev, const TActorContext &ctx, IActor *skeleton);
- bool PostponeEvent(TEvLocalSyncData::TPtr &ev, const TActorContext &ctx, IActor *skeleton);
- bool PostponeEvent(TEvAnubisOsirisPut::TPtr &ev, const TActorContext &ctx, IActor *skeleton);
+ template<typename TEv>
+ bool PostponeEvent(TAutoPtr<TEventHandle<TEv>> &ev);
static void ToWhiteboard(const TOverloadHandler *this_, NKikimrWhiteboard::TVDiskSatisfactionRank &v);
ui32 GetIntegralRankPercent() const;
@@ -95,9 +91,6 @@ namespace NKikimr {
NMonGroup::TSkeletonOverloadGroup Mon;
std::unique_ptr<TEmergencyQueue> EmergencyQueue;
std::shared_ptr<TDynamicPDiskWeightsManager> DynamicPDiskWeightsManager;
-
- template <class TEv>
- bool PostponeEventPrivate(TEv &ev, const TActorContext &ctx, IActor *skeleton);
};
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp
index cc24115b3f..78870536aa 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp
@@ -98,7 +98,7 @@ namespace NKikimr {
<< " ErrorReason# " << ErrorReason
<< " Marker# BSVSP01");
}
- SendVDiskResponse(ctx, Event->Sender, vMovedPatchResult.release(), *this, Event->Cookie);
+ SendVDiskResponse(ctx, Event->Sender, vMovedPatchResult.release(), Event->Cookie);
PassAway();
}
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp
index ac7dcd7d58..bbea6637af 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp
@@ -90,7 +90,7 @@ namespace NKikimr {
vMultiPutResult->Record.SetStatusFlags(OOSStatus.Flags);
- SendVDiskResponse(ctx, Event->Sender, vMultiPutResult.release(), *this, Event->Cookie);
+ SendVDiskResponse(ctx, Event->Sender, vMultiPutResult.release(), Event->Cookie);
PassAway();
}
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp
index b04e8d6d8f..0e1f8eb796 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp
@@ -184,7 +184,7 @@ namespace NKikimr::NPrivate {
FoundPartsEvent->AddPart(part);
}
FoundPartsEvent->SetStatus(status);
- SendVDiskResponse(TActivationContext::AsActorContext(), Sender, FoundPartsEvent.release(), *this, Cookie);
+ SendVDiskResponse(TActivationContext::AsActorContext(), Sender, FoundPartsEvent.release(), Cookie);
}
void PullOriginalPart(ui64 pullingPart) {
@@ -254,7 +254,7 @@ namespace NKikimr::NPrivate {
(ErrorReason, ErrorReason));
Y_VERIFY(ResultEvent);
ResultEvent->SetStatus(status, ErrorReason);
- SendVDiskResponse(TActivationContext::AsActorContext(), Sender, ResultEvent.release(), *this, Cookie);
+ SendVDiskResponse(TActivationContext::AsActorContext(), Sender, ResultEvent.release(), Cookie);
}
void HandleVGetResult(TEvBlobStorage::TEvVGetResult::TPtr &ev) {
@@ -316,7 +316,7 @@ namespace NKikimr::NPrivate {
for (ui32 idx = ReceivedXorDiffs.size(); idx != 0; --idx) {
auto &[diffs, partId, result, sender, cookie] = ReceivedXorDiffs.back();
GType.ApplyXorDiff(TErasureType::CrcModeNone, dataSize, buffer, diffs, partId - 1, toPart - 1);
- SendVDiskResponse(TActivationContext::AsActorContext(), sender, result.release(), *this, cookie);
+ SendVDiskResponse(TActivationContext::AsActorContext(), sender, result.release(), cookie);
ReceivedXorDiffs.pop_back();
}
@@ -528,7 +528,7 @@ namespace NKikimr::NPrivate {
auto resultEvent = std::make_unique<TEvBlobStorage::TEvVPatchXorDiffResult>(
NKikimrProto::ERROR, now, &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr,
std::move(ev->TraceId));
- SendVDiskResponse(TActivationContext::AsActorContext(), ev->Sender, resultEvent.release(), *this, ev->Cookie);
+ SendVDiskResponse(TActivationContext::AsActorContext(), ev->Sender, resultEvent.release(), ev->Cookie);
}
void Handle(TEvBlobStorage::TEvVPatchXorDiff::TPtr &ev) {
@@ -554,9 +554,9 @@ namespace NKikimr::NPrivate {
if (!CheckDiff(xorDiffs, "XorDiff from datapart")) {
for (auto &[diffs, partId, result, sender, cookie] : ReceivedXorDiffs) {
- SendVDiskResponse(TActivationContext::AsActorContext(), sender, result.release(), *this, cookie);
+ SendVDiskResponse(TActivationContext::AsActorContext(), sender, result.release(), cookie);
}
- SendVDiskResponse(TActivationContext::AsActorContext(), ev->Sender, resultEvent.release(), *this, ev->Cookie);
+ SendVDiskResponse(TActivationContext::AsActorContext(), ev->Sender, resultEvent.release(), ev->Cookie);
if (ResultEvent) {
SendVPatchResult(NKikimrProto::ERROR);
@@ -577,7 +577,7 @@ namespace NKikimr::NPrivate {
}
xorDiffs.clear();
- SendVDiskResponse(TActivationContext::AsActorContext(), ev->Sender, resultEvent.release(), *this, ev->Cookie);
+ SendVDiskResponse(TActivationContext::AsActorContext(), ev->Sender, resultEvent.release(), ev->Cookie);
} else {
ReceivedXorDiffs.emplace_back(std::move(xorDiffs), fromPart, std::move(resultEvent),
ev->Sender, ev->Cookie);
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp
index 907b3873f8..9b7dd49ebb 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp
@@ -142,7 +142,7 @@ namespace NKikimr {
TSyncState(), true, SlCtx->VCtx->GetOutOfSpaceState().GetLocalStatusFlags(), now,
SlCtx->CountersMonGroup.VDiskCheckFailedPtr(), nullptr, std::move(ev->TraceId),
ev->GetChannel());
- SendVDiskResponse(ctx, ev->Sender, result.release(), *this, ev->Cookie);
+ SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie);
return;
}
@@ -161,7 +161,7 @@ namespace NKikimr {
auto result = std::make_unique<TEvBlobStorage::TEvVSyncResult>(NKikimrProto::BLOCKED, SelfVDiskId,
TSyncState(), true, SlCtx->VCtx->GetOutOfSpaceState().GetLocalStatusFlags(), now,
SlCtx->CountersMonGroup.DiskLockedPtr(), nullptr, std::move(ev->TraceId), ev->GetChannel());
- SendVDiskResponse(ctx, ev->Sender, result.release(), *this, ev->Cookie);
+ SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie);
return;
}
@@ -181,7 +181,7 @@ namespace NKikimr {
auto result = std::make_unique<TEvBlobStorage::TEvVSyncResult>(status, SelfVDiskId, syncState,
true, SlCtx->VCtx->GetOutOfSpaceState().GetLocalStatusFlags(), now,
SlCtx->CountersMonGroup.UnequalGuidPtr(), nullptr, std::move(ev->TraceId), ev->GetChannel());
- SendVDiskResponse(ctx, ev->Sender, result.release(), *this, ev->Cookie);
+ SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie);
return;
}
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp
index 2873bcb56c..6cbe05c4ec 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp
@@ -177,7 +177,7 @@ namespace NKikimr {
FragmentWriter.Finish(result->Record.MutableData());
}
- SendVDiskResponse(ctx, Ev->Sender, result.release(), *this, Ev->Cookie);
+ SendVDiskResponse(ctx, Ev->Sender, result.release(), Ev->Cookie);
ctx.Send(ParentId, new TEvSyncLogReadFinished(SourceVDisk));
Die(ctx);
}
diff --git a/ydb/library/pdisk_io/CMakeLists.darwin.txt b/ydb/library/pdisk_io/CMakeLists.darwin.txt
index 902a753e73..ef2e2d0688 100644
--- a/ydb/library/pdisk_io/CMakeLists.darwin.txt
+++ b/ydb/library/pdisk_io/CMakeLists.darwin.txt
@@ -13,10 +13,10 @@ target_link_libraries(ydb-library-pdisk_io PUBLIC
yutil
tools-enum_parser-enum_serialization_runtime
cpp-actors-core
+ cpp-actors-wilson
cpp-monlib-dynamic_counters
ydb-core-debug
library-pdisk_io-protos
- ydb-library-wilson
)
target_sources(ydb-library-pdisk_io PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/pdisk_io/aio_mtp.cpp
diff --git a/ydb/library/pdisk_io/CMakeLists.linux.txt b/ydb/library/pdisk_io/CMakeLists.linux.txt
index 845369e155..9706b4e3b5 100644
--- a/ydb/library/pdisk_io/CMakeLists.linux.txt
+++ b/ydb/library/pdisk_io/CMakeLists.linux.txt
@@ -16,10 +16,10 @@ target_link_libraries(ydb-library-pdisk_io PUBLIC
AIO::aio
$CONAN_OPTS_SEM
cpp-actors-core
+ cpp-actors-wilson
cpp-monlib-dynamic_counters
ydb-core-debug
library-pdisk_io-protos
- ydb-library-wilson
)
target_sources(ydb-library-pdisk_io PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/pdisk_io/aio_linux.cpp
diff --git a/ydb/library/pdisk_io/aio.h b/ydb/library/pdisk_io/aio.h
index 61d0ee20e6..3dfd861459 100644
--- a/ydb/library/pdisk_io/aio.h
+++ b/ydb/library/pdisk_io/aio.h
@@ -7,7 +7,7 @@
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h>
#include <ydb/core/blobstorage/pdisk/blobstorage_pdisk_util_devicemode.h>
-#include <ydb/library/wilson/wilson_event.h>
+#include <library/cpp/actors/wilson/wilson_event.h>
#include <util/system/file.h>
#include <util/generic/string.h>
diff --git a/ydb/library/pretty_types_print/wilson/CMakeLists.txt b/ydb/library/pretty_types_print/wilson/CMakeLists.txt
index 2c5b6069e0..359ebc1d57 100644
--- a/ydb/library/pretty_types_print/wilson/CMakeLists.txt
+++ b/ydb/library/pretty_types_print/wilson/CMakeLists.txt
@@ -11,7 +11,7 @@ add_library(library-pretty_types_print-wilson)
target_link_libraries(library-pretty_types_print-wilson PUBLIC
contrib-libs-cxxsupp
yutil
- ydb-library-wilson
+ cpp-actors-wilson
)
target_sources(library-pretty_types_print-wilson PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/pretty_types_print/wilson/out.cpp
diff --git a/ydb/library/pretty_types_print/wilson/out.cpp b/ydb/library/pretty_types_print/wilson/out.cpp
index 9b8c60f981..fdae4e0d11 100644
--- a/ydb/library/pretty_types_print/wilson/out.cpp
+++ b/ydb/library/pretty_types_print/wilson/out.cpp
@@ -1,4 +1,4 @@
-#include <ydb/library/wilson/wilson_trace.h>
+#include <library/cpp/actors/wilson/wilson_trace.h>
#include <util/stream/output.h>
diff --git a/ydb/library/wilson/CMakeLists.txt b/ydb/library/wilson/CMakeLists.txt
deleted file mode 100644
index 08855a190c..0000000000
--- a/ydb/library/wilson/CMakeLists.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-
-# This file was gererated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(ydb-library-wilson INTERFACE)
-target_link_libraries(ydb-library-wilson INTERFACE
- contrib-libs-cxxsupp
- yutil
- cpp-actors-wilson
-)
diff --git a/ydb/library/wilson/wilson_event.h b/ydb/library/wilson/wilson_event.h
deleted file mode 100644
index e092a2cd97..0000000000
--- a/ydb/library/wilson/wilson_event.h
+++ /dev/null
@@ -1,2 +0,0 @@
-#pragma once
-#include <library/cpp/actors/wilson/wilson_event.h>
diff --git a/ydb/library/wilson/wilson_trace.h b/ydb/library/wilson/wilson_trace.h
deleted file mode 100644
index 8c27c41fbd..0000000000
--- a/ydb/library/wilson/wilson_trace.h
+++ /dev/null
@@ -1,2 +0,0 @@
-#pragma once
-#include <library/cpp/actors/wilson/wilson_trace.h>