aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-07-28 15:33:09 +0300
committeralexvru <alexvru@ydb.tech>2022-07-28 15:33:09 +0300
commit0836b1ef7920fc395f848c298b2ea601d83dbab2 (patch)
tree6c2bb4b2c6dcd32f045c8f8620d68a57e3d1f57a
parentb333b9c0b2519d13f1b00518e4ad398b6c06ace5 (diff)
downloadydb-0836b1ef7920fc395f848c298b2ea601d83dbab2.tar.gz
Mark down DS proxy and VDisk parts to use Wilson tracing
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.cpp2
-rw-r--r--library/cpp/actors/interconnect/interconnect_channel.h5
-rw-r--r--library/cpp/actors/interconnect/interconnect_tcp_session.cpp2
-rw-r--r--library/cpp/actors/interconnect/packet.h4
-rw-r--r--library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp2
-rw-r--r--library/cpp/actors/protos/actors.proto4
-rw-r--r--library/cpp/actors/wilson/CMakeLists.txt1
-rw-r--r--library/cpp/actors/wilson/wilson_span.cpp5
-rw-r--r--library/cpp/actors/wilson/wilson_span.h85
-rw-r--r--library/cpp/actors/wilson/wilson_trace.h55
-rw-r--r--library/cpp/actors/wilson/wilson_uploader.cpp32
-rw-r--r--ydb/core/base/wilson.h14
-rw-r--r--ydb/core/blobstorage/backpressure/defs.h1
-rw-r--r--ydb/core/blobstorage/backpressure/queue.cpp7
-rw-r--r--ydb/core/blobstorage/backpressure/queue.h17
-rw-r--r--ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp2
-rw-r--r--ydb/core/blobstorage/backpressure/ut_client/skeleton_front_mock.h4
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy.h22
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp52
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h13
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_impl.h5
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp6
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put.cpp130
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp6
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h55
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_range.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp2
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h2
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp6
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp6
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp18
-rw-r--r--ydb/core/blobstorage/dsproxy/ut/dsproxy_test_state_ut.h6
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp6
-rw-r--r--ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp2
-rw-r--r--ydb/core/blobstorage/ut_vdisk/lib/vdisk_mock.cpp19
-rw-r--r--ydb/core/blobstorage/vdisk/anubis_osiris/blobstorage_anubis_osiris.h6
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_events.cpp6
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_events.h121
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_private_events.h5
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_response.cpp16
-rw-r--r--ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp63
-rw-r--r--ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h3
-rw-r--r--ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp3
-rw-r--r--ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h5
-rw-r--r--ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp.h21
-rw-r--r--ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp_ut.cpp12
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_extr.cpp4
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_readactor.cpp13
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_readactor.h3
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_readbatch.cpp4
-rw-r--r--ydb/core/blobstorage/vdisk/query/query_readbatch.h3
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp96
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h32
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp5
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp7
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp17
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h8
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp9
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp10
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp2
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp14
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp11
-rw-r--r--ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer.cpp5
-rw-r--r--ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h5
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp7
-rw-r--r--ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp2
-rw-r--r--ydb/core/protos/blobstorage.proto2
-rw-r--r--ydb/core/yq/libs/audit/CMakeLists.txt1
73 files changed, 653 insertions, 478 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp
index f839741cb54..fed13946866 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.cpp
+++ b/library/cpp/actors/interconnect/interconnect_channel.cpp
@@ -18,7 +18,7 @@ namespace NActors {
return false;
}
- const NWilson::TTraceId traceId(event.Span);
+ auto traceId = event.Span.GetTraceId();
event.Span.EndOk();
LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize());
diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h
index 25e996ddcae..8849d19b92c 100644
--- a/library/cpp/actors/interconnect/interconnect_channel.h
+++ b/library/cpp/actors/interconnect/interconnect_channel.h
@@ -55,12 +55,11 @@ namespace NActors {
~TEventOutputChannel() {
}
- std::pair<ui32, TEventHolder*> Push(IEventHandle& ev, TInstant now) {
+ std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) {
TEventHolder& event = Pool.Allocate(Queue);
const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1));
OutputQueueSize += bytes;
- if (auto span = NWilson::TSpan(static_cast<ui8>(15) /*verbosity*/, NWilson::ERelation::ChildOf,
- NWilson::TTraceId(ev.TraceId), now, "InInterconnectQueue")) {
+ if (auto span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) {
event.Span = std::move(span
.Attribute("OutputQueueItems", static_cast<i64>(Queue.size()))
.Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize)));
diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
index 0f286153ea5..feb55a16ad6 100644
--- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
+++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp
@@ -126,7 +126,7 @@ namespace NActors {
auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
const bool wasWorking = oChannel.IsWorking();
- const auto [dataSize, event] = oChannel.Push(*ev, TActivationContext::Now());
+ const auto [dataSize, event] = oChannel.Push(*ev);
LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize);
TotalOutputQueueSize += dataSize;
diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h
index 9161e3af710..c8909c08a76 100644
--- a/library/cpp/actors/interconnect/packet.h
+++ b/library/cpp/actors/interconnect/packet.h
@@ -138,8 +138,8 @@ struct TEventHolder : TNonCopyable {
const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr;
Span.EndError("nondelivery");
auto ev = Event
- ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span)
- : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span);
+ ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId())
+ : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId());
NActors::TActivationContext::Send(ev->ForwardOnNondelivery(NActors::TEvents::TEvUndelivered::Disconnected, unsure));
}
diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
index 9b47f5b592d..565a511859e 100644
--- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
+++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp
@@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) {
auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0);
auto& ch = scheduler.GetOutputChannel(channel);
const bool wasWorking = ch.IsWorking();
- ch.Push(*ev, TInstant::Now());
+ ch.Push(*ev);
if (!wasWorking) {
scheduler.AddToHeap(ch, 0);
}
diff --git a/library/cpp/actors/protos/actors.proto b/library/cpp/actors/protos/actors.proto
index ad347dc2907..329eb998dfc 100644
--- a/library/cpp/actors/protos/actors.proto
+++ b/library/cpp/actors/protos/actors.proto
@@ -7,6 +7,10 @@ message TActorId {
required fixed64 RawX2 = 2;
}
+message TTraceId {
+ optional bytes Data = 1;
+}
+
message TCallbackException {
required TActorId ActorId = 1;
required string ExceptionMessage = 2;
diff --git a/library/cpp/actors/wilson/CMakeLists.txt b/library/cpp/actors/wilson/CMakeLists.txt
index 75c7b16dff8..09a555a1317 100644
--- a/library/cpp/actors/wilson/CMakeLists.txt
+++ b/library/cpp/actors/wilson/CMakeLists.txt
@@ -12,6 +12,7 @@ target_link_libraries(cpp-actors-wilson PUBLIC
contrib-libs-cxxsupp
yutil
cpp-actors-core
+ cpp-actors-protos
actors-wilson-protos
)
target_sources(cpp-actors-wilson PRIVATE
diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp
index a97184c3f09..c932560254d 100644
--- a/library/cpp/actors/wilson/wilson_span.cpp
+++ b/library/cpp/actors/wilson/wilson_span.cpp
@@ -53,9 +53,8 @@ namespace NWilson {
}
void TSpan::Send() {
- if (TlsActivationContext) {
- TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span)));
- }
+ TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span)));
+ Data->Sent = true;
}
} // NWilson
diff --git a/library/cpp/actors/wilson/wilson_span.h b/library/cpp/actors/wilson/wilson_span.h
index c2de2f0b68c..3c201abd381 100644
--- a/library/cpp/actors/wilson/wilson_span.h
+++ b/library/cpp/actors/wilson/wilson_span.h
@@ -1,5 +1,7 @@
#pragma once
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/wilson/protos/trace.pb.h>
#include <util/generic/hash.h>
#include <util/datetime/cputimer.h>
@@ -42,12 +44,17 @@ namespace NWilson {
const ui64 StartCycles;
const TTraceId TraceId;
NTraceProto::Span Span;
+ bool Sent = false;
TData(TInstant startTime, ui64 startCycles, TTraceId traceId)
: StartTime(startTime)
, StartCycles(startCycles)
, TraceId(std::move(traceId))
{}
+
+ ~TData() {
+ Y_VERIFY_DEBUG(Sent);
+ }
};
std::unique_ptr<TData> Data;
@@ -57,94 +64,134 @@ namespace NWilson {
TSpan(const TSpan&) = delete;
TSpan(TSpan&&) = default;
- TSpan(ui8 verbosity, ERelation /*relation*/, TTraceId parentId, TInstant now, std::optional<TString> name)
- : Data(parentId ? std::make_unique<TData>(now, GetCycleCount(), parentId.Span(verbosity)) : nullptr)
+ TSpan(ui8 verbosity, TTraceId parentId, std::optional<TString> name)
+ : Data(parentId ? std::make_unique<TData>(TInstant::Now(), GetCycleCount(), parentId.Span(verbosity)) : nullptr)
{
- if (*this) {
+ if (Y_UNLIKELY(*this)) {
if (!parentId.IsRoot()) {
Data->Span.set_parent_span_id(parentId.GetSpanIdPtr(), parentId.GetSpanIdSize());
}
- Data->Span.set_start_time_unix_nano(now.NanoSeconds());
+ Data->Span.set_start_time_unix_nano(Data->StartTime.NanoSeconds());
+ Data->Span.set_kind(opentelemetry::proto::trace::v1::Span::SPAN_KIND_INTERNAL);
if (name) {
Name(std::move(*name));
}
+
+ Attribute("node_id", NActors::TActivationContext::ActorSystem()->NodeId);
+ }
+ }
+
+ ~TSpan() {
+ if (Y_UNLIKELY(*this)) {
+ EndError("unterminated span");
}
}
TSpan& operator =(const TSpan&) = delete;
- TSpan& operator =(TSpan&&) = default;
- operator bool() const {
- return static_cast<bool>(Data);
+ TSpan& operator =(TSpan&& other) {
+ if (this != &other) {
+ if (Y_UNLIKELY(*this)) {
+ EndError("TSpan instance incorrectly overwritten");
+ }
+ Data = std::exchange(other.Data, nullptr);
+ }
+ return *this;
+ }
+
+ explicit operator bool() const {
+ return Data && !Data->Sent;
+ }
+
+ TSpan& Relation(ERelation /*relation*/) {
+ if (Y_UNLIKELY(*this)) {
+ // update relation in data somehow
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
+ }
+ return *this;
}
TSpan& Name(TString name) {
- if (*this) {
+ if (Y_UNLIKELY(*this)) {
Data->Span.set_name(std::move(name));
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
}
return *this;
}
TSpan& Attribute(TString name, TAttributeValue value) {
- if (*this) {
+ if (Y_UNLIKELY(*this)) {
SerializeKeyValue(std::move(name), std::move(value), Data->Span.add_attributes());
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
}
return *this;
}
TSpan& Event(TString name, TKeyValueList attributes) {
- if (*this) {
+ if (Y_UNLIKELY(*this)) {
auto *event = Data->Span.add_events();
event->set_time_unix_nano(TimeUnixNano());
event->set_name(std::move(name));
for (auto&& [key, value] : attributes) {
SerializeKeyValue(std::move(key), std::move(value), event->add_attributes());
}
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
}
return *this;
}
TSpan& Link(const TTraceId& traceId, TKeyValueList attributes) {
- if (*this) {
+ if (Y_UNLIKELY(*this)) {
auto *link = Data->Span.add_links();
link->set_trace_id(traceId.GetTraceIdPtr(), traceId.GetTraceIdSize());
link->set_span_id(traceId.GetSpanIdPtr(), traceId.GetSpanIdSize());
for (auto&& [key, value] : attributes) {
SerializeKeyValue(std::move(key), std::move(value), link->add_attributes());
}
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
}
return *this;
}
void EndOk() {
- if (*this) {
+ if (Y_UNLIKELY(*this)) {
auto *status = Data->Span.mutable_status();
status->set_code(NTraceProto::Status::STATUS_CODE_OK);
+ End();
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
}
- End();
}
void EndError(TString error) {
- if (*this) {
+ if (Y_UNLIKELY(*this)) {
auto *status = Data->Span.mutable_status();
status->set_code(NTraceProto::Status::STATUS_CODE_ERROR);
status->set_message(std::move(error));
+ End();
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
}
- End();
}
void End() {
- if (*this) {
- Data->Span.set_end_time_unix_nano(TimeUnixNano());
+ if (Y_UNLIKELY(*this)) {
Data->Span.set_trace_id(Data->TraceId.GetTraceIdPtr(), Data->TraceId.GetTraceIdSize());
Data->Span.set_span_id(Data->TraceId.GetSpanIdPtr(), Data->TraceId.GetSpanIdSize());
+ Data->Span.set_end_time_unix_nano(TimeUnixNano());
Send();
- Data.reset(); // tracing finished
+ } else {
+ Y_VERIFY_DEBUG(!Data, "span has been ended");
}
}
- operator TTraceId() const {
+ TTraceId GetTraceId() const {
return Data ? TTraceId(Data->TraceId) : TTraceId();
}
diff --git a/library/cpp/actors/wilson/wilson_trace.h b/library/cpp/actors/wilson/wilson_trace.h
index e3631a2403d..8c1fe05b082 100644
--- a/library/cpp/actors/wilson/wilson_trace.h
+++ b/library/cpp/actors/wilson/wilson_trace.h
@@ -1,5 +1,8 @@
#pragma once
+#include <library/cpp/actors/core/monotonic.h>
+#include <library/cpp/actors/protos/actors.pb.h>
+
#include <library/cpp/string_utils/base64/base64.h>
#include <util/stream/output.h>
@@ -28,6 +31,11 @@ namespace NWilson {
TTraceId(TTrace traceId, ui64 spanId, ui8 verbosity, ui32 timeToLive)
: TraceId(traceId)
{
+ if (timeToLive == Max<ui32>()) {
+ timeToLive = 4095;
+ }
+ Y_VERIFY(verbosity <= 15);
+ Y_VERIFY(timeToLive <= 4095);
SpanId = spanId;
Verbosity = verbosity;
TimeToLive = timeToLive;
@@ -86,7 +94,7 @@ namespace NWilson {
, Raw(other.Raw)
{
other.TraceId.fill(0);
- other.SpanId = 0;
+ other.SpanId = 1; // make it explicitly invalid
other.Raw = 0;
}
@@ -95,7 +103,10 @@ namespace NWilson {
: TraceId(other.TraceId)
, SpanId(other.SpanId)
, Raw(other.Raw)
- {}
+ {
+ // validate trace id only when we are making a copy
+ other.Validate();
+ }
TTraceId(const TSerializedTraceId& in) {
const char *p = in;
@@ -108,6 +119,17 @@ namespace NWilson {
Y_VERIFY_DEBUG(p - in == sizeof(TSerializedTraceId));
}
+ TTraceId(const NActorsProto::TTraceId& pb)
+ : TTraceId()
+ {
+ if (pb.HasData()) {
+ const auto& data = pb.GetData();
+ if (data.size() == sizeof(TSerializedTraceId)) {
+ *this = *reinterpret_cast<const TSerializedTraceId*>(data.data());
+ }
+ }
+ }
+
void Serialize(TSerializedTraceId *out) const {
char *p = *out;
memcpy(p, TraceId.data(), sizeof(TraceId));
@@ -119,13 +141,21 @@ namespace NWilson {
Y_VERIFY_DEBUG(p - *out == sizeof(TSerializedTraceId));
}
+ void Serialize(NActorsProto::TTraceId *pb) const {
+ if (*this) {
+ TSerializedTraceId data;
+ Serialize(&data);
+ pb->SetData(reinterpret_cast<const char*>(&data), sizeof(data));
+ }
+ }
+
TTraceId& operator=(TTraceId&& other) {
if (this != &other) {
TraceId = other.TraceId;
SpanId = other.SpanId;
Raw = other.Raw;
other.TraceId.fill(0);
- other.SpanId = 0;
+ other.SpanId = 1; // make it explicitly invalid
other.Raw = 0;
}
return *this;
@@ -138,11 +168,25 @@ namespace NWilson {
return TTraceId(GenerateTraceId(), 0, verbosity, timeToLive);
}
+ static TTraceId NewTraceIdThrottled(ui8 verbosity, ui32 timeToLive, std::atomic<NActors::TMonotonic>& counter,
+ NActors::TMonotonic now, TDuration periodBetweenSamples) {
+ static_assert(std::atomic<NActors::TMonotonic>::is_always_lock_free);
+ for (;;) {
+ NActors::TMonotonic ts = counter.load();
+ if (now < ts) {
+ return {};
+ } else if (counter.compare_exchange_strong(ts, now + periodBetweenSamples)) {
+ return NewTraceId(verbosity, timeToLive);
+ }
+ }
+ }
+
static TTraceId NewTraceId() { // NBS stub
return TTraceId();
}
TTraceId Span(ui8 verbosity) const {
+ Validate();
return *this && TimeToLive && verbosity <= Verbosity
? TTraceId(TraceId, GenerateSpanId(), Verbosity, TimeToLive - 1)
: TTraceId();
@@ -174,10 +218,13 @@ namespace NWilson {
const void *GetSpanIdPtr() const { return &SpanId; }
static constexpr size_t GetSpanIdSize() { return sizeof(ui64); }
+ void Validate() const {
+ Y_VERIFY_DEBUG(*this || !SpanId);
+ }
+
// for compatibility with NBS
TTraceId Clone() const { return NWilson::TTraceId(*this); }
ui64 GetTraceId() const { return 0; }
void OutputSpanId(IOutputStream&) const {}
};
-
}
diff --git a/library/cpp/actors/wilson/wilson_uploader.cpp b/library/cpp/actors/wilson/wilson_uploader.cpp
index e87776717bb..6b4ef92b812 100644
--- a/library/cpp/actors/wilson/wilson_uploader.cpp
+++ b/library/cpp/actors/wilson/wilson_uploader.cpp
@@ -1,9 +1,11 @@
#include "wilson_uploader.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
#include <library/cpp/actors/wilson/protos/service.pb.h>
#include <library/cpp/actors/wilson/protos/service.grpc.pb.h>
#include <util/stream/file.h>
+#include <util/string/hex.h>
#include <grpc++/grpc++.h>
#include <chrono>
@@ -32,6 +34,8 @@ namespace NWilson {
NServiceProto::ExportTraceServiceResponse Response;
grpc::Status Status;
+ bool WakeupScheduled = false;
+
public:
TWilsonUploader(TString host, ui16 port, TString rootCA)
: Host(std::move(host))
@@ -50,6 +54,8 @@ namespace NWilson {
.pem_root_certs = TFileInput(RootCA).ReadAll(),
}));
Stub = NServiceProto::TraceService::NewStub(Channel);
+
+ LOG_INFO_S(*TlsActivationContext, 430, "TWilsonUploader::Bootstrap");
}
void Handle(TEvWilson::TPtr ev) {
@@ -67,6 +73,17 @@ namespace NWilson {
void SendRequest() {
Y_VERIFY(!Reader && !Context);
Context = std::make_unique<grpc::ClientContext>();
+ for (const auto& rs : Request.resource_spans()) {
+ for (const auto& ss : rs.scope_spans()) {
+ for (const auto& s : ss.spans()) {
+ LOG_DEBUG_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "exporting span"
+ << " TraceId# " << HexEncode(s.trace_id())
+ << " SpanId# " << HexEncode(s.span_id())
+ << " ParentSpanId# " << HexEncode(s.parent_span_id())
+ << " Name# " << s.name());
+ }
+ }
+ }
Reader = Stub->AsyncExport(Context.get(), std::exchange(Request, {}), &CQ);
Reader->Finish(&Response, &Status, nullptr);
}
@@ -76,18 +93,33 @@ namespace NWilson {
void *tag;
bool ok;
if (CQ.AsyncNext(&tag, &ok, std::chrono::system_clock::now()) == grpc::CompletionQueue::GOT_EVENT) {
+ if (!Status.ok()) {
+ LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */,
+ "failed to commit traces: " << Status.error_message());
+ }
+
Reader.reset();
Context.reset();
if (Request.resource_spans_size()) {
SendRequest();
}
+ } else if (!WakeupScheduled) {
+ WakeupScheduled = true;
+ Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
}
}
}
+ void HandleWakeup() {
+ Y_VERIFY(WakeupScheduled);
+ WakeupScheduled = false;
+ CheckIfDone();
+ }
+
STRICT_STFUNC(StateFunc,
hFunc(TEvWilson, Handle);
+ cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
);
};
diff --git a/ydb/core/base/wilson.h b/ydb/core/base/wilson.h
new file mode 100644
index 00000000000..1d3f7d2680e
--- /dev/null
+++ b/ydb/core/base/wilson.h
@@ -0,0 +1,14 @@
+#pragma once
+
+namespace NKikimr {
+
+ struct TWilson {
+ enum {
+ BlobStorage = 8, // DS proxy and lower levels
+ DsProxyInternals = 7,
+ VDiskTopLevel = 6,
+ VDiskInternals = 5,
+ };
+ };
+
+} // NKikimr
diff --git a/ydb/core/blobstorage/backpressure/defs.h b/ydb/core/blobstorage/backpressure/defs.h
index d179e1ba76f..6fc9da407f3 100644
--- a/ydb/core/blobstorage/backpressure/defs.h
+++ b/ydb/core/blobstorage/backpressure/defs.h
@@ -10,6 +10,7 @@
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
#include <ydb/core/protos/blobstorage.pb.h>
#include <ydb/core/base/interconnect_channels.h>
+#include <ydb/core/base/wilson.h>
#include <library/cpp/actors/core/interconnect.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/actors/core/hfunc.h>
diff --git a/ydb/core/blobstorage/backpressure/queue.cpp b/ydb/core/blobstorage/backpressure/queue.cpp
index cead08e1b85..e768d2d8417 100644
--- a/ydb/core/blobstorage/backpressure/queue.cpp
+++ b/ydb/core/blobstorage/backpressure/queue.cpp
@@ -202,7 +202,7 @@ void TBlobStorageQueue::SendToVDisk(const TActorContext& ctx, const TActorId& re
{"VDiskOrderNumber", vdiskOrderNumber}
}});
item.Event.SendToVDisk(ctx, remoteVDisk, item.QueueCookie, item.MsgId, item.SequenceId, sendMeCostSettings,
- item.Span, ClientId, item.ProcessingTimer);
+ item.Span.GetTraceId(), ClientId, item.ProcessingTimer);
// update counters as far as item got sent
++NextMsgId;
@@ -219,6 +219,8 @@ void TBlobStorageQueue::ReplyWithError(TItem& item, NKikimrProto::EReplyStatus s
<< " processingTime# " << processingTime);
item.Span.EndError(TStringBuilder() << NKikimrProto::EReplyStatus_Name(status) << ": " << errorReason);
+ item.Span = {};
+
ctx.Send(item.Event.GetSender(), item.Event.MakeErrorReply(status, errorReason, QueueDeserializedItems,
QueueDeserializedBytes), 0, item.Event.GetCookie());
@@ -249,7 +251,8 @@ bool TBlobStorageQueue::OnResponse(ui64 msgId, ui64 sequenceId, ui64 cookie, TAc
it->Event.GetByteSize(), !relevant);
InFlightLookup.erase(lookupIt);
- it->Span.EndOk();
+ auto span = std::exchange(it->Span, {});
+ span.EndOk();
EraseItem(Queues.InFlight, it);
// unpause execution when InFlight queue gets empty
diff --git a/ydb/core/blobstorage/backpressure/queue.h b/ydb/core/blobstorage/backpressure/queue.h
index ef0903d4971..a855eef33c9 100644
--- a/ydb/core/blobstorage/backpressure/queue.h
+++ b/ydb/core/blobstorage/backpressure/queue.h
@@ -59,11 +59,10 @@ class TBlobStorageQueue {
const ::NMonitoring::TDynamicCounters::TCounterPtr& serItems,
const ::NMonitoring::TDynamicCounters::TCounterPtr& serBytes,
const TBSProxyContextPtr& bspctx, ui32 interconnectChannel,
- bool local, TInstant now)
+ bool local)
: Queue(EItemQueue::NotSet)
, CostEssence(*event->Get())
- , Span(9 /*verbosity*/, NWilson::ERelation::ChildOf, std::move(event->TraceId), now, TStringBuilder()
- << "Backpressure(" << TypeName<TEvent>() << ")")
+ , Span(TWilson::VDiskTopLevel, std::move(event->TraceId), "Backpressure.InFlight")
, Event(event, serItems, serBytes, bspctx, interconnectChannel, local)
, MsgId(Max<ui64>())
, SequenceId(0)
@@ -71,7 +70,11 @@ class TBlobStorageQueue {
, QueueCookie(RandomNumber<ui64>())
, Cost(0)
, DirtyCost(true)
- {}
+ {
+ Span
+ .Attribute("event", TypeName<TEvent>())
+ .Attribute("local", local);
+ }
~TItem() {
Y_VERIFY(Queue == EItemQueue::NotSet, "Queue# %" PRIu32, ui32(Queue));
@@ -197,13 +200,13 @@ public:
void OnConnect();
template<typename TPtr>
- void Enqueue(const TActorContext &ctx, TPtr& event, TInstant deadline, bool local, TInstant now) {
+ void Enqueue(const TActorContext &ctx, TPtr& event, TInstant deadline, bool local) {
Y_UNUSED(ctx);
TItemList::iterator newIt;
if (Queues.Unused.empty()) {
newIt = Queues.Waiting.emplace(Queues.Waiting.end(), event, deadline,
- QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local, now);
+ QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local);
++*QueueSize;
} else {
newIt = Queues.Unused.begin();
@@ -212,7 +215,7 @@ public:
TItem& item = *newIt;
item.~TItem();
new(&item) TItem(event, deadline, QueueSerializedItems, QueueSerializedBytes, BSProxyCtx,
- InterconnectChannel, local, now);
+ InterconnectChannel, local);
}
newIt->Iterator = newIt;
diff --git a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
index 84337e1eb4f..43625bd5ff4 100644
--- a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
+++ b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
@@ -201,7 +201,7 @@ private:
<< " cookie# " << ev->Cookie);
if (IsReady()) {
- Queue.Enqueue(ctx, ev, deadline, RemoteVDisk.NodeId() == SelfId().NodeId(), TActivationContext::Now());
+ Queue.Enqueue(ctx, ev, deadline, RemoteVDisk.NodeId() == SelfId().NodeId());
Pump(ctx);
UpdateRequestTrackingStats(ctx);
} else {
diff --git a/ydb/core/blobstorage/backpressure/ut_client/skeleton_front_mock.h b/ydb/core/blobstorage/backpressure/ut_client/skeleton_front_mock.h
index 290c529ea56..66805e3a1dd 100644
--- a/ydb/core/blobstorage/backpressure/ut_client/skeleton_front_mock.h
+++ b/ydb/core/blobstorage/backpressure/ut_client/skeleton_front_mock.h
@@ -79,7 +79,7 @@ public:
Reply(*ev, new TEvBlobStorage::TEvVPutResult(NKikimrProto::NOTREADY,
blobId, vdiskId, record.HasCookie() ? &cookie : nullptr,
TOutOfSpaceStatus(0, 0), TActivationContext::Now(), ev->Get()->GetCachedByteSize(),
- &record, nullptr, nullptr, nullptr, record.GetBuffer().size(), {}, 0, TString()));
+ &record, nullptr, nullptr, nullptr, record.GetBuffer().size(), 0, TString()));
if (record.GetNotifyIfNotReady()) {
Notify.insert(ev->Sender);
}
@@ -91,7 +91,7 @@ public:
std::unique_ptr<TEvBlobStorage::TEvVPutResult> reply(new TEvBlobStorage::TEvVPutResult(NKikimrProto::OK,
blobId, vdiskId, record.HasCookie() ? &cookie : nullptr,
TOutOfSpaceStatus(0, 0), TActivationContext::Now(), ev->Get()->GetCachedByteSize(),
- &record, nullptr, nullptr, nullptr, record.GetBuffer().size(), {}, 0, TString()));
+ &record, nullptr, nullptr, nullptr, record.GetBuffer().size(), 0, TString()));
auto *qos = reply->Record.MutableMsgQoS();
LOG_DEBUG_S(*TlsActivationContext, NActorsServices::TEST, "Received " << SingleLineProto(*qos));
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h
index 451e7fa3154..78e7039aa67 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy.h
@@ -18,6 +18,7 @@
#include <library/cpp/actors/wilson/wilson_span.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/group_stat.h>
+#include <ydb/core/base/wilson.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <util/generic/hash_set.h>
@@ -146,7 +147,7 @@ public:
, Mon(std::move(mon))
, PoolCounters(storagePoolCounters)
, LogCtx(logComponent, logAccEnabled)
- , Span(8 /*verbosity*/, NWilson::ERelation::ChildOf, std::move(traceId), now, std::move(name))
+ , Span(TWilson::BlobStorage, std::move(traceId), std::move(name))
, RestartCounter(restartCounter)
, Source(source)
, Cookie(cookie)
@@ -261,7 +262,7 @@ public:
Y_VERIFY_DEBUG(RestartCounter < 100);
auto q = self.RestartQuery(RestartCounter + 1);
++*Mon->NodeMon->RestartHisto[Min<size_t>(Mon->NodeMon->RestartHisto.size() - 1, RestartCounter)];
- TActivationContext::Send(new IEventHandle(nodeWardenId, Source, q.release(), 0, Cookie, &proxyId, Span));
+ TActivationContext::Send(new IEventHandle(nodeWardenId, Source, q.release(), 0, Cookie, &proxyId, Span.GetTraceId()));
PassAway();
return true;
}
@@ -321,7 +322,7 @@ public:
if constexpr (!std::is_same_v<T, TEvBlobStorage::TEvVStatus>) {
event->MessageRelevanceTracker = MessageRelevanceTracker;
}
- const TActorId queueId = GroupQueues->Send(*this, Info->GetTopology(), std::move(event), cookie, Span,
+ const TActorId queueId = GroupQueues->Send(*this, Info->GetTopology(), std::move(event), cookie, Span.GetTraceId(),
timeStatsEnabled);
++RequestsInFlight;
}
@@ -407,7 +408,8 @@ public:
TActorBootstrapped<TDerived>::PassAway();
}
- void SendResponse(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats, TActorId source, ui64 cookie) {
+ void SendResponse(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats, TActorId source, ui64 cookie,
+ bool term = true) {
const TInstant now = TActivationContext::Now();
NKikimrProto::EReplyStatus status;
@@ -457,14 +459,16 @@ public:
static_cast<TEvBlobStorage::TEvGetResult&>(*ev).Sent = now;
}
- if (status == NKikimrProto::OK) {
- Span.EndOk();
- } else {
- Span.EndError(std::move(errorReason));
+ if (term) {
+ if (status == NKikimrProto::OK) {
+ Span.EndOk();
+ } else {
+ Span.EndError(std::move(errorReason));
+ }
}
// send the reply to original request sender
- Derived().Send(source, ev.release(), 0, cookie, Span);
+ Derived().Send(source, ev.release(), 0, cookie);
};
void SendResponse(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats = nullptr) {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
index 344fde247ef..b093cfb2e3a 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
@@ -347,10 +347,10 @@ void TGroupDiskRequests::AddGet(const ui32 diskOrderNumber, const TLogoBlobID &i
void TGroupDiskRequests::AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TString buffer,
TDiskPutRequest::EPutReason putReason, bool isHandoff, std::vector<std::pair<ui64, ui32>> *extraBlockChecks,
- ui8 blobIdx) {
+ NWilson::TSpan *span, ui8 blobIdx) {
Y_VERIFY(diskOrderNumber < DiskRequestsForOrderNumber.size());
DiskRequestsForOrderNumber[diskOrderNumber].PutsToSend.emplace_back(id, buffer, putReason, isHandoff,
- extraBlockChecks, blobIdx);
+ extraBlockChecks, span, blobIdx);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -367,11 +367,7 @@ void TBlackboard::AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize) {
ui64 size = (inSize ? Min(ui64(inSize), maxSize) : maxSize);
//Cerr << "size " << size << " shift " << shift << Endl;
if (size > 0) {
- TBlobState &state = BlobStates[id];;
- if (!bool(state.Id)) {
- state.Init(id, *Info);
- }
- state.AddNeeded(shift, size);
+ (*this)[id].AddNeeded(shift, size);
} else {
TStringStream str;
str << "It is impossible to read 0 bytes! Do not send such requests.";
@@ -380,30 +376,18 @@ void TBlackboard::AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize) {
}
}
-void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData,
- std::vector<std::pair<ui64, ui32>> *extraBlockChecks) {
+void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData) {
Y_VERIFY(bool(id));
Y_VERIFY(id.PartId() == 0);
Y_VERIFY(id.BlobSize() != 0);
- TBlobState &state = BlobStates[id];
- if (!state.Id) {
- state.Init(id, *Info);
- state.ExtraBlockChecks = extraBlockChecks;
- } else {
- Y_VERIFY(state.ExtraBlockChecks == extraBlockChecks);
- }
- state.AddPartToPut(partIdx, partData);
+ (*this)[id].AddPartToPut(partIdx, partData);
}
void TBlackboard::MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx) {
Y_VERIFY(bool(id));
Y_VERIFY(id.PartId() == 0);
Y_VERIFY(id.BlobSize() != 0);
- TBlobState &state = BlobStates[id];;
- if (!bool(state.Id)) {
- state.Init(id, *Info);
- }
- state.MarkBlobReadyToPut(blobIdx);
+ (*this)[id].MarkBlobReadyToPut(blobIdx);
}
void TBlackboard::MoveBlobStateToDone(const TLogoBlobID &id) {
@@ -581,6 +565,30 @@ void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, T
}
}
+void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, std::vector<std::pair<ui64, ui32>> *extraBlockChecks,
+ NWilson::TSpan *span) {
+ TBlobState& state = (*this)[id];
+ if (!state.ExtraBlockChecks) {
+ state.ExtraBlockChecks = extraBlockChecks;
+ } else {
+ Y_VERIFY(state.ExtraBlockChecks == extraBlockChecks);
+ }
+ if (!state.Span) {
+ state.Span = span;
+ } else {
+ Y_VERIFY(state.Span == span);
+ }
+}
+
+TBlobState& TBlackboard::operator [](const TLogoBlobID& id) {
+ const auto [it, inserted] = BlobStates.try_emplace(id);
+ TBlobState& state = it->second;
+ if (inserted) {
+ state.Init(id, *Info);
+ }
+ return state;
+}
+
TString TBlackboard::ToString() const {
TStringStream str;
str << "{BlobStates size# " << BlobStates.size();
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
index cbffa37eb0c..cfd0eb64999 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
@@ -82,6 +82,7 @@ struct TBlobState {
bool IsChanged = false;
bool IsDone = false;
std::vector<std::pair<ui64, ui32>> *ExtraBlockChecks = nullptr;
+ NWilson::TSpan *Span = nullptr;
void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info);
void AddNeeded(ui64 begin, ui64 size);
@@ -129,15 +130,17 @@ struct TDiskPutRequest {
bool IsHandoff;
ui8 BlobIdx;
std::vector<std::pair<ui64, ui32>> *ExtraBlockChecks;
+ NWilson::TSpan *Span;
TDiskPutRequest(const TLogoBlobID &id, TString buffer, EPutReason reason, bool isHandoff,
- std::vector<std::pair<ui64, ui32>> *extraBlockChecks, ui8 blobIdx = 0)
+ std::vector<std::pair<ui64, ui32>> *extraBlockChecks, NWilson::TSpan *span, ui8 blobIdx)
: Id(id)
, Buffer(std::move(buffer))
, Reason(reason)
, IsHandoff(isHandoff)
, BlobIdx(blobIdx)
, ExtraBlockChecks(extraBlockChecks)
+ , Span(span)
{}
};
@@ -156,7 +159,7 @@ struct TGroupDiskRequests {
void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const ui32 shift, const ui32 size);
void AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TString buffer,
TDiskPutRequest::EPutReason putReason, bool isHandoff, std::vector<std::pair<ui64, ui32>> *extraBlockChecks,
- ui8 blobIdx = 0);
+ NWilson::TSpan *span, ui8 blobIdx);
};
struct TBlackboard;
@@ -199,7 +202,7 @@ struct TBlackboard {
{}
void AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize);
- void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData, std::vector<std::pair<ui64, ui32>> *extraBlockChecks);
+ void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData);
void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0);
void MoveBlobStateToDone(const TLogoBlobID &id);
void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TString &data);
@@ -220,6 +223,10 @@ struct TBlackboard {
blob.IsChanged = true;
}
}
+
+ void RegisterBlobForPut(const TLogoBlobID& id, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, NWilson::TSpan *span);
+
+ TBlobState& operator [](const TLogoBlobID& id);
};
}//NKikimr
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
index dad2ecd745f..16a4730e535 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
@@ -602,7 +602,7 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB
getRequest->IsInternal = true;
getRequest->TabletId = TabletId;
getRequest->AcquireBlockedGeneration = true;
- bool isSent = SendToBSProxy(SelfId(), Info->GroupID, getRequest.release(), 0, Span);
+ bool isSent = SendToBSProxy(SelfId(), Info->GroupID, getRequest.release(), 0, Span.GetTraceId());
Y_VERIFY(isSent);
TotalSent++;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
index 9729b909811..283f6049d5e 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
@@ -353,7 +353,7 @@ void TGetImpl::PrepareVPuts(TLogContext &logCtx,
}
bytes += put.Buffer.size();
lastItemCount++;
- vMultiPut->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks);
+ vMultiPut->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks, NWilson::TTraceId()); // FIXME: trace
}
vMultiPut->Record.SetCookie(TVMultiPutCookie(diskOrderNumber, lastItemCount, VMultiPutRequests));
++VMultiPutRequests;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp
index dbf547ae745..959f64ff16b 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp
@@ -2,6 +2,8 @@
namespace NKikimr {
+ std::atomic<TMonotonic> TBlobStorageGroupProxy::ThrottlingTimestamp;
+
TBlobStorageGroupProxy::TBlobStorageGroupProxy(TIntrusivePtr<TBlobStorageGroupInfo>&& info, bool forceWaitAllDrives,
TIntrusivePtr<TDsProxyNodeMon> &nodeMon, TIntrusivePtr<TStoragePoolCounters>&& storagePoolCounters,
const TControlWrapper &enablePutBatching, const TControlWrapper &enableVPatch)
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
index 324f7d1d1f7..e03405a42c3 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
@@ -52,6 +52,8 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
}
};
+ static std::atomic<TMonotonic> ThrottlingTimestamp;
+
const ui32 GroupId;
TIntrusivePtr<TBlobStorageGroupInfo> Info;
std::shared_ptr<TBlobStorageGroupInfo::TTopology> Topology;
@@ -219,7 +221,8 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
if (rate) {
const ui64 num = RandomNumber<ui64>(1000000); // in range [0, 1000000)
if (num < rate) {
- ev->TraceId = NWilson::TTraceId::NewTraceId(15, 4095);
+ ev->TraceId = NWilson::TTraceId::NewTraceIdThrottled(15, Max<ui32>(), ThrottlingTimestamp,
+ TMonotonic::Now(), TDuration::Seconds(1));
}
}
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp
index 80405d83b15..fa59aff5c1e 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp
@@ -155,7 +155,7 @@ public:
R_LOG_DEBUG_S("BPMC3", "SendRequest idx# " << idx
<< " isLast# " << isLast
<< " ev# " << ev->ToString());
- SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span);
+ SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span.GetTraceId());
if (isLast) {
CollectRequestsInFlight++;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp
index dd4ab92d589..7b73f153a35 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp
@@ -122,7 +122,7 @@ public:
void SendRequests() {
for (; RequestsInFlight < MaxRequestsInFlight && !PendingGets.empty(); ++RequestsInFlight, PendingGets.pop_front()) {
auto& [ev, cookie] = PendingGets.front();
- SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span);
+ SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span.GetTraceId());
}
if (!RequestsInFlight && PendingGets.empty()) {
auto ev = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, 0, Info->GroupID);
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
index b2462144952..9dbba7dd853 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
@@ -172,7 +172,7 @@ public:
std::unique_ptr<TEvBlobStorage::TEvPut> put = std::make_unique<TEvBlobStorage::TEvPut>(PatchedId, Buffer, Deadline,
NKikimrBlobStorage::AsyncBlob, TEvBlobStorage::TEvPut::TacticDefault);
put->Orbit = std::move(Orbit);
- Send(ProxyActorId, put.release(), 0, OriginalId.Hash(), Span);
+ Send(ProxyActorId, put.release(), 0, OriginalId.Hash(), Span.GetTraceId());
}
void Handle(TEvBlobStorage::TEvPutResult::TPtr &ev) {
@@ -528,9 +528,9 @@ public:
NKikimrBlobStorage::AsyncRead);
get->Orbit = std::move(Orbit);
if (OriginalGroupId == Info->GroupID) {
- Send(ProxyActorId, get.release(), 0, PatchedId.Hash(), Span);
+ Send(ProxyActorId, get.release(), 0, PatchedId.Hash(), Span.GetTraceId());
} else {
- SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash(), Span);
+ SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash(), Span.GetTraceId());
}
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
index e2b717d13f6..db13013f02b 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
@@ -36,38 +36,12 @@ struct TEvAccelerate : public TEventLocal<TEvAccelerate, TEvBlobStorage::EvAccel
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobStorageGroupPutRequest> {
- struct TMultiPutItemInfo {
- TLogoBlobID BlobId;
- TString Buffer;
- ui64 BufferSize;
- TActorId Recipient;
- ui64 Cookie;
- NWilson::TTraceId TraceId;
- NLWTrace::TOrbit Orbit;
- bool Replied = false;
- std::vector<std::pair<ui64, ui32>> ExtraBlockChecks;
-
- TMultiPutItemInfo(TLogoBlobID id, const TString& buffer, TActorId recipient, ui64 cookie,
- NWilson::TTraceId traceId, NLWTrace::TOrbit &&orbit, std::vector<std::pair<ui64, ui32>> extraBlockChecks)
- : BlobId(id)
- , Buffer(buffer)
- , BufferSize(buffer.size())
- , Recipient(recipient)
- , Cookie(cookie)
- , TraceId(std::move(traceId))
- , Orbit(std::move(orbit))
- , ExtraBlockChecks(std::move(extraBlockChecks))
- {}
- };
-
TPutImpl PutImpl;
TRootCause RootCauseTrack;
TStackVec<ui64, TypicalDisksInGroup> WaitingVDiskResponseCount;
ui64 WaitingVDiskCount = 0;
- TBatchedVec<TMultiPutItemInfo> ItemsInfo;
-
bool IsManyPuts = false;
TInstant Deadline;
@@ -164,10 +138,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
}
WaitingVDiskResponseCount[vdisk]--;
- Y_VERIFY(idx < ItemsInfo.size());
- Y_VERIFY(origBlobId == ItemsInfo[idx].BlobId);
+ Y_VERIFY(idx < PutImpl.Blobs.size());
+ Y_VERIFY(origBlobId == PutImpl.Blobs[idx].BlobId);
if (TimeStatsEnabled && record.GetMsgQoS().HasExecTimeStats()) {
- TimeStats.ApplyPut(ItemsInfo[idx].BufferSize, record.GetMsgQoS().GetExecTimeStats());
+ TimeStats.ApplyPut(PutImpl.Blobs[idx].BufferSize, record.GetMsgQoS().GetExecTimeStats());
}
Y_VERIFY(record.HasVDiskID());
@@ -314,7 +288,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
}
Y_VERIFY(RequestsSent > ResponsesReceived, "RequestsSent# %" PRIu64 " ResponsesReceived# %" PRIu64
" ResponsesSent# %" PRIu64 " BlobsCount# %" PRIu64 " TPutImpl# %s", ui64(RequestsSent), ui64(ResponsesReceived),
- (ui64)ResponsesSent, (ui64)ItemsInfo.size(), PutImpl.DumpFullState().data());
+ (ui64)ResponsesSent, (ui64)PutImpl.Blobs.size(), PutImpl.DumpFullState().data());
if (!IsAccelerateScheduled && !IsAccelerated) {
if (WaitingVDiskCount == 1 && RequestsSent > 1) {
@@ -343,10 +317,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
bool ReplyAndDieWithLastResponse(TPutImpl::TPutResultVec &putResults) {
for (auto& [blobIdx, result] : putResults) {
- Y_VERIFY(ResponsesSent != ItemsInfo.size());
+ Y_VERIFY(ResponsesSent != PutImpl.Blobs.size());
SendReply(result, blobIdx);
}
- if (ResponsesSent == ItemsInfo.size()) {
+ if (ResponsesSent == PutImpl.Blobs.size()) {
PassAway();
return true;
}
@@ -357,14 +331,14 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
NKikimrProto::EReplyStatus status = putResult->Status;
A_LOG_LOG_S(false, status == NKikimrProto::OK ? NLog::PRI_DEBUG : NLog::PRI_NOTICE, "BPP21",
"SendReply putResult# " << putResult->ToString() << " ResponsesSent# " << ResponsesSent
- << " ItemsInfo.size# " << ItemsInfo.size()
- << " Last# " << (ResponsesSent + 1 == ItemsInfo.size() ? "true" : "false"));
+ << " PutImpl.Blobs.size# " << PutImpl.Blobs.size()
+ << " Last# " << (ResponsesSent + 1 == PutImpl.Blobs.size() ? "true" : "false"));
const TDuration duration = TActivationContext::Now() - StartTime;
TLogoBlobID blobId = putResult->Id;
TLogoBlobID origBlobId = TLogoBlobID(blobId, 0);
- Mon->CountPutPesponseTime(Info->GetDeviceType(), HandleClass, ItemsInfo[blobIdx].BufferSize, duration);
+ Mon->CountPutPesponseTime(Info->GetDeviceType(), HandleClass, PutImpl.Blobs[blobIdx].BufferSize, duration);
*Mon->ActivePutCapacity -= ReportedBytes;
- Y_VERIFY(PutImpl.GetHandoffPartsSent() <= Info->Type.TotalPartCount() * MaxHandoffNodes * ItemsInfo.size());
+ Y_VERIFY(PutImpl.GetHandoffPartsSent() <= Info->Type.TotalPartCount() * MaxHandoffNodes * PutImpl.Blobs.size());
++*Mon->HandoffPartsSent[Min(Mon->HandoffPartsSent.size() - 1, (size_t)PutImpl.GetHandoffPartsSent())];
ReportedBytes = 0;
const bool success = (status == NKikimrProto::OK);
@@ -374,27 +348,32 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
blobId.TabletID(), Info->GroupID, blobId.Channel(),
NKikimrBlobStorage::EPutHandleClass_Name(HandleClass), success);
ResponsesSent++;
- Y_VERIFY(ResponsesSent <= ItemsInfo.size());
- RootCauseTrack.RenderTrack(ItemsInfo[blobIdx].Orbit);
- LWTRACK(DSProxyPutReply, ItemsInfo[blobIdx].Orbit);
- putResult->Orbit = std::move(ItemsInfo[blobIdx].Orbit);
+ Y_VERIFY(ResponsesSent <= PutImpl.Blobs.size());
+ RootCauseTrack.RenderTrack(PutImpl.Blobs[blobIdx].Orbit);
+ LWTRACK(DSProxyPutReply, PutImpl.Blobs[blobIdx].Orbit);
+ putResult->Orbit = std::move(PutImpl.Blobs[blobIdx].Orbit);
if (!IsManyPuts) {
SendResponse(std::move(putResult), TimeStatsEnabled ? &TimeStats : nullptr);
} else {
+ if (putResult->Status == NKikimrProto::OK) {
+ PutImpl.Blobs[blobIdx].Span.EndOk();
+ } else {
+ PutImpl.Blobs[blobIdx].Span.EndError(putResult->ErrorReason);
+ }
SendResponse(std::move(putResult), TimeStatsEnabled ? &TimeStats : nullptr,
- ItemsInfo[blobIdx].Recipient, ItemsInfo[blobIdx].Cookie); // FIXME about traces
- ItemsInfo[blobIdx].Replied = true;
+ PutImpl.Blobs[blobIdx].Recipient, PutImpl.Blobs[blobIdx].Cookie, false);
+ PutImpl.Blobs[blobIdx].Replied = true;
}
}
TString BlobIdSequenceToString() const {
TStringBuilder blobIdsStr;
blobIdsStr << '[';
- for (ui64 blobIdx = 0; blobIdx < ItemsInfo.size(); ++blobIdx) {
+ for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
if (blobIdx) {
blobIdsStr << ' ';
}
- blobIdsStr << ItemsInfo[blobIdx].BlobId.ToString();
+ blobIdsStr << PutImpl.Blobs[blobIdx].BlobId.ToString();
}
return blobIdsStr << ']';
}
@@ -402,7 +381,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
std::unique_ptr<IEventBase> RestartQuery(ui32 counter) {
++*Mon->NodeMon->RestartPut;
auto ev = std::make_unique<TEvBlobStorage::TEvBunchOfEvents>();
- for (auto& item : ItemsInfo) {
+ for (auto& item : PutImpl.Blobs) {
if (item.Replied) {
continue;
}
@@ -419,7 +398,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
0 /*flags*/,
item.Cookie,
nullptr /*forwardOnNondelivery*/,
- std::move(item.TraceId)
+ item.Span.GetTraceId()
));
put->RestartCounter = counter;
}
@@ -450,7 +429,7 @@ public:
: TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
NKikimrServices::BS_PROXY_PUT, false, latencyQueueKind, now, storagePoolCounters,
ev->RestartCounter, "DSProxy.Put")
- , PutImpl(info, state, ev, mon, enableRequestMod3x3ForMinLatecy)
+ , PutImpl(info, state, ev, mon, enableRequestMod3x3ForMinLatecy, source, cookie, Span.GetTraceId())
, WaitingVDiskResponseCount(info->GetTotalVDisksNum())
, Deadline(ev->Deadline)
, StartTime(now)
@@ -467,11 +446,7 @@ public:
if (ev->Orbit.HasShuttles()) {
RootCauseTrack.IsOn = true;
}
- ItemsInfo.emplace_back(ev->Id, ev->Buffer, source, cookie, NWilson::TTraceId(), std::move(ev->Orbit),
- std::move(ev->ExtraBlockChecks));
- LWPROBE(DSProxyBlobPutTactics, ItemsInfo[0].BlobId.TabletID(), Info->GroupID, ItemsInfo[0].BlobId.ToString(),
- Tactic, NKikimrBlobStorage::EPutHandleClass_Name(HandleClass));
- ReportBytes(ItemsInfo[0].Buffer.capacity() + sizeof(*this));
+ ReportBytes(PutImpl.Blobs[0].Buffer.capacity() + sizeof(*this));
RequestBytes = ev->Buffer.size();
RequestHandleClass = HandleClassToHandleClass(HandleClass);
@@ -521,21 +496,10 @@ public:
if (!msg.ExtraBlockChecks.empty()) {
RequireExtraBlockChecks = true;
}
- ItemsInfo.emplace_back(
- msg.Id,
- msg.Buffer,
- ev->Sender,
- ev->Cookie,
- std::move(ev->TraceId),
- std::move(msg.Orbit),
- std::move(msg.ExtraBlockChecks)
- );
- LWPROBE(DSProxyBlobPutTactics, ItemsInfo.back().BlobId.TabletID(), Info->GroupID,
- ItemsInfo.back().BlobId.ToString(), Tactic, NKikimrBlobStorage::EPutHandleClass_Name(HandleClass));
}
RequestBytes = 0;
- for (auto &item: ItemsInfo) {
+ for (auto &item: PutImpl.Blobs) {
ReportBytes(item.Buffer.capacity());
RequestBytes += item.BufferSize;
}
@@ -553,15 +517,15 @@ public:
A_LOG_INFO_S("BPP13", "bootstrap"
<< " ActorId# " << SelfId()
<< " Group# " << Info->GroupID
- << " BlobCount# " << ItemsInfo.size()
+ << " BlobCount# " << PutImpl.Blobs.size()
<< " BlobIDs# " << BlobIdSequenceToString()
<< " HandleClass# " << NKikimrBlobStorage::EPutHandleClass_Name(HandleClass)
<< " Tactic# " << TEvBlobStorage::TEvPut::TacticName(Tactic)
<< " Deadline# " << Deadline
<< " RestartCounter# " << RestartCounter);
- for (ui64 blobIdx = 0; blobIdx < ItemsInfo.size(); ++blobIdx) {
- LWTRACK(DSProxyPutBootstrapStart, ItemsInfo[blobIdx].Orbit);
+ for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
+ LWTRACK(DSProxyPutBootstrapStart, PutImpl.Blobs[blobIdx].Orbit);
}
Become(&TThis::StateWait);
@@ -573,15 +537,15 @@ public:
const ui32 totalParts = Info->Type.TotalPartCount();
TAutoPtr<TEvResume> resume(new TEvResume());
- resume->PartSets.resize(ItemsInfo.size());
+ resume->PartSets.resize(PutImpl.Blobs.size());
- for (ui64 idx = 0; idx < ItemsInfo.size(); ++idx) {
- TLogoBlobID blobId = ItemsInfo[idx].BlobId;
+ for (ui64 idx = 0; idx < PutImpl.Blobs.size(); ++idx) {
+ TLogoBlobID blobId = PutImpl.Blobs[idx].BlobId;
const ui64 partSize = Info->Type.PartSize(blobId);
- ui64 bufferSize = ItemsInfo[idx].BufferSize;
+ ui64 bufferSize = PutImpl.Blobs[idx].BufferSize;
- char *data = ItemsInfo[idx].Buffer.Detach();
+ char *data = PutImpl.Blobs[idx].Buffer.Detach();
Encrypt(data, data, 0, bufferSize, blobId, *Info);
TDataPartSet &partSet = resume->PartSets[idx];
@@ -604,8 +568,8 @@ public:
ResumeBootstrap(resume);
} else {
Send(SelfId(), resume.Release());
- for (ui64 blobIdx = 0; blobIdx < ItemsInfo.size(); ++blobIdx) {
- LWTRACK(DSProxyPutPauseBootstrap, ItemsInfo[blobIdx].Orbit);
+ for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
+ LWTRACK(DSProxyPutPauseBootstrap, PutImpl.Blobs[blobIdx].Orbit);
}
}
@@ -616,8 +580,8 @@ public:
void Handle(TEvResume::TPtr &ev) {
if (ev->Get()->Count == 0) {
// Record only the first resume to keep tracks structure simple
- for (ui64 blobIdx = 0; blobIdx < ItemsInfo.size(); ++blobIdx) {
- LWTRACK(DSProxyPutResumeBootstrap, ItemsInfo[blobIdx].Orbit);
+ for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
+ LWTRACK(DSProxyPutResumeBootstrap, PutImpl.Blobs[blobIdx].Orbit);
}
}
ResumeBootstrap(ev->Release());
@@ -662,15 +626,15 @@ public:
double waitSec = Timer.PassedReset();
resume->WaitSec += waitSec;
- Y_VERIFY(ItemsInfo.size() == resume->PartSets.size());
+ Y_VERIFY(PutImpl.Blobs.size() == resume->PartSets.size());
bool splitDone = true;
- for (ui64 idx = 0; idx < ItemsInfo.size(); ++idx) {
+ for (ui64 idx = 0; idx < PutImpl.Blobs.size(); ++idx) {
TDataPartSet &partSet = resume->PartSets[idx];
- TString &buffer = ItemsInfo[idx].Buffer;
- TLogoBlobID blobId = ItemsInfo[idx].BlobId;
+ TString &buffer = PutImpl.Blobs[idx].Buffer;
+ TLogoBlobID blobId = PutImpl.Blobs[idx].BlobId;
Info->Type.IncrementalSplitData((TErasureType::ECrcMode)blobId.CrcMode(), buffer, partSet);
if (partSet.IsSplitDone()) {
- ReportBytes(partSet.MemoryConsumed - ItemsInfo[idx].BufferSize);
+ ReportBytes(partSet.MemoryConsumed - PutImpl.Blobs[idx].BufferSize);
} else {
splitDone = false;
}
@@ -681,8 +645,8 @@ public:
LWPROBE(ProxyPutBootstrapPart, RequestBytes, waitSec * 1000.0, splitSec * 1000.0, resume->Count, resume->SplitSec * 1000.0);
if (splitDone) {
- for (ui64 blobIdx = 0; blobIdx < ItemsInfo.size(); ++blobIdx) {
- LWTRACK(DSProxyPutBootstrapDone, ItemsInfo[blobIdx].Orbit,
+ for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
+ LWTRACK(DSProxyPutBootstrapDone, PutImpl.Blobs[blobIdx].Orbit,
RequestBytes, resume->WilsonSec * 1000.0, resume->AllocateSec * 1000.0,
resume->WaitSec * 1000.0, resume->SplitSec * 1000.0, resume->Count, blobIdx);
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp
index 757b313cfb0..7eaa35790f9 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp
@@ -66,7 +66,7 @@ void TPutImpl::PrepareReply(NKikimrProto::EReplyStatus status, TLogContext &logC
continue;
}
- outPutResults.emplace_back(idx, new TEvBlobStorage::TEvPutResult(status, Blobs[idx].Id, StatusFlags,
+ outPutResults.emplace_back(idx, new TEvBlobStorage::TEvPutResult(status, Blobs[idx].BlobId, StatusFlags,
Info->GroupID, ApproximateFreeSpaceShare));
outPutResults.back().second->ErrorReason = errorReason;
@@ -87,7 +87,7 @@ void TPutImpl::PrepareReply(TLogContext &logCtx, TString errorReason,
for (auto item : finished) {
auto &[blobId, state] = *item;
const ui64 idx = state.BlobIdx;
- Y_VERIFY(blobId == Blobs[idx].Id, "BlobIdx# %" PRIu64 " BlobState# %s Blackboard# %s",
+ Y_VERIFY(blobId == Blobs[idx].BlobId, "BlobIdx# %" PRIu64 " BlobState# %s Blackboard# %s",
idx, state.ToString().c_str(), Blackboard.ToString().c_str());
Y_VERIFY(!IsDone[idx]);
Y_VERIFY(state.Status != NKikimrProto::UNKNOWN);
@@ -149,7 +149,7 @@ TString TPutImpl::DumpFullState() const {
bool TPutImpl::MarkBlobAsSent(ui64 idx) {
Y_VERIFY(idx < Blobs.size());
Y_VERIFY(!IsDone[idx]);
- Blackboard.MoveBlobStateToDone(Blobs[idx].Id);
+ Blackboard.MoveBlobStateToDone(Blobs[idx].BlobId);
IsDone[idx] = true;
DoneBlobs++;
return true;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
index b95d0cf3485..2e3ab60fc30 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
@@ -45,11 +45,30 @@ private:
const TEvBlobStorage::TEvPut::ETactic Tactic;
struct TBlobInfo {
- TLogoBlobID Id;
+ TLogoBlobID BlobId;
+ TString Buffer;
+ ui64 BufferSize;
+ TActorId Recipient;
+ ui64 Cookie;
+ NLWTrace::TOrbit Orbit;
+ bool Replied = false;
std::vector<std::pair<ui64, ui32>> ExtraBlockChecks;
+ NWilson::TSpan Span;
+
+ TBlobInfo(TLogoBlobID id, TString buffer, TActorId recipient, ui64 cookie, NWilson::TTraceId traceId,
+ NLWTrace::TOrbit&& orbit, std::vector<std::pair<ui64, ui32>> extraBlockChecks, bool single)
+ : BlobId(id)
+ , Buffer(std::move(buffer))
+ , BufferSize(Buffer.size())
+ , Recipient(recipient)
+ , Cookie(cookie)
+ , Orbit(std::move(orbit))
+ , ExtraBlockChecks(std::move(extraBlockChecks))
+ , Span(single ? NWilson::TSpan() : NWilson::TSpan(TWilson::BlobStorage, std::move(traceId), "DSProxy.Put.Blob"))
+ {}
void Output(IOutputStream& s) const {
- s << Id;
+ s << BlobId;
if (!ExtraBlockChecks.empty()) {
s << "{";
for (auto it = ExtraBlockChecks.begin(); it != ExtraBlockChecks.end(); ++it) {
@@ -68,8 +87,11 @@ private:
return s.Str();
}
};
+
TBatchedVec<TBlobInfo> Blobs;
+ friend class TBlobStorageGroupPutRequest;
+
TStackVec<bool, MaxBatchedPutRequests * TypicalDisksInSubring> ReceivedVPutResponses;
TStackVec<bool, MaxBatchedPutRequests * TypicalDisksInSubring> ReceivedVMultiPutResponses;
@@ -82,7 +104,7 @@ private:
public:
TPutImpl(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state,
TEvBlobStorage::TEvPut *ev, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon,
- bool enableRequestMod3x3ForMinLatecy)
+ bool enableRequestMod3x3ForMinLatecy, TActorId recipient, ui64 cookie, NWilson::TTraceId traceId)
: Deadline(ev->Deadline)
, Info(info)
, Blackboard(info, state, ev->HandleClass, NKikimrBlobStorage::EGetHandleClass::AsyncRead, false)
@@ -92,10 +114,13 @@ public:
, Mon(mon)
, EnableRequestMod3x3ForMinLatecy(enableRequestMod3x3ForMinLatecy)
, Tactic(ev->Tactic)
- , Blobs({{ev->Id, std::move(ev->ExtraBlockChecks)}})
{
- Y_VERIFY(Blobs.size());
- Y_VERIFY(Blobs.size() <= MaxBatchedPutRequests);
+ Blobs.emplace_back(ev->Id, std::move(ev->Buffer), recipient, cookie, std::move(traceId), std::move(ev->Orbit),
+ std::move(ev->ExtraBlockChecks), true);
+
+ auto& blob = Blobs.back();
+ LWPROBE(DSProxyBlobPutTactics, blob.BlobId.TabletID(), Info->GroupID, blob.BlobId.ToString(), Tactic,
+ NKikimrBlobStorage::EPutHandleClass_Name(GetPutHandleClass()));
}
TPutImpl(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state,
@@ -113,13 +138,20 @@ public:
, Tactic(tactic)
{
Y_VERIFY(events.size(), "TEvPut vector is empty");
+
for (auto &ev : events) {
auto& msg = *ev->Get();
Y_VERIFY(msg.HandleClass == putHandleClass);
Y_VERIFY(msg.Tactic == tactic);
- Blobs.push_back({msg.Id, std::move(msg.ExtraBlockChecks)});
+ Blobs.emplace_back(msg.Id, std::move(msg.Buffer), ev->Sender, ev->Cookie, std::move(ev->TraceId),
+ std::move(msg.Orbit), std::move(msg.ExtraBlockChecks), false);
Deadline = Max(Deadline, msg.Deadline);
+
+ auto& blob = Blobs.back();
+ LWPROBE(DSProxyBlobPutTactics, blob.BlobId.TabletID(), Info->GroupID, blob.BlobId.ToString(), Tactic,
+ NKikimrBlobStorage::EPutHandleClass_Name(GetPutHandleClass()));
}
+
Y_VERIFY(Blobs.size());
Y_VERIFY(Blobs.size() <= MaxBatchedPutRequests);
}
@@ -141,12 +173,13 @@ public:
const ui32 totalParts = Info->Type.TotalPartCount();
for (ui64 blobIdx = 0; blobIdx < Blobs.size(); ++blobIdx) {
TBlobInfo& blob = Blobs[blobIdx];
+ Blackboard.RegisterBlobForPut(blob.BlobId, &blob.ExtraBlockChecks, &blob.Span);
for (ui32 i = 0; i < totalParts; ++i) {
REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(partSets[blobIdx].Parts[i].OwnedString.Data(),
partSets[blobIdx].Parts[i].OwnedString.Size());
- Blackboard.AddPartToPut(blob.Id, i, partSets[blobIdx].Parts[i].OwnedString, &blob.ExtraBlockChecks);
+ Blackboard.AddPartToPut(blob.BlobId, i, partSets[blobIdx].Parts[i].OwnedString);
}
- Blackboard.MarkBlobReadyToPut(blob.Id, blobIdx);
+ Blackboard.MarkBlobReadyToPut(blob.BlobId, blobIdx);
}
TPutResultVec putResults;
@@ -461,7 +494,9 @@ protected:
++VPutRequests;
ReceivedVPutResponses.push_back(false);
} else if constexpr (isVMultiPut) {
- outVPutEvents.back()->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks);
+ // this request MUST originate from the TEvPut, so the Span field must be filled in
+ Y_VERIFY(put.Span);
+ outVPutEvents.back()->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks, put.Span->GetTraceId());
}
if (put.IsHandoff) {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
index 299dfa2af7d..c83f46b1080 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp
@@ -250,7 +250,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob
A_LOG_DEBUG_S("DSR08", "sending TEvGet# " << get->ToString());
- SendToBSProxy(SelfId(), Info->GroupID, get.release(), 0, Span);
+ SendToBSProxy(SelfId(), Info->GroupID, get.release(), 0, Span.GetTraceId());
// switch state
Become(&TThis::StateGet);
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp
index 89304e2cd34..efeedb94465 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp
@@ -334,7 +334,7 @@ void TStrategyBase::PreparePutsForPartPlacement(TLogContext &logCtx, TBlobState
Y_VERIFY(state.Parts[record.PartIdx].Data.IsMonolith());
groupDiskRequests.AddPut(disk.OrderNumber, partId, state.Parts[record.PartIdx].Data.GetMonolith(),
TDiskPutRequest::ReasonInitial, info.Type.IsHandoffInSubgroup(record.VDiskIdx),
- state.ExtraBlockChecks, state.BlobIdx);
+ state.ExtraBlockChecks, state.Span, state.BlobIdx);
disk.DiskParts[record.PartIdx].Situation = TBlobState::ESituation::Sent;
}
}
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h
index 5ef2c7ddf4d..4442c48218b 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h
@@ -141,6 +141,7 @@ protected:
diskIdx == group.DiskIdx[0] ? TDiskPutRequest::ReasonInitial : TDiskPutRequest::ReasonError,
diskIdx != group.DiskIdx[0],
state.ExtraBlockChecks,
+ state.Span,
state.BlobIdx);
s = TBlobState::ESituation::Sent;
any |= {&info.GetTopology(), diskIdx};
@@ -174,6 +175,7 @@ protected:
handoff ? TDiskPutRequest::ReasonError : TDiskPutRequest::ReasonInitial,
handoff,
state.ExtraBlockChecks,
+ state.Span,
state.BlobIdx);
part.Situation = TBlobState::ESituation::Sent;
any |= {&info.GetTopology(), (ui8)diskIdx};
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp
index 74bd0ed7ee8..aaf73779b16 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp
@@ -371,7 +371,7 @@ void ConductVPatchStart(TTestBasicRuntime &runtime, const TDSProxyEnv &env, cons
UNIT_ASSERT(startRecord.HasCookie());
std::unique_ptr<TEvBlobStorage::TEvVPatchFoundParts> foundParts = std::make_unique<TEvBlobStorage::TEvVPatchFoundParts>(
status, args.OriginalId, args.PatchedId, vdisk, startRecord.GetCookie(), now, "", &startRecord,
- nullptr, nullptr, nullptr, NWilson::TTraceId(), 0);
+ nullptr, nullptr, nullptr, 0);
for (auto partId : parts) {
foundParts->AddPart(partId);
}
@@ -406,7 +406,7 @@ void ConductVPatchDiff(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const
UNIT_ASSERT(diffRecord.HasCookie());
std::unique_ptr<TEvBlobStorage::TEvVPatchResult> result = std::make_unique<TEvBlobStorage::TEvVPatchResult>(
resultStatus, args.OriginalId, args.PatchedId, vdisk, diffRecord.GetCookie(), now,
- &diffRecord, nullptr, nullptr, nullptr,NWilson::TTraceId(), 0);
+ &diffRecord, nullptr, nullptr, nullptr, 0);
result->SetStatusFlagsAndFreeSpace(args.StatusFlags, args.ApproximateFreeSpaceShare);
SendByHandle(runtime, diffEv, std::move(result));
@@ -449,7 +449,7 @@ void ConductVMovedPatch(TTestBasicRuntime &runtime, const TTestArgs &args, EMove
TOutOfSpaceStatus oos(args.StatusFlags, args.ApproximateFreeSpaceShare);
std::unique_ptr<TEvBlobStorage::TEvVMovedPatchResult> vPatchResult = std::make_unique<TEvBlobStorage::TEvVMovedPatchResult>(
resultStatus, args.OriginalId, args.PatchedId, vDiskId, expectedCookie, oos,
- TAppData::TimeProvider->Now(), 0, &vPatchRecord, nullptr, nullptr, nullptr, NWilson::TTraceId(), 0, TString());
+ TAppData::TimeProvider->Now(), 0, &vPatchRecord, nullptr, nullptr, nullptr, 0, TString());
SendByHandle(runtime, handle, std::move(vPatchResult));
CTEST << "ConductVMovedPatch: Finish\n";
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp
index 48c88d53505..e94b2c398d5 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp
@@ -66,7 +66,7 @@ void TestPutMaxPartCountOnHandoff(TErasureType::EErasureSpecies erasureSpecies)
TEvBlobStorage::TEvPut ev(blobId, data, TInstant::Max(), NKikimrBlobStorage::TabletLog,
TEvBlobStorage::TEvPut::TacticDefault);
- TPutImpl putImpl(group.GetInfo(), groupQueues, &ev, mon, false);
+ TPutImpl putImpl(group.GetInfo(), groupQueues, &ev, mon, false, TActorId(), 0, NWilson::TTraceId());
for (ui32 idx = 0; idx < domainCount; ++idx) {
group.SetPredictedDelayNs(idx, 1);
@@ -362,7 +362,7 @@ struct TTestPutAllOk {
TMaybe<TPutImpl> putImpl;
TPutImpl::TPutResultVec putResults;
if constexpr (IsVPut) {
- putImpl.ConstructInPlace(Group.GetInfo(), GroupQueues, events[0]->Get(), Mon, false);
+ putImpl.ConstructInPlace(Group.GetInfo(), GroupQueues, events[0]->Get(), Mon, false, TActorId(), 0, NWilson::TTraceId());
} else {
putImpl.ConstructInPlace(Group.GetInfo(), GroupQueues, events, Mon,
NKikimrBlobStorage::TabletLog, TEvBlobStorage::TEvPut::TacticDefault, false);
@@ -411,7 +411,7 @@ Y_UNIT_TEST(TestMirror3dcWith3x3MinLatencyMod) {
TString data = AlphaData(size);
TEvBlobStorage::TEvPut ev(blobId, data, TInstant::Max(), NKikimrBlobStorage::TabletLog,
TEvBlobStorage::TEvPut::TacticMinLatency);
- TPutImpl putImpl(env.Info, env.GroupQueues, &ev, env.Mon, true);
+ TPutImpl putImpl(env.Info, env.GroupQueues, &ev, env.Mon, true, TActorId(), 0, NWilson::TTraceId());
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
TLogContext logCtx(NKikimrServices::BS_PROXY_PUT, false);
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp
index dcb316ec9c2..397b975979a 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp
@@ -247,7 +247,7 @@ void SendVGetResult(ui32 vDiskIdx, NKikimrProto::EReplyStatus status, ui32 partI
TVDiskState *from = &subgroup[vDiskIdx];
std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult(NKikimrProto::OK, from->VDiskId,
- TAppData::TimeProvider->Now(), 0, nullptr, nullptr, nullptr, nullptr, NWilson::TTraceId(), {}, 0U, 0U));
+ TAppData::TimeProvider->Now(), 0, nullptr, nullptr, nullptr, nullptr, {}, 0U, 0U));
SetPredictedDelaysForAllQueues({});
ui64 queryCookie = from->QueryCookies.size() ? from->QueryCookies[0] : 0;
@@ -288,7 +288,7 @@ void SendVGetResult(ui32 blobIdx, ui32 vDiskIdx, NKikimrProto::EReplyStatus stat
|| status == NKikimrProto::VDISK_ERROR_STATE) {
std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult(
status, request.VDiskId, TAppData::TimeProvider->Now(), 0, nullptr, nullptr, nullptr, nullptr,
- NWilson::TTraceId(), {}, 0U, 0U));
+ {}, 0U, 0U));
for (auto it = request.Queries.begin(); it != request.Queries.end(); ++it) {
result->AddResult(status, it->LogoBlobId, &it->QueryCookie);
}
@@ -300,7 +300,7 @@ void SendVGetResult(ui32 blobIdx, ui32 vDiskIdx, NKikimrProto::EReplyStatus stat
} else if (status == NKikimrProto::NODATA) {
std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult(
NKikimrProto::OK, request.VDiskId, TAppData::TimeProvider->Now(), 0, nullptr,
- nullptr, nullptr, nullptr, NWilson::TTraceId(), {}, 0U, 0U));
+ nullptr, nullptr, nullptr, {}, 0U, 0U));
for (auto it = request.Queries.begin(); it != request.Queries.end(); ++it) {
result->AddResult(status, it->LogoBlobId, &it->QueryCookie);
TLogoBlobID id(it->LogoBlobId);
@@ -314,7 +314,7 @@ void SendVGetResult(ui32 blobIdx, ui32 vDiskIdx, NKikimrProto::EReplyStatus stat
} else if (status == NKikimrProto::OK) {
std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult(
NKikimrProto::OK, request.VDiskId, TAppData::TimeProvider->Now(), 0, nullptr,
- nullptr, nullptr, nullptr, NWilson::TTraceId(), {}, 0U, 0U));
+ nullptr, nullptr, nullptr, {}, 0U, 0U));
for (auto it = request.Queries.begin(); it != request.Queries.end(); ++it) {
TString data;
ui32 partIdx = 0;
@@ -366,7 +366,7 @@ void SendVPutResultEvent(TTestActorRuntime &runtime, TVDiskState &vdisk, NKikimr
std::unique_ptr<TEvBlobStorage::TEvVPutResult> vPutResult(new TEvBlobStorage::TEvVPutResult(
status, vdisk.LogoBlobId, vdisk.VDiskId,
&vdisk.InnerCookie, TOutOfSpaceStatus(0u, 0.0), TAppData::TimeProvider->Now(),
- 0, nullptr, nullptr, nullptr, nullptr, 0, NWilson::TTraceId(), 0, TString()));
+ 0, nullptr, nullptr, nullptr, nullptr, 0, 0, TString()));
vPutResult->Record.MutableMsgQoS()->MutableMsgId()->SetMsgId(vdisk.MsgId);
vPutResult->Record.MutableMsgQoS()->MutableMsgId()->SetSequenceId(vdisk.SequenceId);
SetPredictedDelaysForAllQueues({});
@@ -1212,7 +1212,7 @@ Y_UNIT_TEST(TestGivenBlock42PutWhenPartialGetThenSingleDiskRequestOk) {
std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(
new TEvBlobStorage::TEvVGetResult(
NKikimrProto::OK, theRequest.VDiskId, TAppData::TimeProvider->Now(), 0, nullptr,
- nullptr, nullptr, nullptr, NWilson::TTraceId(), {}, 0U, 0U));
+ nullptr, nullptr, nullptr, {}, 0U, 0U));
result->AddResult(
NKikimrProto::OK, id, query.Shift, resultData.data(),
resultData.size(), &query.QueryCookie);
@@ -1241,7 +1241,7 @@ Y_UNIT_TEST(TestGivenBlock42PutWhenPartialGetThenSingleDiskRequestOk) {
const auto &request = item.second;
std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult(
NKikimrProto::RACE, request.VDiskId, TAppData::TimeProvider->Now(), 0, nullptr,
- nullptr, nullptr, nullptr, NWilson::TTraceId(), {}, 0U, 0U));
+ nullptr, nullptr, nullptr, {}, 0U, 0U));
result->Record.MutableMsgQoS()->MutableMsgId()->SetMsgId(request.MsgId);
result->Record.MutableMsgQoS()->MutableMsgId()->SetSequenceId(request.SequenceId);
result->Record.SetCookie(request.RecordCookie);
@@ -1301,7 +1301,7 @@ Y_UNIT_TEST(TestGivenBlock42Put6PartsOnOneVDiskWhenDiscoverThenRecoverFirst) {
TGetRangeQuery &query = req.RangeQueries[0];
std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult(
NKikimrProto::OK, req.VDiskId, TAppData::TimeProvider->Now(), 0, nullptr, nullptr, nullptr,
- nullptr, NWilson::TTraceId(), {}, 0U, 0U));
+ nullptr, {}, 0U, 0U));
TIngress ingress;
for (ui32 partIdx = 0; partIdx < 6; ++partIdx) {
TLogoBlobID blobPartId(logoblobid, partIdx + 1);
@@ -1325,7 +1325,7 @@ Y_UNIT_TEST(TestGivenBlock42Put6PartsOnOneVDiskWhenDiscoverThenRecoverFirst) {
//TGetRangeQuery &query = req.RangeQueries[0];
std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult(
NKikimrProto::OK, req.VDiskId, TAppData::TimeProvider->Now(), 0, nullptr, nullptr, nullptr,
- nullptr, NWilson::TTraceId(), {}, 0U, 0U));
+ nullptr, {}, 0U, 0U));
result->Record.MutableMsgQoS()->MutableMsgId()->SetMsgId(req.MsgId);
result->Record.MutableMsgQoS()->MutableMsgId()->SetSequenceId(req.SequenceId);
runtime.Send(
diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_test_state_ut.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_test_state_ut.h
index bc805ac12b7..e6b8d2de143 100644
--- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_test_state_ut.h
+++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_test_state_ut.h
@@ -172,7 +172,7 @@ struct TTestState {
ui64 *cookie = record.HasCookie() ? &cookieValue : nullptr;
std::unique_ptr<TEvBlobStorage::TEvVPutResult> result(new TEvBlobStorage::TEvVPutResult(status, blobId, vDiskId,
cookie, TOutOfSpaceStatus(0u, 0.0), TAppData::TimeProvider->Now(),
- 0, nullptr, nullptr, nullptr, nullptr, 0, NWilson::TTraceId(), 0, TString()));
+ 0, nullptr, nullptr, nullptr, nullptr, 0, 0, TString()));
return result;
}
@@ -184,7 +184,7 @@ struct TTestState {
ui64 *cookie = record.HasCookie() ? &cookieValue : nullptr;
std::unique_ptr<TEvBlobStorage::TEvVMultiPutResult> result(
new TEvBlobStorage::TEvVMultiPutResult(status, vDiskId, cookie, TAppData::TimeProvider->Now(), 0,
- nullptr, nullptr, nullptr, nullptr, 0, NWilson::TTraceId(), 0, TString()));
+ nullptr, nullptr, nullptr, nullptr, 0, 0, TString()));
result->Record.SetStatusFlags(TOutOfSpaceStatus(0u, 0.0).Flags);
return result;
}
@@ -224,7 +224,7 @@ struct TTestState {
TVDiskID vDiskId = VDiskIDFromVDiskID(ev->Record.GetVDiskID());
std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult(
status, vDiskId, TAppData::TimeProvider->Now(), 0, nullptr,
- nullptr, nullptr, nullptr, NWilson::TTraceId(), {}, 0U, 0U));
+ nullptr, nullptr, nullptr, {}, 0U, 0U));
return result;
}
diff --git a/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp b/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp
index 2e51f3e5bfc..23bbd621bd1 100644
--- a/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp
@@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(IncorrectQueries) {
TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog, false, nullptr));
for(auto [blob, data, status] : blobs) {
- static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, nullptr, nullptr);
+ static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, nullptr, nullptr, NWilson::TTraceId());
}
static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->Record = proto;
@@ -105,7 +105,7 @@ Y_UNIT_TEST_SUITE(IncorrectQueries) {
TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog, false, nullptr));
for(auto [blob, data, status] : blobs) {
- static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, nullptr, nullptr);
+ static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, nullptr, nullptr, NWilson::TTraceId());
}
env.WithQueueId(test.Info->GetVDiskInSubgroup(0, blobs[0].BlobId.Hash()), NKikimrBlobStorage::EVDiskQueueId::PutTabletLog, [&](TActorId queueId) {
@@ -528,7 +528,7 @@ Y_UNIT_TEST_SUITE(IncorrectQueries) {
if (i % 19 != 18) {
++goodCount;
TLogoBlobID blob(i, 1, 0, 0, blobSize, 0, 1);
- static_cast<TEvBlobStorage::TEvVMultiPut*>(events[i].get())->AddVPut(blob, data, nullptr, nullptr);
+ static_cast<TEvBlobStorage::TEvVMultiPut*>(events[i].get())->AddVPut(blob, data, nullptr, nullptr, NWilson::TTraceId());
}
}
diff --git a/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp b/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp
index 931cd7958dd..f60f1616793 100644
--- a/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp
+++ b/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp
@@ -448,7 +448,7 @@ class TManyMultiPuts : public TActorBootstrapped<TManyMultiPuts> {
TVDiskIdShort mainVDiskId = TIngress::GetMainReplica(&Conf->GroupInfo->GetTopology(), logoBlobID);
if (mainVDiskId == VDiskInfo.VDiskID) {
ui64 cookieValue = Step;
- vMultiPut->AddVPut(logoBlobID, MsgData, &cookieValue, nullptr);
+ vMultiPut->AddVPut(logoBlobID, MsgData, &cookieValue, nullptr, NWilson::TTraceId());
putCount++;
Step++;
diff --git a/ydb/core/blobstorage/ut_vdisk/lib/vdisk_mock.cpp b/ydb/core/blobstorage/ut_vdisk/lib/vdisk_mock.cpp
index d00daeecd30..9dcec09c1bc 100644
--- a/ydb/core/blobstorage/ut_vdisk/lib/vdisk_mock.cpp
+++ b/ydb/core/blobstorage/ut_vdisk/lib/vdisk_mock.cpp
@@ -102,7 +102,7 @@ public:
auto response = std::make_unique<TEvBlobStorage::TEvVPutResult>(status, id,
VDiskIDFromVDiskID(record.GetVDiskID()), record.HasCookie() ? &cookie : nullptr,
TOutOfSpaceStatus(0u, 0.0), TAppData::TimeProvider->Now(), (ui32)ev->Get()->GetCachedByteSize(),
- &record, nullptr, nullptr, nullptr, 0, NWilson::TTraceId(), 0, errorReason);
+ &record, nullptr, nullptr, nullptr, 0, 0, errorReason);
FinalizeAndSend(std::move(response), ctx, ev->Sender);
};
@@ -141,7 +141,7 @@ public:
auto response = std::make_unique<TEvBlobStorage::TEvVMultiPutResult>(NKikimrProto::OK,
VDiskIDFromVDiskID(record.GetVDiskID()), record.HasCookie() ? &cookie : nullptr,
TAppData::TimeProvider->Now(), (ui32)ev->Get()->GetCachedByteSize(),
- &record, nullptr, nullptr, nullptr, 0, NWilson::TTraceId(), 0, TString());
+ &record, nullptr, nullptr, nullptr, 0, 0, TString());
if (ErrorMode) {
response->MakeError(NKikimrProto::ERROR, "error mode", record);
LOG_DEBUG(ctx, NActorsServices::TEST, "TEvVMultiPut %s -> %s", ev->Get()->ToString().data(),
@@ -179,7 +179,7 @@ public:
auto response = std::make_unique<TEvBlobStorage::TEvVGetResult>(NKikimrProto::OK,
VDiskIDFromVDiskID(record.GetVDiskID()), TAppData::TimeProvider->Now(), (ui32)ev->Get()->GetCachedByteSize(),
- &record, nullptr, nullptr, nullptr, NWilson::TTraceId(), cookie, 0U, 0U);
+ &record, nullptr, nullptr, nullptr, cookie, 0U, 0U);
if (ErrorMode) {
response->MakeError(NKikimrProto::ERROR, "error mode", record);
@@ -336,7 +336,7 @@ public:
TEvBlobStorage::TEvVBlockResult::TTabletActGen actual(record.GetTabletId(), record.GetGeneration());
auto response = std::make_unique<TEvBlobStorage::TEvVBlockResult>(NKikimrProto::OK, &actual,
VDiskIDFromVDiskID(record.GetVDiskID()), TAppData::TimeProvider->Now(),
- (ui32)ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr, NWilson::TTraceId(), 0);
+ (ui32)ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr, 0);
FinalizeAndSend(std::move(response), ctx, ev->Sender);
}
@@ -349,11 +349,11 @@ public:
if (it != Blocks.end()) {
response.reset(new TEvBlobStorage::TEvVGetBlockResult(NKikimrProto::OK, it->first, it->second,
VDiskIDFromVDiskID(record.GetVDiskID()), TAppData::TimeProvider->Now(),
- ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr, NWilson::TTraceId()));
+ ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr));
} else {
response.reset(new TEvBlobStorage::TEvVGetBlockResult(NKikimrProto::NODATA, record.GetTabletId(),
VDiskIDFromVDiskID(record.GetVDiskID()), TAppData::TimeProvider->Now(),
- ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr, NWilson::TTraceId()));
+ ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr));
}
FinalizeAndSend(std::move(response), ctx, ev->Sender);
@@ -367,7 +367,7 @@ public:
auto response = std::make_unique<TEvBlobStorage::TEvVCollectGarbageResult>(NKikimrProto::BLOCKED,
record.GetTabletId(), record.GetRecordGeneration(), record.GetChannel(),
VDiskIDFromVDiskID(record.GetVDiskID()), TAppData::TimeProvider->Now(),
- (ui32)ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr, NWilson::TTraceId(), 0);
+ (ui32)ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr, 0);
FinalizeAndSend(std::move(response), ctx, ev->Sender);
return;
}
@@ -387,8 +387,7 @@ public:
auto response = std::make_unique<TEvBlobStorage::TEvVCollectGarbageResult>(NKikimrProto::OK, record.GetTabletId(),
record.GetRecordGeneration(), record.GetChannel(), VDiskIDFromVDiskID(record.GetVDiskID()),
- TAppData::TimeProvider->Now(), (ui32)ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr,
- NWilson::TTraceId(), 0);
+ TAppData::TimeProvider->Now(), (ui32)ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr, 0);
FinalizeAndSend(std::move(response), ctx, ev->Sender);
}
@@ -406,7 +405,7 @@ public:
auto response = std::make_unique<TEvBlobStorage::TEvVGetBarrierResult>(NKikimrProto::OK,
VDiskIDFromVDiskID(record.GetVDiskID()), TAppData::TimeProvider->Now(), (ui32)ev->Get()->GetCachedByteSize(),
- &record, nullptr, nullptr, nullptr, NWilson::TTraceId());
+ &record, nullptr, nullptr, nullptr);
auto it = Barriers.lower_bound(first);
while (it != Barriers.end() && it->first <= last) {
diff --git a/ydb/core/blobstorage/vdisk/anubis_osiris/blobstorage_anubis_osiris.h b/ydb/core/blobstorage/vdisk/anubis_osiris/blobstorage_anubis_osiris.h
index e44bcae7e7c..1ec81167f1e 100644
--- a/ydb/core/blobstorage/vdisk/anubis_osiris/blobstorage_anubis_osiris.h
+++ b/ydb/core/blobstorage/vdisk/anubis_osiris/blobstorage_anubis_osiris.h
@@ -83,10 +83,8 @@ namespace NKikimr {
TEvAnubisOsirisPutResult(NKikimrProto::EReplyStatus status,
const TInstant &now,
::NMonitoring::TDynamicCounters::TCounterPtr counterPtr,
- NVDiskMon::TLtcHistoPtr histoPtr,
- NWilson::TTraceId traceId)
- : TEvVResultBase(now, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, counterPtr, histoPtr,
- std::move(traceId))
+ NVDiskMon::TLtcHistoPtr histoPtr)
+ : TEvVResultBase(now, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, counterPtr, histoPtr)
, Status(status)
{}
};
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.cpp b/ydb/core/blobstorage/vdisk/common/vdisk_events.cpp
index f02de6350c3..2a2a8d69b17 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_events.cpp
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.cpp
@@ -9,10 +9,10 @@ namespace NKikimr {
const TLogoBlobID &logoBlobId, const TVDiskID &vdisk, const ui64 *cookie, TOutOfSpaceStatus oosStatus,
const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVPut *record,
const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, const ui64 bufferSizeBytes, NWilson::TTraceId traceId,
+ const NVDiskMon::TLtcHistoPtr &histoPtr, const ui64 bufferSizeBytes,
ui64 incarnationGuid, const TString& errorReason)
- : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId),
- TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, record, skeletonFrontIDPtr)
+ : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG,
+ recByteSize, record, skeletonFrontIDPtr)
{
IncrementSize(bufferSizeBytes);
Record.SetStatus(status);
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
index 51040aefacf..82e3dbd8652 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
@@ -355,13 +355,12 @@ namespace NKikimr {
TEvVResultBase() = default;
TEvVResultBase(const TInstant &now, ui32 channel, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId)
+ const NVDiskMon::TLtcHistoPtr &histoPtr)
: TVDiskNonlocalResultBase(channel)
, Start(now)
, Size(0)
, CounterPtr(counterPtr)
, HistoPtr(histoPtr)
- , TraceId(std::move(traceId))
{}
virtual ~TEvVResultBase() {
@@ -394,9 +393,6 @@ namespace NKikimr {
::NMonitoring::TDynamicCounters::TCounterPtr CounterPtr;
NVDiskMon::TLtcHistoPtr HistoPtr;
bool Finalized = false;
-
- public:
- NWilson::TTraceId TraceId;
};
template<typename TEv, typename TRecord /*protobuf record*/, ui32 TEventType>
@@ -410,8 +406,8 @@ namespace NKikimr {
TEvVResultBasePB() = default;
TEvVResultBasePB(const TInstant &now, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel)
- : TEvVResultBase(now, channel, counterPtr, histoPtr, std::move(traceId))
+ const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel)
+ : TEvVResultBase(now, channel, counterPtr, histoPtr)
{}
};
@@ -433,10 +429,9 @@ namespace NKikimr {
template<typename TQueryRecord>
TEvVResultBaseWithQoSPB(TInstant now, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel,
+ const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel,
ui32 recByteSize, TQueryRecord *queryRecord, const TActorIDPtr &skeletonFrontIDPtr)
- : TBase(queryRecord ? GetReceivedTimestamp(queryRecord) : now,
- counterPtr, histoPtr, std::move(traceId), channel)
+ : TBase(queryRecord ? GetReceivedTimestamp(queryRecord) : now, counterPtr, histoPtr, channel)
, MsgCtx(queryRecord ? TVMsgContext(recByteSize, queryRecord->GetMsgQoS()) : TVMsgContext())
, SkeletonFrontIDPtr(skeletonFrontIDPtr)
{
@@ -717,8 +712,7 @@ namespace NKikimr {
const ui64 *cookie, TOutOfSpaceStatus oosStatus, const TInstant &now, ui32 recByteSize,
NKikimrBlobStorage::TEvVPut *queryRecord, const TActorIDPtr &skeletonFrontIDPtr,
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr,
- const ui64 bufferSizeBytes, NWilson::TTraceId traceId, ui64 incarnationGuid,
- const TString& errorReason);
+ const ui64 bufferSizeBytes, ui64 incarnationGuid, const TString& errorReason);
TString ToString() const override {
return ToString(Record);
@@ -838,7 +832,7 @@ namespace NKikimr {
TRope GetItemBuffer(ui64 itemIdx) const;
void AddVPut(const TLogoBlobID &logoBlobId, const TString &buffer, ui64 *cookie,
- std::vector<std::pair<ui64, ui32>> *extraBlockChecks) {
+ std::vector<std::pair<ui64, ui32>> *extraBlockChecks, NWilson::TTraceId traceId) {
NKikimrBlobStorage::TVMultiPutItem *item = Record.AddItems();
LogoBlobIDFromLogoBlobID(logoBlobId, item->MutableBlobID());
item->SetFullDataSize(logoBlobId.BlobSize());
@@ -854,6 +848,9 @@ namespace NKikimr {
p->SetGeneration(generation);
}
}
+ if (traceId) {
+ traceId.Serialize(item->MutableTraceId());
+ }
}
bool Validate(TString& errorReason) {
@@ -929,10 +926,10 @@ namespace NKikimr {
TEvVMultiPutResult(const NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, const ui64 *cookie,
const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVMultiPut *record,
const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, const ui64 bufferSizeBytes, NWilson::TTraceId traceId,
- ui64 incarnationGuid, const TString& errorReason)
- : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId),
- TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, record, skeletonFrontIDPtr)
+ const NVDiskMon::TLtcHistoPtr &histoPtr, const ui64 bufferSizeBytes, ui64 incarnationGuid,
+ const TString& errorReason)
+ : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG,
+ recByteSize, record, skeletonFrontIDPtr)
{
IncrementSize(bufferSizeBytes);
Record.SetStatus(status);
@@ -1265,9 +1262,8 @@ namespace NKikimr {
TEvVGetResult(NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, const TInstant &now,
ui32 recByteSize, NKikimrBlobStorage::TEvVGet *queryRecord, const TActorIDPtr &skeletonFrontIDPtr,
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr,
- NWilson::TTraceId traceId, TMaybe<ui64> cookie, ui32 channel, ui64 incarnationGuid)
- : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId), channel,
- recByteSize, queryRecord, skeletonFrontIDPtr)
+ TMaybe<ui64> cookie, ui32 channel, ui64 incarnationGuid)
+ : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, channel, recByteSize, queryRecord, skeletonFrontIDPtr)
{
Record.SetStatus(status);
VDiskIDFromVDiskID(vdisk, Record.MutableVDiskID());
@@ -1517,9 +1513,8 @@ namespace NKikimr {
TOutOfSpaceStatus oosStatus, const TInstant &now, ui32 recByteSize,
TReqRecord *record, const TActorIDPtr &skeletonFrontIDPtr,
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr,
- NWilson::TTraceId traceId, ui64 incarnationGuid, const TString& errorReason)
- : TBase(now, counterPtr, histoPtr, std::move(traceId),
- TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, record, skeletonFrontIDPtr)
+ ui64 incarnationGuid, const TString& errorReason)
+ : TBase(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, record, skeletonFrontIDPtr)
{
this->Record.SetStatus(status);
LogoBlobIDFromLogoBlobID(originalBlobId, this->Record.MutableOriginalBlobId());
@@ -1640,9 +1635,9 @@ namespace NKikimr {
TOutOfSpaceStatus oosStatus, const TInstant &now, ui32 recByteSize,
NKikimrBlobStorage::TEvVMovedPatch *record, const TActorIDPtr &skeletonFrontIDPtr,
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr,
- NWilson::TTraceId traceId, ui64 incarnationGuid, const TString& errorReason)
+ ui64 incarnationGuid, const TString& errorReason)
: TBase(status, originalBlobId, patchedBlobId, vdisk, cookie, oosStatus, now, recByteSize,
- record, skeletonFrontIDPtr, counterPtr, histoPtr, std::move(traceId), incarnationGuid, errorReason)
+ record, skeletonFrontIDPtr, counterPtr, histoPtr, incarnationGuid, errorReason)
{}
};
@@ -1688,9 +1683,9 @@ namespace NKikimr {
TOutOfSpaceStatus oosStatus, const TInstant &now, ui32 recByteSize,
NKikimrBlobStorage::TEvVInplacePatch *record, const TActorIDPtr &skeletonFrontIDPtr,
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr,
- NWilson::TTraceId traceId, ui64 incarnationGuid, const TString& errorReason)
+ ui64 incarnationGuid, const TString& errorReason)
: TBase(status, originalBlobId, patchedBlobId, vdisk, cookie, oosStatus, now, recByteSize,
- record, skeletonFrontIDPtr, counterPtr, histoPtr, std::move(traceId), incarnationGuid, errorReason)
+ record, skeletonFrontIDPtr, counterPtr, histoPtr, incarnationGuid, errorReason)
{}
};
@@ -1738,9 +1733,9 @@ namespace NKikimr {
TEvVBlockResult(NKikimrProto::EReplyStatus status, const TTabletActGen *actual, const TVDiskID &vdisk,
const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVBlock *queryRecord,
const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui64 incarnationGuid)
- : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId),
- TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, queryRecord, skeletonFrontIDPtr)
+ const NVDiskMon::TLtcHistoPtr &histoPtr, ui64 incarnationGuid)
+ : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG,
+ recByteSize, queryRecord, skeletonFrontIDPtr)
{
Record.SetStatus(status);
if (actual) {
@@ -1832,8 +1827,8 @@ namespace NKikimr {
const TLogoBlobID &patchedBlobId, const TVDiskID &vDiskId, TMaybe<ui64> cookie, const TInstant &now,
const TString &errorReason, NKikimrBlobStorage::TEvVPatchStart *record,
const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui64 incarnationGuid)
- : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId),
+ const NVDiskMon::TLtcHistoPtr &histoPtr, ui64 incarnationGuid)
+ : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr,
TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, record->GetCachedSize(),
record, skeletonFrontIDPtr)
{
@@ -1996,8 +1991,8 @@ namespace NKikimr {
TEvVPatchXorDiffResult(NKikimrProto::EReplyStatus status, TInstant now,
NKikimrBlobStorage::TEvVPatchXorDiff *record, const TActorIDPtr &skeletonFrontIDPtr,
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId)
- : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId),
+ const NVDiskMon::TLtcHistoPtr &histoPtr)
+ : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr,
TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, record->GetCachedSize(),
record, skeletonFrontIDPtr)
{
@@ -2030,8 +2025,8 @@ namespace NKikimr {
const TLogoBlobID &patchedPartBlobId, const TVDiskID &vDiskId, TMaybe<ui64> cookie, TInstant now,
NKikimrBlobStorage::TEvVPatchDiff *record, const TActorIDPtr &skeletonFrontIDPtr,
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr,
- NWilson::TTraceId traceId, ui64 incarnationGuid)
- : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId),
+ ui64 incarnationGuid)
+ : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr,
TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, record->GetCachedSize(),
record, skeletonFrontIDPtr)
{
@@ -2109,9 +2104,9 @@ namespace NKikimr {
TEvVGetBlockResult(NKikimrProto::EReplyStatus status, ui64 tabletId, const TVDiskID &vdisk,
const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVGetBlock *queryRecord,
const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId)
- : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId),
- TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, queryRecord, skeletonFrontIDPtr)
+ const NVDiskMon::TLtcHistoPtr &histoPtr)
+ : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG,
+ recByteSize, queryRecord, skeletonFrontIDPtr)
{
Record.SetStatus(status);
Record.SetTabletId(tabletId);
@@ -2121,9 +2116,9 @@ namespace NKikimr {
TEvVGetBlockResult(NKikimrProto::EReplyStatus status, ui64 tabletId, ui32 generation, const TVDiskID &vdisk,
const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVGetBlock *queryRecord,
const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId)
- : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId),
- TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, queryRecord, skeletonFrontIDPtr)
+ const NVDiskMon::TLtcHistoPtr &histoPtr)
+ : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG,
+ recByteSize, queryRecord, skeletonFrontIDPtr)
{
Record.SetStatus(status);
Record.SetTabletId(tabletId);
@@ -2234,9 +2229,9 @@ namespace NKikimr {
const TVDiskID &vdisk, const TInstant &now, ui32 recByteSize,
NKikimrBlobStorage::TEvVCollectGarbage *queryRecord, const TActorIDPtr &skeletonFrontIDPtr,
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr,
- NWilson::TTraceId traceId, ui64 incarnationGuid)
- : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId),
- TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, queryRecord, skeletonFrontIDPtr)
+ ui64 incarnationGuid)
+ : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG,
+ recByteSize, queryRecord, skeletonFrontIDPtr)
{
Record.SetStatus(status);
Record.SetTabletId(tabletId);
@@ -2329,9 +2324,9 @@ namespace NKikimr {
TEvVGetBarrierResult(NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, const TInstant &now,
ui32 recByteSize, NKikimrBlobStorage::TEvVGetBarrier *queryRecord,
const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId)
- : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId),
- TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, queryRecord, skeletonFrontIDPtr)
+ const NVDiskMon::TLtcHistoPtr &histoPtr)
+ : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG,
+ recByteSize, queryRecord, skeletonFrontIDPtr)
{
Record.SetStatus(status);
VDiskIDFromVDiskID(vdisk, Record.MutableVDiskID());
@@ -2498,10 +2493,8 @@ namespace NKikimr {
TEvVDbStatResult() = default;
TEvVDbStatResult(NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, const TInstant &now,
- const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr,
- NWilson::TTraceId traceId)
- : TEvVResultBasePB(now, counterPtr, histoPtr, std::move(traceId),
- TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG)
+ const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr)
+ : TEvVResultBasePB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG)
{
Record.SetStatus(status);
VDiskIDFromVDiskID(vdisk, Record.MutableVDiskID());
@@ -2566,8 +2559,8 @@ namespace NKikimr {
// read response
TEvVSyncGuidResult(const NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, const TInstant &now,
ui64 guid, EState state, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel)
- : TEvVResultBasePB(now, counterPtr, histoPtr, std::move(traceId), channel)
+ const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel)
+ : TEvVResultBasePB(now, counterPtr, histoPtr, channel)
{
Record.SetStatus(status);
auto guidInfo = Record.MutableReadInfo();
@@ -2579,8 +2572,8 @@ namespace NKikimr {
// write or error response
TEvVSyncGuidResult(const NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, const TInstant &now,
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel)
- : TEvVResultBasePB(now, counterPtr, histoPtr, std::move(traceId), channel)
+ const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel)
+ : TEvVResultBasePB(now, counterPtr, histoPtr, channel)
{
Record.SetStatus(status);
VDiskIDFromVDiskID(vdisk, Record.MutableVDiskID());
@@ -2625,8 +2618,8 @@ namespace NKikimr {
TEvVSyncResult(const NKikimrProto::EReplyStatus status, const TVDiskID &vdisk,
const TSyncState &newSyncState, bool finished, NPDisk::TStatusFlags flags, const TInstant &now,
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel)
- : TEvVResultBasePB(now, counterPtr, histoPtr, std::move(traceId), channel)
+ const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel)
+ : TEvVResultBasePB(now, counterPtr, histoPtr, channel)
{
Record.SetStatus(status);
SyncStateFromSyncState(newSyncState, Record.MutableNewSyncState());
@@ -2637,8 +2630,8 @@ namespace NKikimr {
TEvVSyncResult(const NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, NPDisk::TStatusFlags flags,
const TInstant &now, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel)
- : TEvVResultBasePB(now, counterPtr, histoPtr, std::move(traceId), channel)
+ const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel)
+ : TEvVResultBasePB(now, counterPtr, histoPtr, channel)
{
Record.SetStatus(status);
VDiskIDFromVDiskID(vdisk, Record.MutableVDiskID());
@@ -2691,8 +2684,8 @@ namespace NKikimr {
TEvVSyncFullResult(const NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, const TSyncState &syncState,
ui64 cookie, const TInstant &now, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel)
- : TEvVResultBasePB(now, counterPtr, histoPtr, std::move(traceId), channel)
+ const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel)
+ : TEvVResultBasePB(now, counterPtr, histoPtr, channel)
{
Record.SetStatus(status);
VDiskIDFromVDiskID(vdisk, Record.MutableVDiskID());
@@ -2702,8 +2695,8 @@ namespace NKikimr {
TEvVSyncFullResult(const NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, ui64 cookie,
const TInstant &now, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr,
- const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel)
- : TEvVResultBasePB(now, counterPtr, histoPtr, std::move(traceId), channel)
+ const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel)
+ : TEvVResultBasePB(now, counterPtr, histoPtr, channel)
{
Record.SetStatus(status);
VDiskIDFromVDiskID(vdisk, Record.MutableVDiskID());
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h
index 840f5f7da1f..64c92463539 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h
@@ -61,10 +61,9 @@ namespace NKikimr {
const ui64 OrderId;
TEvDelLogoBlobDataSyncLogResult(ui64 orderId, const TInstant &now,
- ::NMonitoring::TDynamicCounters::TCounterPtr counterPtr, NVDiskMon::TLtcHistoPtr histoPtr,
- NWilson::TTraceId traceId)
+ ::NMonitoring::TDynamicCounters::TCounterPtr counterPtr, NVDiskMon::TLtcHistoPtr histoPtr)
: TEvVResultBase(now, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, counterPtr,
- histoPtr, std::move(traceId))
+ histoPtr)
, OrderId(orderId)
{}
};
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp b/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp
index 6555ca46312..8613e05c0ef 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp
@@ -14,27 +14,23 @@ void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEve
}
void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, ui32 channel) {
- NWilson::TTraceId traceId;
-
switch (const ui32 type = ev->Type()) {
-#define WILSON_HANDLE_EVENT(T, EV) \
+#define HANDLE_EVENT(T, EV) \
case TEvBlobStorage::T::EventType: { \
TEvBlobStorage::T *event = static_cast<TEvBlobStorage::T *>(ev); \
- traceId = std::move(event->TraceId); \
const double usPerCycle = 1000000.0 / NHPTimer::GetCyclesPerSecond(); \
event->Record.MutableTimestamps()->SetSentByVDiskUs(GetCycleCountFast() * usPerCycle); \
break; \
}
- WILSON_HANDLE_EVENT(TEvVPutResult, EvVPutResultSent)
- WILSON_HANDLE_EVENT(TEvVMultiPutResult, EvVMultiPutResultSent)
- WILSON_HANDLE_EVENT(TEvVGetResult, EvVGetResultSent)
+ HANDLE_EVENT(TEvVPutResult, EvVPutResultSent)
+ HANDLE_EVENT(TEvVMultiPutResult, EvVMultiPutResultSent)
+ HANDLE_EVENT(TEvVGetResult, EvVGetResultSent)
-#undef WILSON_HANDLE_EVENT
+#undef HANDLE_EVENT
}
- auto event = std::make_unique<IEventHandle>(recipient, ctx.SelfID, ev, IEventHandle::MakeFlags(channel, 0), cookie,
- nullptr, std::move(traceId));
+ auto event = std::make_unique<IEventHandle>(recipient, ctx.SelfID, ev, IEventHandle::MakeFlags(channel, 0), cookie);
if (TEvVResultBase *base = dynamic_cast<TEvVResultBase *>(ev)) {
base->FinalizeAndSend(ctx, std::move(event));
} else {
diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp
index 6c154281eb8..e306630ee2c 100644
--- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp
+++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp
@@ -10,6 +10,7 @@
#include <ydb/core/blobstorage/vdisk/common/vdisk_lsnmngr.h>
#include <ydb/core/blobstorage/vdisk/common/blobstorage_dblogcutter.h>
#include <ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob.h>
+#include <ydb/core/base/wilson.h>
#include <library/cpp/monlib/service/pages/templates.h>
using namespace NKikimrServices;
@@ -154,6 +155,7 @@ namespace NKikimr {
ui64 WriteId;
TDiskPart DiskAddr;
static void *Cookie;
+ NWilson::TSpan Span;
friend class TActorBootstrapped<THullHugeBlobWriter>;
@@ -186,6 +188,7 @@ namespace NKikimr {
"Writer: bootstrap: id# %s chunkId# %u offset# %u storedBlobSize# %u "
"writtenSize# %u", HugeSlot.ToString().data(), chunkId, offset,
storedBlobSize, writtenSize));
+ Span.Event("Send_TEvChunkWrite", NWilson::TKeyValueList{{{"ChunkId", chunkId}, {"Offset", offset}, {"WrittenSize", writtenSize}}});
ctx.Send(HugeKeeperCtx->PDiskCtx->PDiskId,
new NPDisk::TEvChunkWrite(HugeKeeperCtx->PDiskCtx->Dsk->Owner,
HugeKeeperCtx->PDiskCtx->Dsk->OwnerRound, chunkId, offset,
@@ -197,12 +200,16 @@ namespace NKikimr {
}
void Handle(NPDisk::TEvChunkWriteResult::TPtr &ev, const TActorContext &ctx) {
+ if (ev->Get()->Status == NKikimrProto::OK) {
+ Span.EndOk();
+ } else {
+ Span.EndError(TStringBuilder() << NKikimrProto::EReplyStatus_Name(ev->Get()->Status));
+ }
CHECK_PDISK_RESPONSE(HugeKeeperCtx->VCtx, ev, ctx);
ctx.Send(NotifyID, new TEvHullHugeWritten(HugeSlot));
- ctx.Send(HugeKeeperCtx->SkeletonId,
- new TEvHullLogHugeBlob(WriteId, Item->LogoBlobId, Item->Ingress, DiskAddr,
- Item->IgnoreBlock, Item->SenderId, Item->Cookie, std::move(Item->Result),
- &Item->ExtraBlockChecks));
+ ctx.Send(HugeKeeperCtx->SkeletonId, new TEvHullLogHugeBlob(WriteId, Item->LogoBlobId, Item->Ingress, DiskAddr,
+ Item->IgnoreBlock, Item->SenderId, Item->Cookie, std::move(Item->Result), &Item->ExtraBlockChecks), 0, 0,
+ Span.GetTraceId());
LOG_DEBUG(ctx, BS_HULLHUGE,
VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
"Writer: finish: id# %s diskAddr# %s",
@@ -228,7 +235,8 @@ namespace NKikimr {
const TActorId &notifyID,
const NHuge::THugeSlot &hugeSlot,
std::unique_ptr<TEvHullWriteHugeBlob> item,
- ui64 wId)
+ ui64 wId,
+ NWilson::TTraceId traceId)
: TActorBootstrapped<TThis>()
, HugeKeeperCtx(std::move(hugeKeeperCtx))
, NotifyID(notifyID)
@@ -236,6 +244,7 @@ namespace NKikimr {
, Item(std::move(item))
, WriteId(wId)
, DiskAddr()
+ , Span(TWilson::VDiskInternals, std::move(traceId), "VDisk.HugeBlobKeeper.Write")
{}
};
@@ -527,18 +536,9 @@ namespace NKikimr {
// THullHugeKeeperState
////////////////////////////////////////////////////////////////////////////
struct THullHugeKeeperState {
- //////////////////////////// TMsgQueue /////////////////////////////////
- struct TDestroy {
- static void Destroy(TEvHullWriteHugeBlob *item) {
- delete item;
- }
- };
- typedef TIntrusiveListWithAutoDelete<TEvHullWriteHugeBlob, TDestroy> TMsgQueue;
- //////////////////////////// TMsgQueue /////////////////////////////////
-
ui64 WaitQueueSize = 0;
ui64 WaitQueueByteSize = 0;
- TMsgQueue WaitQueue; // Huge blobs for writing are placed here
+ std::deque<std::unique_ptr<TEvHullWriteHugeBlob::THandle>> WaitQueue;
bool Committing = false;
ui64 FreeUpToLsn = 0; // last value we got from PDisk
@@ -627,27 +627,27 @@ namespace NKikimr {
TActiveActors ActiveActors;
std::unordered_set<ui32> AllocatingChunkPerSlotSize;
- void PutToWaitQueue(std::unique_ptr<TEvHullWriteHugeBlob> item) {
+ void PutToWaitQueue(std::unique_ptr<TEvHullWriteHugeBlob::THandle> item) {
State.WaitQueueSize++;
- State.WaitQueueByteSize += item->ByteSize();
- State.WaitQueue.PushBack(item.release());
+ State.WaitQueueByteSize += item->Get()->ByteSize();
+ State.WaitQueue.push_back(std::move(item));
}
- bool ProcessWrite(TEvHullWriteHugeBlob *ev, const TActorContext& ctx) {
+ bool ProcessWrite(TEvHullWriteHugeBlob::THandle& ev, const TActorContext& ctx, bool fromWaitQueue) {
+ auto& msg = *ev.Get();
NHuge::THugeSlot hugeSlot;
ui32 slotSize;
- if (State.Pers->Heap->Allocate(ev->Data.GetSize(), &hugeSlot, &slotSize)) {
- if (!ev->Empty()) { // remove item from the WaitQueue
+ if (State.Pers->Heap->Allocate(msg.Data.GetSize(), &hugeSlot, &slotSize)) {
+ if (fromWaitQueue) {
--State.WaitQueueSize;
- State.WaitQueueByteSize -= ev->ByteSize();
- ev->Unlink();
+ State.WaitQueueByteSize -= msg.ByteSize();
}
const bool inserted = State.Pers->AllocatedSlots.insert(hugeSlot).second;
Y_VERIFY(inserted);
const ui64 wId = State.LogLsnFifo.Push(HugeKeeperCtx->LsnMngr->GetLsn());
auto aid = ctx.Register(new THullHugeBlobWriter(HugeKeeperCtx, ctx.SelfID, hugeSlot,
- std::unique_ptr<TEvHullWriteHugeBlob>(ev), wId));
+ std::unique_ptr<TEvHullWriteHugeBlob>(ev.Release().Release()), wId, std::move(ev.TraceId)));
ActiveActors.Insert(aid);
return true;
} else if (AllocatingChunkPerSlotSize.insert(slotSize).second) {
@@ -658,7 +658,11 @@ namespace NKikimr {
}
void ProcessQueue(const TActorContext &ctx) {
- State.WaitQueue.ForEach(std::bind(&TThis::ProcessWrite, this, std::placeholders::_1, std::cref(ctx)));
+ auto it = State.WaitQueue.begin();
+ while (it != State.WaitQueue.end() && ProcessWrite(**it, ctx, true)) {
+ ++it;
+ }
+ State.WaitQueue.erase(State.WaitQueue.begin(), it);
}
void FreeChunks(const TActorContext &ctx) {
@@ -739,11 +743,10 @@ namespace NKikimr {
//////////// Event Handlers ////////////////////////////////////
void Handle(TEvHullWriteHugeBlob::TPtr &ev, const TActorContext &ctx) {
- std::unique_ptr<TEvHullWriteHugeBlob> item(ev->Release().Release());
- LOG_DEBUG(ctx, BS_HULLHUGE,
- VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
- "THullHugeKeeper: TEvHullWriteHugeBlob: %s", item->ToString().data()));
- if (ProcessWrite(item.get(), ctx)) {
+ LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
+ "THullHugeKeeper: TEvHullWriteHugeBlob: %s", std::data(ev->Get()->ToString())));
+ std::unique_ptr<TEvHullWriteHugeBlob::THandle> item(ev.Release());
+ if (ProcessWrite(*item, ctx, false)) {
(void)item.release();
} else {
PutToWaitQueue(std::move(item));
diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h
index 1876855d797..0eaaca14f56 100644
--- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h
+++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h
@@ -5,13 +5,14 @@
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_pdiskctx.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_defrag.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
namespace NKikimr {
////////////////////////////////////////////////////////////////////////////
// TEvHullWriteHugeBlob
////////////////////////////////////////////////////////////////////////////
- class TEvHullWriteHugeBlob : public TEventLocal<TEvHullWriteHugeBlob, TEvBlobStorage::EvHullWriteHugeBlob>, public TIntrusiveListItem<TEvHullWriteHugeBlob> {
+ class TEvHullWriteHugeBlob : public TEventLocal<TEvHullWriteHugeBlob, TEvBlobStorage::EvHullWriteHugeBlob> {
public:
const TActorId SenderId;
const ui64 Cookie;
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
index 58c881fb027..714d0e322ae 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp
@@ -150,9 +150,10 @@ namespace NKikimr {
IEventBase *msg,
const TActorId &recipient,
ui64 recipientCookie,
+ NWilson::TTraceId traceId,
ui64 lsn)
{
- Fields->DelayedResponses.Put(msg, recipient, recipientCookie, lsn);
+ Fields->DelayedResponses.Put(msg, recipient, recipientCookie, std::move(traceId), lsn);
}
////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h
index 60c68a51114..938230dce42 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h
@@ -84,7 +84,8 @@ namespace NKikimr {
// Request from PDisk to cut the recovery log
void CutRecoveryLog(const TActorContext &ctx, std::unique_ptr<NPDisk::TEvCutLog> msg);
- void PostponeReplyUntilCommitted(IEventBase *msg, const TActorId &recipient, ui64 recipientCookie, ui64 lsn);
+ void PostponeReplyUntilCommitted(IEventBase *msg, const TActorId &recipient, ui64 recipientCookie,
+ NWilson::TTraceId traceId, ui64 lsn);
////////////////////////////////////////////////////////////////////////
// LogoBlobs
@@ -135,7 +136,7 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////
// Blocks
////////////////////////////////////////////////////////////////////////
- using TReplySender = std::function<void (const TActorId &, ui64, IEventBase *)>;
+ using TReplySender = std::function<void (const TActorId &, ui64, NWilson::TTraceId, IEventBase *)>;
THullCheckStatus CheckBlockCmdAndAllocLsn(
ui64 tabletID,
diff --git a/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp.h b/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp.h
index 8d65c474c9b..99c87171716 100644
--- a/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp.h
+++ b/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp.h
@@ -1,6 +1,10 @@
#pragma once
+
#include "defs.h"
+#include <ydb/core/base/wilson.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
+
namespace NKikimr {
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -11,17 +15,23 @@ namespace NKikimr {
///////////////////////////////////////////////////////////////////////////////////////////////////////
class TDelayedResponses {
public:
- using TAction = std::function<void (const TActorId &id, ui64 cookie, IEventBase *msg)>;
-
- void Put(IEventBase *msg, const TActorId &recipient, ui64 recipientCookie, ui64 lsn) {
- Map.emplace(lsn, TValue {recipient, recipientCookie, std::unique_ptr<IEventBase>(msg)});
+ using TAction = std::function<void (const TActorId &id, ui64 cookie, NWilson::TTraceId traceId, IEventBase *msg)>;
+
+ void Put(IEventBase *msg, const TActorId &recipient, ui64 recipientCookie, NWilson::TTraceId traceId, ui64 lsn) {
+ Map.emplace(lsn, TValue{
+ recipient,
+ recipientCookie,
+ NWilson::TSpan(TWilson::VDiskInternals, std::move(traceId), "VDisk.DelayedResponses.Queue"),
+ std::unique_ptr<IEventBase>(msg)
+ });
}
void ConfirmLsn(ui64 lsn, const TAction &action) {
TMap::iterator it = Map.begin();
while (it != Map.end() && it->first <= lsn) {
TValue &v = it->second;
- action(v.Recipient, v.RecipientCookie, v.Msg.release());
+ v.Span.EndOk();
+ action(v.Recipient, v.RecipientCookie, v.Span.GetTraceId(), v.Msg.release());
++it;
}
// remove all traversed elements
@@ -32,6 +42,7 @@ namespace NKikimr {
struct TValue {
TActorId Recipient;
ui64 RecipientCookie;
+ NWilson::TSpan Span;
std::unique_ptr<IEventBase> Msg;
};
diff --git a/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp_ut.cpp b/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp_ut.cpp
index f95ace015a2..29fed34e8d0 100644
--- a/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp_ut.cpp
@@ -10,7 +10,7 @@ namespace NKikimr {
Y_UNIT_TEST(Test) {
TVector<ui64> res;
- auto action = [&res] (const TActorId &actorId, ui64 cookie, IEventBase *msg) {
+ auto action = [&res] (const TActorId &actorId, ui64 cookie, NWilson::TTraceId, IEventBase *msg) {
Y_UNUSED(actorId);
STR << "cookie# " << cookie << "\n";
res.push_back(cookie);
@@ -18,12 +18,12 @@ namespace NKikimr {
};
auto dr = std::make_unique<TDelayedResponses>();
- dr->Put(nullptr, TActorId(), 1, 500);
- dr->Put(nullptr, TActorId(), 2, 500);
- dr->Put(nullptr, TActorId(), 3, 501);
- dr->Put(nullptr, TActorId(), 4, 500);
+ dr->Put(nullptr, TActorId(), 1, {}, 500);
+ dr->Put(nullptr, TActorId(), 2, {}, 500);
+ dr->Put(nullptr, TActorId(), 3, {}, 501);
+ dr->Put(nullptr, TActorId(), 4, {}, 500);
dr->ConfirmLsn(500, action);
- dr->Put(nullptr, TActorId(), 5, 502);
+ dr->Put(nullptr, TActorId(), 5, {}, 502);
dr->ConfirmLsn(501, action);
dr->ConfirmLsn(502, action);
diff --git a/ydb/core/blobstorage/vdisk/query/query_extr.cpp b/ydb/core/blobstorage/vdisk/query/query_extr.cpp
index 6315bd47f28..328f2f207a8 100644
--- a/ydb/core/blobstorage/vdisk/query/query_extr.cpp
+++ b/ydb/core/blobstorage/vdisk/query/query_extr.cpp
@@ -345,8 +345,8 @@ namespace NKikimr {
SendResponseAndDie(ctx, this);
} else {
ui8 priority = PDiskPriority();
- std::unique_ptr<IActor> a(Batcher.CreateAsyncDataReader(ctx.SelfID, priority, std::move(Result->TraceId),
- IsRepl(), TActivationContext::Now()));
+ std::unique_ptr<IActor> a(Batcher.CreateAsyncDataReader(ctx.SelfID, priority, /*std::move(Result->TraceId)*/ NWilson::TTraceId(), // FIXME: trace
+ IsRepl()));
if (a) {
auto aid = ctx.Register(a.release());
ActiveActors.Insert(aid);
diff --git a/ydb/core/blobstorage/vdisk/query/query_readactor.cpp b/ydb/core/blobstorage/vdisk/query/query_readactor.cpp
index 9a27de1b3f7..664e57d90bb 100644
--- a/ydb/core/blobstorage/vdisk/query/query_readactor.cpp
+++ b/ydb/core/blobstorage/vdisk/query/query_readactor.cpp
@@ -1,5 +1,6 @@
#include "query_readbatch.h"
#include <ydb/core/blobstorage/base/vdisk_priorities.h>
+#include <ydb/core/base/wilson.h>
#include <library/cpp/actors/wilson/wilson_span.h>
#include <util/generic/algorithm.h>
@@ -47,7 +48,7 @@ namespace NKikimr {
// send request
TReplQuoter::QuoteMessage(quoter, std::make_unique<IEventHandle>(Ctx->PDiskCtx->PDiskId, SelfId(),
- msg.release(), 0, 0, nullptr, Span), it->Part.Size);
+ msg.release(), 0, 0, nullptr, Span.GetTraceId()), it->Part.Size);
Counter++;
}
@@ -116,14 +117,13 @@ namespace NKikimr {
std::shared_ptr<TReadBatcherResult> result,
ui8 priority,
NWilson::TTraceId traceId,
- bool isRepl,
- TInstant now)
+ bool isRepl)
: TActorBootstrapped<TTReadBatcherActor>()
, Ctx(ctx)
, NotifyID(notifyID)
, Result(std::move(result))
, Priority(priority)
- , Span(12, NWilson::ERelation::ChildOf, std::move(traceId), now, "VDisk.TReadBatcherActor")
+ , Span(TWilson::VDiskInternals, std::move(traceId), "VDisk.Query.ReadBatcher")
, IsRepl(isRepl)
{}
};
@@ -134,10 +134,9 @@ namespace NKikimr {
std::shared_ptr<TReadBatcherResult> result,
ui8 priority,
NWilson::TTraceId traceId,
- bool isRepl,
- TInstant now)
+ bool isRepl)
{
- return new TTReadBatcherActor(ctx, notifyID, result, priority, std::move(traceId), isRepl, now);
+ return new TTReadBatcherActor(ctx, notifyID, result, priority, std::move(traceId), isRepl);
}
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/query/query_readactor.h b/ydb/core/blobstorage/vdisk/query/query_readactor.h
index daee28aaf6f..614625ee8b3 100644
--- a/ydb/core/blobstorage/vdisk/query/query_readactor.h
+++ b/ydb/core/blobstorage/vdisk/query/query_readactor.h
@@ -11,8 +11,7 @@ namespace NKikimr {
std::shared_ptr<TReadBatcherResult> result,
ui8 priority,
NWilson::TTraceId traceId,
- bool isRepl,
- TInstant now);
+ bool isRepl);
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp
index 566af60d43c..8b9ac0caeca 100644
--- a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp
+++ b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp
@@ -218,7 +218,7 @@ namespace NKikimr {
}
IActor *TReadBatcher::CreateAsyncDataReader(const TActorId &notifyID, ui8 priority, NWilson::TTraceId traceId,
- bool isRepl, TInstant now) {
+ bool isRepl) {
if (Result->DiskDataItemPtrs.empty())
return nullptr;
else {
@@ -239,7 +239,7 @@ namespace NKikimr {
PDiskReadBytes += size;
}
// start reader
- return CreateReadBatcherActor(Ctx, notifyID, Result, priority, std::move(traceId), isRepl, now);
+ return CreateReadBatcherActor(Ctx, notifyID, Result, priority, std::move(traceId), isRepl);
}
}
diff --git a/ydb/core/blobstorage/vdisk/query/query_readbatch.h b/ydb/core/blobstorage/vdisk/query/query_readbatch.h
index fba173b8092..b0592a9a202 100644
--- a/ydb/core/blobstorage/vdisk/query/query_readbatch.h
+++ b/ydb/core/blobstorage/vdisk/query/query_readbatch.h
@@ -329,8 +329,7 @@ namespace NKikimr {
// creates an actor for efficient async data reads, returns nullptr
// if no read required
- IActor *CreateAsyncDataReader(const TActorId &notifyID, ui8 priority, NWilson::TTraceId traceId, bool isRepl,
- TInstant now);
+ IActor *CreateAsyncDataReader(const TActorId &notifyID, ui8 priority, NWilson::TTraceId traceId, bool isRepl);
const TReadBatcherResult &GetResult() const { return *Result; }
ui64 GetPDiskReadBytes() const { return PDiskReadBytes; }
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
index f2e2847b698..0e6c0816fe5 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
@@ -295,19 +295,22 @@ namespace NKikimr {
}
struct TVPutInfo {
- TRope Buffer = {};
- TLogoBlobID BlobId = {};
- TIngress Ingress = {};
- TLsnSeg Lsn = {};
+ TRope Buffer;
+ TLogoBlobID BlobId;
+ TIngress Ingress;
+ TLsnSeg Lsn;
THullCheckStatus HullStatus;
bool IsHugeBlob = false;
NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> ExtraBlockChecks;
+ NWilson::TTraceId TraceId;
TVPutInfo(TLogoBlobID blobId, TRope &&buffer,
- NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> *extraBlockChecks)
+ NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> *extraBlockChecks,
+ NWilson::TTraceId traceId)
: Buffer(std::move(buffer))
, BlobId(blobId)
, HullStatus({NKikimrProto::UNKNOWN, 0, false})
+ , TraceId(std::move(traceId))
{
ExtraBlockChecks.Swap(extraBlockChecks);
}
@@ -318,20 +321,9 @@ namespace NKikimr {
// batched along with other VDisk log entries on the PDisk
}
- TLoggedRecVPut* CreateLoggedRec(TLsnSeg seg, bool confirmSyncLogAlso, const TLogoBlobID &id,
- const TIngress &ingress, TRope &&buffer, std::unique_ptr<TEvBlobStorage::TEvVPutResult> res,
- const TActorId &sender, ui64 cookie)
- {
- return new TLoggedRecVPut(seg, confirmSyncLogAlso, id, ingress, std::move(buffer), std::move(res), sender, cookie);
- }
-
- TLoggedRecVMultiPutItem* CreateLoggedRec(TLsnSeg seg, bool confirmSyncLogAlso, const TLogoBlobID &id,
- const TIngress &ingress, TRope &&buffer, std::unique_ptr<TEvVMultiPutItemResult> res,
- const TActorId &sender, ui64 cookie)
- {
- return new TLoggedRecVMultiPutItem(seg, confirmSyncLogAlso, id, ingress, std::move(buffer), std::move(res),
- sender, cookie);
- }
+ template<typename TEvResult> struct TLoggedRecType {};
+ template<> struct TLoggedRecType<TEvBlobStorage::TEvVPutResult> { using T = TLoggedRecVPut; };
+ template<> struct TLoggedRecType<TEvVMultiPutItemResult> { using T = TLoggedRecVMultiPutItem; };
template <typename TEvResult>
std::unique_ptr<NPDisk::TEvLog> CreatePutLogEvent(const TActorContext &ctx, TString evPrefix, NActors::TActorId sender,
@@ -362,8 +354,8 @@ namespace NKikimr {
UpdatePDiskWriteBytes(dataToWrite.size());
bool confirmSyncLogAlso = static_cast<bool>(syncLogMsg);
- intptr_t loggedRecId = LoggedRecsVault.Put(
- CreateLoggedRec(seg, confirmSyncLogAlso, id, ingress, std::move(buffer), std::move(result), sender, cookie));
+ intptr_t loggedRecId = LoggedRecsVault.Put(new typename TLoggedRecType<TEvResult>::T(seg, confirmSyncLogAlso,
+ id, ingress, std::move(buffer), std::move(result), sender, cookie, std::move(info.TraceId)));
void *loggedRecCookie = reinterpret_cast<void *>(loggedRecId);
// create log msg
auto logMsg = CreateHullUpdate(HullLogCtx, TLogSignature::SignatureLogoBlobOpt, dataToWrite,
@@ -482,7 +474,8 @@ namespace NKikimr {
for (ui64 itemIdx = 0; itemIdx < record.ItemsSize(); ++itemIdx) {
auto &item = *record.MutableItems(itemIdx);
TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(item.GetBlobID());
- putsInfo.emplace_back(blobId, ev->Get()->GetItemBuffer(itemIdx), item.MutableExtraBlockChecks());
+ putsInfo.emplace_back(blobId, ev->Get()->GetItemBuffer(itemIdx), item.MutableExtraBlockChecks(),
+ item.HasTraceId() ? item.GetTraceId() : NWilson::TTraceId());
TVPutInfo &info = putsInfo.back();
try {
@@ -545,7 +538,6 @@ namespace NKikimr {
std::unique_ptr<NPDisk::TEvMultiLog> evLogs = std::make_unique<NPDisk::TEvMultiLog>();
ui64 cookie = ev->Cookie;
- const NWilson::TTraceId traceId(ev->TraceId);
IActor* vMultiPutActor = CreateSkeletonVMultiPutActor(SelfId(), statuses, oosStatus, ev,
SkeletonFrontIDPtr, IFaceMonGroup->MultiPutResMsgsPtr(), Db->GetVDiskIncarnationGuid());
@@ -567,7 +559,8 @@ namespace NKikimr {
if (info.HullStatus.Postponed) {
auto result = std::make_unique<TEvVMultiPutItemResult>(info.BlobId, itemIdx, status, errorReason);
- Hull->PostponeReplyUntilCommitted(result.release(), vMultiPutActorId, itemIdx, info.HullStatus.Lsn);
+ Hull->PostponeReplyUntilCommitted(result.release(), vMultiPutActorId, itemIdx, std::move(info.TraceId),
+ info.HullStatus.Lsn);
continue;
}
@@ -586,14 +579,15 @@ namespace NKikimr {
new TEvBlobStorage::TEvVPutResult(status, info.BlobId, SelfVDiskId, &itemIdx, oosStatus, now,
vPut.GetCachedByteSize(), &vPut.Record, SkeletonFrontIDPtr, nullptr,
VCtx->Histograms.GetHistogram(handleClass), info.Buffer.GetSize(),
- NWilson::TTraceId(), Db->GetVDiskIncarnationGuid(), errorReason));
+ Db->GetVDiskIncarnationGuid(), errorReason));
if (info.Buffer) {
auto hugeWrite = CreateHullWriteHugeBlob(vMultiPutActorId, cookie, ignoreBlock, handleClass,
info, std::move(result));
- ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, NWilson::TTraceId(traceId));
+ ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, std::move(info.TraceId));
} else {
ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, info.BlobId, info.Ingress, TDiskPart(),
- ignoreBlock, vMultiPutActorId, cookie, std::move(result), &info.ExtraBlockChecks));
+ ignoreBlock, vMultiPutActorId, cookie, std::move(result), &info.ExtraBlockChecks), 0, 0,
+ std::move(info.TraceId));
}
} else {
Y_VERIFY(lsnBatch.First <= lsnBatch.Last);
@@ -625,7 +619,7 @@ namespace NKikimr {
std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status.Status, status.ErrorReason, ev, now, SkeletonFrontIDPtr,
SelfVDiskId, Db->GetVDiskIncarnationGuid(), GInfo));
if (status.Postponed) {
- Hull->PostponeReplyUntilCommitted(res.release(), ev->Sender, ev->Cookie, status.Lsn);
+ Hull->PostponeReplyUntilCommitted(res.release(), ev->Sender, ev->Cookie, std::move(ev->TraceId), status.Lsn);
} else {
SendReply(ctx, std::move(res), ev, BS_VDISK_PUT);
}
@@ -654,7 +648,7 @@ namespace NKikimr {
const TLogoBlobID id = LogoBlobIDFromLogoBlobID(record.GetBlobID());
LWTRACK(VDiskSkeletonVPutRecieved, ev->Get()->Orbit, VCtx->NodeId, VCtx->GroupId,
VCtx->Top->GetFailDomainOrderNumber(VCtx->ShortSelfVDisk), id.TabletID(), id.BlobSize());
- TVPutInfo info(id, ev->Get()->GetBuffer(), record.MutableExtraBlockChecks());
+ TVPutInfo info(id, ev->Get()->GetBuffer(), record.MutableExtraBlockChecks(), std::move(ev->TraceId));
const ui64 bufSize = info.Buffer.GetSize();
try {
@@ -723,10 +717,10 @@ namespace NKikimr {
NKikimrBlobStorage::EPutHandleClass handleClass = record.GetHandleClass();
auto hugeWrite = CreateHullWriteHugeBlob(ev->Sender, ev->Cookie, ignoreBlock, handleClass, info,
std::move(result));
- ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, std::move(ev->TraceId));
+ ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, std::move(info.TraceId));
} else {
- ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, info.BlobId, info.Ingress, TDiskPart(),
- ignoreBlock, ev->Sender, ev->Cookie, std::move(result), &info.ExtraBlockChecks));
+ ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, info.BlobId, info.Ingress, TDiskPart(), ignoreBlock,
+ ev->Sender, ev->Cookie, std::move(result), &info.ExtraBlockChecks), 0, 0, std::move(info.TraceId));
}
}
@@ -746,7 +740,7 @@ namespace NKikimr {
}
if (status.Postponed) {
Hull->PostponeReplyUntilCommitted(msg->Result.release(), msg->OrigClient, msg->OrigCookie,
- status.Lsn);
+ std::move(ev->TraceId), status.Lsn);
} else {
SendVDiskResponse(ctx, msg->OrigClient, msg->Result.release(), msg->OrigCookie);
}
@@ -777,8 +771,7 @@ namespace NKikimr {
UpdatePDiskWriteBytes(dataToWrite.size());
// prepare TLoggedRecVPutHuge
bool confirmSyncLogAlso = static_cast<bool>(syncLogMsg);
- intptr_t loggedRecId = LoggedRecsVault.Put(new TLoggedRecVPutHuge(seg, confirmSyncLogAlso,
- Db->HugeKeeperID, ev));
+ intptr_t loggedRecId = LoggedRecsVault.Put(new TLoggedRecVPutHuge(seg, confirmSyncLogAlso, Db->HugeKeeperID, ev));
void *loggedRecCookie = reinterpret_cast<void *>(loggedRecId);
// create log msg
auto logMsg = CreateHullUpdate(HullLogCtx, TLogSignature::SignatureHugeLogoBlob, dataToWrite, seg,
@@ -806,7 +799,7 @@ namespace NKikimr {
std::unique_ptr<NSyncLog::TEvSyncLogPut> syncLogMsg(
new NSyncLog::TEvSyncLogPut(Db->GType, seg.Point(), msg->Id, msg->Ingress));
std::unique_ptr<TEvDelLogoBlobDataSyncLogResult> result(new TEvDelLogoBlobDataSyncLogResult(msg->OrderId, now,
- nullptr, nullptr, NWilson::TTraceId()));
+ nullptr, nullptr));
bool confirmSyncLogAlso = static_cast<bool>(syncLogMsg);
intptr_t loggedRecId = LoggedRecsVault.Put(
@@ -870,8 +863,8 @@ namespace NKikimr {
auto handleClass = ev->Get()->Record.GetHandleClass();
auto result = std::make_unique<TEvBlobStorage::TEvVGetResult>(NKikimrProto::OK, SelfVDiskId, now,
ev->Get()->GetCachedByteSize(), &record, ev->Get()->GetIsLocalMon() ? nullptr : SkeletonFrontIDPtr,
- IFaceMonGroup->GetResMsgsPtr(), VCtx->Histograms.GetHistogram(handleClass), std::move(ev->TraceId),
- cookie, ev->GetChannel(), Db->GetVDiskIncarnationGuid());
+ IFaceMonGroup->GetResMsgsPtr(), VCtx->Histograms.GetHistogram(handleClass), cookie, ev->GetChannel(),
+ Db->GetVDiskIncarnationGuid());
if (record.GetAcquireBlockedGeneration()) {
ui64 tabletId = record.GetTabletId();
if (tabletId) {
@@ -950,7 +943,8 @@ namespace NKikimr {
if (status != NKikimrProto::OK) {
if (postponed) {
- Hull->PostponeReplyUntilCommitted(result.release(), ev->Sender, ev->Cookie, postponeUntilLsn);
+ Hull->PostponeReplyUntilCommitted(result.release(), ev->Sender, ev->Cookie, std::move(ev->TraceId),
+ postponeUntilLsn);
} else {
LOG_DEBUG_S(ctx, BS_VDISK_BLOCK, VCtx->VDiskLogPrefix << "TEvVBlockResult: " << result->ToString()
<< " Marker# BSVS15");
@@ -994,18 +988,18 @@ namespace NKikimr {
if (!SelfVDiskId.SameDisk(record.GetVDiskID())) {
result = std::make_unique<TEvBlobStorage::TEvVGetBlockResult>(NKikimrProto::RACE, tabletId,
SelfVDiskId, now, ev->Get()->GetCachedByteSize(), &ev->Get()->Record, SkeletonFrontIDPtr,
- IFaceMonGroup->GetBlockResMsgsPtr(), nullptr, std::move(ev->TraceId));
+ IFaceMonGroup->GetBlockResMsgsPtr(), nullptr);
} else {
ui32 blockedGen = 0;
bool isBlocked = Hull->GetBlocked(tabletId, &blockedGen);
if (isBlocked) {
result = std::make_unique<TEvBlobStorage::TEvVGetBlockResult>(NKikimrProto::OK, tabletId, blockedGen,
SelfVDiskId, now, ev->Get()->GetCachedByteSize(), &ev->Get()->Record, SkeletonFrontIDPtr,
- IFaceMonGroup->GetBlockResMsgsPtr(), nullptr, std::move(ev->TraceId));
+ IFaceMonGroup->GetBlockResMsgsPtr(), nullptr);
} else {
result = std::make_unique<TEvBlobStorage::TEvVGetBlockResult>(NKikimrProto::NODATA, tabletId,
SelfVDiskId, now, ev->Get()->GetCachedByteSize(), &ev->Get()->Record, SkeletonFrontIDPtr,
- IFaceMonGroup->GetBlockResMsgsPtr(), nullptr, std::move(ev->TraceId));
+ IFaceMonGroup->GetBlockResMsgsPtr(), nullptr);
}
}
@@ -1024,7 +1018,7 @@ namespace NKikimr {
std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status.Status, status.ErrorReason, ev, now,
SkeletonFrontIDPtr, SelfVDiskId, Db->GetVDiskIncarnationGuid(), GInfo));
if (status.Postponed) {
- Hull->PostponeReplyUntilCommitted(res.release(), ev->Sender, ev->Cookie, status.Lsn);
+ Hull->PostponeReplyUntilCommitted(res.release(), ev->Sender, ev->Cookie, std::move(ev->TraceId), status.Lsn);
} else {
SendReply(ctx, std::move(res), ev, BS_VDISK_GC);
}
@@ -1108,7 +1102,7 @@ namespace NKikimr {
std::unique_ptr<TEvBlobStorage::TEvVGetBarrierResult> result;
result = std::make_unique<TEvBlobStorage::TEvVGetBarrierResult>(NKikimrProto::OK, SelfVDiskId,
now, ev->Get()->GetCachedByteSize(), &record, SkeletonFrontIDPtr,
- IFaceMonGroup->GetBarrierResMsgsPtr(), nullptr, std::move(ev->TraceId));
+ IFaceMonGroup->GetBarrierResMsgsPtr(), nullptr);
THullDsSnap fullSnap = Hull->GetIndexSnapshot();
fullSnap.LogoBlobsSnap.Destroy();
fullSnap.BlocksSnap.Destroy();
@@ -1166,7 +1160,7 @@ namespace NKikimr {
ReplyError(NKikimrProto::RACE, "group generation mismatch", ev, ctx, now);
} else {
auto result = std::make_unique<TEvBlobStorage::TEvVDbStatResult>(NKikimrProto::OK, SelfVDiskId, now,
- IFaceMonGroup->DbStatResMsgsPtr(), nullptr, std::move(ev->TraceId));
+ IFaceMonGroup->DbStatResMsgsPtr(), nullptr);
THullDsSnap fullSnap = Hull->GetIndexSnapshot();
IActor *actor = CreateDbStatActor(HullCtx, HugeBlobCtx, ctx, std::move(fullSnap),
ctx.SelfID, ev, std::move(result));
@@ -1356,7 +1350,7 @@ namespace NKikimr {
void ReplyError(const NKikimrProto::EReplyStatus status, const TString& /*errorReason*/, TEvLocalSyncData::TPtr &ev,
const TActorContext &ctx, const TInstant &now) {
auto result = std::make_unique<TEvLocalSyncDataResult>(status, now, SyncLogIFaceGroup.LocalSyncResMsgsPtr(),
- nullptr, NWilson::TTraceId());
+ nullptr);
SendReply(ctx, std::move(result), ev, BS_VDISK_OTHER);
}
@@ -1384,7 +1378,7 @@ namespace NKikimr {
#endif
std::unique_ptr<TEvLocalSyncDataResult> result(
new TEvLocalSyncDataResult(NKikimrProto::OK, now, SyncLogIFaceGroup.LocalSyncResMsgsPtr(),
- nullptr, NWilson::TTraceId()));
+ nullptr));
OverloadHandler->ActualizeWeights(ctx, AllEHullDbTypes);
@@ -1427,8 +1421,7 @@ namespace NKikimr {
TEvAnubisOsirisPut::TPtr &ev,
const TActorContext &ctx,
const TInstant &now) {
- std::unique_ptr<IEventBase> res(new TEvAnubisOsirisPutResult(status, now, IFaceMonGroup->PutResMsgsPtr(),
- nullptr, NWilson::TTraceId()));
+ std::unique_ptr<IEventBase> res(new TEvAnubisOsirisPutResult(status, now, IFaceMonGroup->PutResMsgsPtr(), nullptr));
SendReply(ctx, std::move(res), ev, BS_VDISK_PUT);
}
@@ -1451,8 +1444,7 @@ namespace NKikimr {
OverloadHandler->ActualizeWeights(ctx, Mask(EHullDbType::LogoBlobs));
std::unique_ptr<TEvAnubisOsirisPutResult> result(new TEvAnubisOsirisPutResult(NKikimrProto::OK, now,
- (msg->IsAnubis() ? IFaceMonGroup->AnubisPutResMsgsPtr() : IFaceMonGroup->OsirisPutResMsgsPtr()),
- nullptr, NWilson::TTraceId()));
+ (msg->IsAnubis() ? IFaceMonGroup->AnubisPutResMsgsPtr() : IFaceMonGroup->OsirisPutResMsgsPtr()), nullptr));
// log data
TAnubisOsirisPutRecoveryLogRec logRec(*msg);
TString data = logRec.Serialize();
@@ -1541,7 +1533,7 @@ namespace NKikimr {
auto oosStatus = VCtx->GetOutOfSpaceState().GetGlobalStatusFlags();
auto result = std::make_unique<TEvBlobStorage::TEvVPutResult>(NKikimrProto::OK, id, SelfVDiskId, nullptr,
oosStatus, now, 0, nullptr, nullptr, IFaceMonGroup->RecoveredHugeBlobResMsgsPtr(), nullptr, bufSize,
- std::move(ev->TraceId), 0, TString());
+ 0, TString());
// pass the work to huge blob writer
TIngress ingress = *TIngress::CreateIngressWithLocal(VCtx->Top.get(), SelfVDiskId, id);
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h
index bd98ea87bdf..f209e77fca5 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h
@@ -111,7 +111,7 @@ namespace NKikimr {
return std::make_unique<TEvBlobStorage::TEvVMovedPatchResult>(status, originalId, patchedId, vdiskID, cookie,
oosStatus, now, ev->Get()->GetCachedByteSize(), &record, skeletonFrontIDPtr, counterPtr, nullptr,
- std::move(ev->TraceId), vdiskIncarnationGuid, errorReason);
+ vdiskIncarnationGuid, errorReason);
}
static inline std::unique_ptr<TEvBlobStorage::TEvVPatchFoundParts>
@@ -128,7 +128,7 @@ namespace NKikimr {
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev);
return std::make_unique<TEvBlobStorage::TEvVPatchFoundParts>(status, originalId, patchedId, vdiskID, cookie, now,
- errorReason, &record, skeletonFrontIDPtr, counterPtr, nullptr, std::move(ev->TraceId), vdiskIncarnationGuid);
+ errorReason, &record, skeletonFrontIDPtr, counterPtr, nullptr, vdiskIncarnationGuid);
}
static inline std::unique_ptr<TEvBlobStorage::TEvVPatchResult>
@@ -146,7 +146,7 @@ namespace NKikimr {
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev);
auto res = std::make_unique<TEvBlobStorage::TEvVPatchResult>(status, originalId, patchedId, vdiskID, cookie,
- now, &record, skeletonFrontIDPtr, counterPtr, nullptr, std::move(ev->TraceId), vdiskIncarnationGuid);
+ now, &record, skeletonFrontIDPtr, counterPtr, nullptr, vdiskIncarnationGuid);
res->SetStatus(status, errorReason);
return res;
}
@@ -160,7 +160,7 @@ namespace NKikimr {
auto &record = ev->Get()->Record;
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev);
return std::make_unique<TEvBlobStorage::TEvVPatchXorDiffResult>(status, now, &record, skeletonFrontIDPtr,
- counterPtr, nullptr, std::move(ev->TraceId));
+ counterPtr, nullptr);
}
static inline std::unique_ptr<TEvBlobStorage::TEvVPutResult>
@@ -180,7 +180,7 @@ namespace NKikimr {
return std::make_unique<TEvBlobStorage::TEvVPutResult>(status, id, vdiskID, cookie, oosStatus, now,
ev->Get()->GetCachedByteSize(), &record, skeletonFrontIDPtr, counterPtr, histoPtr,
- bufferSizeBytes, std::move(ev->TraceId), vdiskIncarnationGuid, errorReason);
+ bufferSizeBytes, vdiskIncarnationGuid, errorReason);
}
static inline std::unique_ptr<TEvBlobStorage::TEvVBlockResult>
@@ -192,7 +192,7 @@ namespace NKikimr {
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev);
return std::make_unique<TEvBlobStorage::TEvVBlockResult>(status, actual, vdiskID, now,
ev->Get()->GetCachedByteSize(), &ev->Get()->Record, skeletonFrontIDPtr, counterPtr,
- nullptr, std::move(ev->TraceId), vdiskIncarnationGuid);
+ nullptr, vdiskIncarnationGuid);
}
static inline std::unique_ptr<TEvBlobStorage::TEvVCollectGarbageResult>
@@ -207,7 +207,7 @@ namespace NKikimr {
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev);
return std::make_unique<TEvBlobStorage::TEvVCollectGarbageResult>(status, tabletId, gen, channel, vdiskID, now,
ev->Get()->GetCachedByteSize(), &record, skeletonFrontIDPtr, counterPtr, nullptr,
- std::move(ev->TraceId), vdiskIncarnationGuid);
+ vdiskIncarnationGuid);
}
////////////////////////////////////////////////////////////////////////////
@@ -272,7 +272,7 @@ namespace NKikimr {
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev);
auto result = std::make_unique<TEvBlobStorage::TEvVMultiPutResult>(status, vdiskID, cookie, now,
ev->Get()->GetCachedByteSize(), &record, skeletonFrontIDPtr, counterPtr, histoPtr, bufferSizeBytes,
- std::move(ev->TraceId), vdiskIncarnationGuid, errorReason);
+ vdiskIncarnationGuid, errorReason);
Y_VERIFY(record.ItemsSize() == statuses.size());
for (ui64 itemIdx = 0; itemIdx < record.ItemsSize(); ++itemIdx) {
auto &item = record.GetItems(itemIdx);
@@ -317,7 +317,7 @@ namespace NKikimr {
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev);
auto result = std::make_unique<TEvBlobStorage::TEvVGetResult>(status, vdiskID, now,
ev->Get()->GetCachedByteSize(), &record, skeletonFrontIDPtr, counterPtr, histoPtr,
- std::move(ev->TraceId), cookie, ev->GetChannel(), vdiskIncarnationGuid);
+ cookie, ev->GetChannel(), vdiskIncarnationGuid);
// range query
if (record.HasRangeQuery()) {
const NKikimrBlobStorage::TRangeQuery *q = &record.GetRangeQuery();
@@ -360,7 +360,7 @@ namespace NKikimr {
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev);
auto result = std::make_unique<TEvBlobStorage::TEvVGetBlockResult>(status, ev->Get()->Record.GetTabletId(),
vdiskID, now, ev->Get()->GetCachedByteSize(), &ev->Get()->Record, skeletonFrontIDPtr, counterPtr,
- nullptr, std::move(ev->TraceId));
+ nullptr);
SetRacingGroupInfo(ev->Get()->Record, result->Record, groupInfo);
return result;
}
@@ -386,8 +386,7 @@ namespace NKikimr {
Y_UNUSED(vdiskIncarnationGuid);
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev);
auto result = std::make_unique<TEvBlobStorage::TEvVGetBarrierResult>(status,vdiskID, now,
- ev->Get()->GetCachedByteSize(), &ev->Get()->Record, skeletonFrontIDPtr, counterPtr, nullptr,
- std::move(ev->TraceId));
+ ev->Get()->GetCachedByteSize(), &ev->Get()->Record, skeletonFrontIDPtr, counterPtr, nullptr);
SetRacingGroupInfo(ev->Get()->Record, result->Record, groupInfo);
return result;
}
@@ -400,8 +399,7 @@ namespace NKikimr {
{
Y_UNUSED(vdiskIncarnationGuid);
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev);
- return std::make_unique<TEvBlobStorage::TEvVDbStatResult>(status, vdiskID, now, counterPtr, nullptr,
- std::move(ev->TraceId));
+ return std::make_unique<TEvBlobStorage::TEvVDbStatResult>(status, vdiskID, now, counterPtr, nullptr);
}
static inline std::unique_ptr<IEventBase>
@@ -414,7 +412,7 @@ namespace NKikimr {
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev);
auto flags = vctx->GetOutOfSpaceState().GetLocalStatusFlags();
return std::make_unique<TEvBlobStorage::TEvVSyncResult>(status, vdiskID, flags, now, counterPtr, nullptr,
- std::move(ev->TraceId), ev->GetChannel());
+ ev->GetChannel());
}
static inline std::unique_ptr<IEventBase>
@@ -427,7 +425,7 @@ namespace NKikimr {
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev);
ui64 cookie = ev->Get()->Record.GetCookie();
return std::make_unique<TEvBlobStorage::TEvVSyncFullResult>(status, vdiskID, cookie, now, counterPtr, nullptr,
- std::move(ev->TraceId), ev->GetChannel());
+ ev->GetChannel());
}
static inline std::unique_ptr<IEventBase>
@@ -439,7 +437,7 @@ namespace NKikimr {
Y_UNUSED(vdiskIncarnationGuid);
const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev);
return std::make_unique<TEvBlobStorage::TEvVSyncGuidResult>(status, vdiskID, now, counterPtr, nullptr,
- std::move(ev->TraceId), ev->GetChannel());
+ ev->GetChannel());
}
} // NErrBuilder
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
index 0d6e09be6a8..9a1ddd7a9e1 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
@@ -19,9 +19,11 @@
#include <ydb/core/util/queue_inplace.h>
#include <ydb/core/base/counters.h>
+#include <ydb/core/base/wilson.h>
#include <ydb/core/node_whiteboard/node_whiteboard.h>
#include <library/cpp/monlib/service/pages/templates.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
#include <util/generic/set.h>
#include <util/generic/maybe.h>
@@ -114,9 +116,10 @@ namespace NKikimr {
, ExtQueueId(extQueueId)
, ClientId(clientId)
, ActorId(Ev->Sender)
- , Span(9 /*verbosity*/, NWilson::ERelation::FollowsFrom, std::move(Ev->TraceId), now, "VDisk.PutInQueue")
+ , Span(TWilson::VDiskTopLevel, std::move(Ev->TraceId), "VDisk.SkeletonFront.Queue")
{
Span.Attribute("QueueName", std::move(name));
+ Ev->TraceId = Span.GetTraceId();
}
};
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp
index 8452a571343..71443d172bc 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp
@@ -57,8 +57,7 @@ namespace NKikimr {
if (!SelfVDiskId.SameGroupAndGeneration(SourceVDisk) ||
!SelfVDiskId.SameDisk(TargetVDisk)) {
auto result = std::make_unique<TEvBlobStorage::TEvVSyncFullResult>(NKikimrProto::ERROR, SelfVDiskId,
- Record.GetCookie(), Now, IFaceMonGroup->SyncFullResMsgsPtr(), nullptr, std::move(Ev->TraceId),
- Ev->GetChannel());
+ Record.GetCookie(), Now, IFaceMonGroup->SyncFullResMsgsPtr(), nullptr, Ev->GetChannel());
SendVDiskResponse(ctx, recipient, result.release(), cookie);
Die(ctx);
return;
@@ -73,7 +72,7 @@ namespace NKikimr {
<< " Marker# BSVSFH02");
auto result = std::make_unique<TEvBlobStorage::TEvVSyncFullResult>(NKikimrProto::NODATA, SelfVDiskId,
TSyncState(Db->GetVDiskIncarnationGuid(), DbBirthLsn), Record.GetCookie(), Now,
- IFaceMonGroup->SyncFullResMsgsPtr(), nullptr, std::move(Ev->TraceId), Ev->GetChannel());
+ IFaceMonGroup->SyncFullResMsgsPtr(), nullptr, Ev->GetChannel());
SendVDiskResponse(ctx, recipient, result.release(), cookie);
Die(ctx);
return;
@@ -106,7 +105,7 @@ namespace NKikimr {
TSyncState newSyncState(Db->GetVDiskIncarnationGuid(), syncedLsn);
auto result = std::make_unique<TEvBlobStorage::TEvVSyncFullResult>(NKikimrProto::OK, SelfVDiskId,
newSyncState, Record.GetCookie(), Now, IFaceMonGroup->SyncFullResMsgsPtr(), nullptr,
- std::move(Ev->TraceId), Ev->GetChannel());
+ Ev->GetChannel());
// snapshotLsn is _always_ the last confirmed lsn
THullDsSnap fullSnap = Hull->GetIndexSnapshot();
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp
index 474714864b0..277789353c6 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp
@@ -2,6 +2,7 @@
#include <ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_response.h>
#include <ydb/core/blobstorage/vdisk/common/circlebufresize.h>
+#include <ydb/core/base/wilson.h>
namespace NKikimr {
@@ -25,7 +26,8 @@ namespace NKikimr {
TRope &&buffer,
std::unique_ptr<TEvBlobStorage::TEvVPutResult> result,
const TActorId &recipient,
- ui64 recipientCookie)
+ ui64 recipientCookie,
+ NWilson::TTraceId traceId)
: ILoggedRec(seg, confirmSyncLogAlso)
, Id(id)
, Ingress(ingress)
@@ -33,6 +35,7 @@ namespace NKikimr {
, Result(std::move(result))
, Recipient(recipient)
, RecipientCookie(recipientCookie)
+ , Span(TWilson::VDiskInternals, std::move(traceId), "VDisk.Log.Put")
{}
void TLoggedRecVPut::Replay(THull &hull, const TActorContext &ctx) {
@@ -44,6 +47,7 @@ namespace NKikimr {
<< " msg# " << Result->ToString()
<< " Marker# BSVSLR01");
+ Span.EndOk();
SendVDiskResponse(ctx, Recipient, Result.release(), RecipientCookie);
}
@@ -58,7 +62,8 @@ namespace NKikimr {
TRope &&buffer,
std::unique_ptr<TEvVMultiPutItemResult> result,
const TActorId &recipient,
- ui64 recipientCookie)
+ ui64 recipientCookie,
+ NWilson::TTraceId traceId)
: ILoggedRec(seg, confirmSyncLogAlso)
, Id(id)
, Ingress(ingress)
@@ -66,6 +71,7 @@ namespace NKikimr {
, Result(std::move(result))
, Recipient(recipient)
, RecipientCookie(recipientCookie)
+ , Span(TWilson::VDiskInternals, std::move(traceId), "VDisk.Log.MultiPutItem")
{}
void TLoggedRecVMultiPutItem::Replay(THull &hull, const TActorContext &ctx) {
@@ -78,6 +84,7 @@ namespace NKikimr {
<< " msg# " << Result->ToString()
<< " Marker# BSVSLR02");
+ Span.EndOk();
ctx.Send(Recipient, Result.release(), RecipientCookie);
}
@@ -92,6 +99,7 @@ namespace NKikimr {
: ILoggedRec(seg, confirmSyncLogAlso)
, HugeKeeperId(hugeKeeperId)
, Ev(ev)
+ , Span(TWilson::VDiskInternals, std::move(Ev->TraceId), "VDisk.Log.PutHuge")
{}
void TLoggedRecVPutHuge::Replay(THull &hull, const TActorContext &ctx) {
@@ -107,6 +115,7 @@ namespace NKikimr {
LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_PUT, hull.GetHullCtx()->VCtx->VDiskLogPrefix
<< "TEvVPut: realtime# false result# " << msg->Result->ToString()
<< " Marker# BSVSLR03");
+ Span.EndOk();
SendVDiskResponse(ctx, msg->OrigClient, msg->Result.release(), msg->OrigCookie);
}
@@ -132,7 +141,7 @@ namespace NKikimr {
{}
void TLoggedRecVBlock::Replay(THull &hull, const TActorContext &ctx) {
- auto replySender = [&ctx] (const TActorId &id, ui64 cookie, IEventBase *msg) {
+ auto replySender = [&ctx] (const TActorId &id, ui64 cookie, NWilson::TTraceId, IEventBase *msg) {
SendVDiskResponse(ctx, id, msg, cookie);
};
@@ -183,7 +192,7 @@ namespace NKikimr {
{}
void TLoggedRecLocalSyncData::Replay(THull &hull, const TActorContext &ctx) {
- auto replySender = [&ctx] (const TActorId &id, ui64 cookie, IEventBase *msg) {
+ auto replySender = [&ctx] (const TActorId &id, ui64 cookie, NWilson::TTraceId, IEventBase *msg) {
SendVDiskResponse(ctx, id, msg, cookie);
};
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h
index 34c3c4a5dfb..71d9bbf9a9b 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h
@@ -6,6 +6,7 @@
#include <ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h>
#include <ydb/core/blobstorage/vdisk/anubis_osiris/blobstorage_anubis_osiris.h>
#include <ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
namespace NKikimr {
@@ -49,7 +50,7 @@ namespace NKikimr {
public:
TLoggedRecVPut(TLsnSeg seg, bool confirmSyncLogAlso, const TLogoBlobID &id, const TIngress &ingress,
TRope &&buffer, std::unique_ptr<TEvBlobStorage::TEvVPutResult> result, const TActorId &recipient,
- ui64 recipientCookie);
+ ui64 recipientCookie, NWilson::TTraceId traceId);
void Replay(THull &hull, const TActorContext &ctx) override;
private:
@@ -59,6 +60,7 @@ namespace NKikimr {
std::unique_ptr<TEvBlobStorage::TEvVPutResult> Result;
TActorId Recipient;
ui64 RecipientCookie;
+ NWilson::TSpan Span;
};
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -68,7 +70,7 @@ namespace NKikimr {
public:
TLoggedRecVMultiPutItem(TLsnSeg seg, bool confirmSyncLogAlso, const TLogoBlobID &id, const TIngress &ingress,
TRope &&buffer, std::unique_ptr<TEvVMultiPutItemResult> result, const TActorId &recipient,
- ui64 recipientCookie);
+ ui64 recipientCookie, NWilson::TTraceId traceId);
void Replay(THull &hull, const TActorContext &ctx) override;
private:
@@ -78,6 +80,7 @@ namespace NKikimr {
std::unique_ptr<TEvVMultiPutItemResult> Result;
TActorId Recipient;
ui64 RecipientCookie;
+ NWilson::TSpan Span;
};
///////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -92,6 +95,7 @@ namespace NKikimr {
private:
const TActorId HugeKeeperId;
TEvHullLogHugeBlob::TPtr Ev;
+ NWilson::TSpan Span;
};
///////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp
index fbe53c19680..b9e74248389 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp
@@ -3,6 +3,8 @@
#include <ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hullsatisfactionrank.h>
#include <ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h>
#include <ydb/core/util/queue_inplace.h>
+#include <ydb/core/base/wilson.h>
+#include <library/cpp/actors/wilson/wilson_span.h>
namespace NKikimr {
@@ -12,13 +14,17 @@ namespace NKikimr {
class TEmergencyQueue {
struct TItem {
std::unique_ptr<IEventHandle> Ev;
+ NWilson::TSpan Span;
TItem() = default;
template<typename T>
TItem(TAutoPtr<TEventHandle<T>> ev)
: Ev(ev.Release())
- {}
+ , Span(TWilson::VDiskInternals, std::move(Ev->TraceId), "VDisk.Skeleton.EmergencyQueue")
+ {
+ Ev->TraceId = Span.GetTraceId();
+ }
};
// emergency queue of 'put' events
@@ -95,6 +101,7 @@ namespace NKikimr {
auto item = Queue.Head();
Y_VERIFY(item);
TAutoPtr<IEventHandle> ev = item->Ev.release();
+ item->Span.EndOk();
Queue.Pop();
switch (ev->GetTypeRewrite()) {
case TEvBlobStorage::EvVMovedPatch: {
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp
index 34bc4abbc81..3ac5c446dd1 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp
@@ -30,7 +30,6 @@ namespace NKikimr {
TOutOfSpaceStatus OOSStatus;
- NWilson::TTraceId TraceId;
NLWTrace::TOrbit Orbit;
const ui64 IncarnationGuid;
@@ -86,8 +85,7 @@ namespace NKikimr {
TInstant now = TAppData::TimeProvider->Now();
auto vMovedPatchResult = std::make_unique<TEvBlobStorage::TEvVMovedPatchResult>(status, OriginalId,
PatchedId, vdisk, cookie, OOSStatus, now, Event->Get()->GetCachedByteSize(), &record,
- SkeletonFrontIDPtr, MovedPatchResMsgsPtr, nullptr, std::move(TraceId), IncarnationGuid,
- ErrorReason);
+ SkeletonFrontIDPtr, MovedPatchResMsgsPtr, nullptr, IncarnationGuid, ErrorReason);
vMovedPatchResult->Orbit = std::move(Orbit);
if (status == NKikimrProto::ERROR) {
@@ -112,7 +110,6 @@ namespace NKikimr {
void Handle(TEvBlobStorage::TEvGetResult::TPtr &ev, const TActorContext &ctx) {
TEvBlobStorage::TEvGetResult *result = ev->Get();
Orbit = std::move(result->Orbit);
- TraceId = std::move(ev->TraceId);
ui32 patchedIdHash = PatchedId.Hash();
@@ -150,13 +147,12 @@ namespace NKikimr {
NKikimrBlobStorage::UserData, TEvBlobStorage::TEvPut::TacticDefault);
put->Orbit = std::move(Orbit);
- SendToBSProxy(SelfId(), PatchedGroupId, put.release(), OriginalId.Hash(), std::move(Event->TraceId));
+ SendToBSProxy(SelfId(), PatchedGroupId, put.release(), OriginalId.Hash());
}
void Handle(TEvBlobStorage::TEvPutResult::TPtr &ev, const TActorContext &ctx) {
TEvBlobStorage::TEvPutResult *result = ev->Get();
Orbit = std::move(result->Orbit);
- TraceId = std::move(ev->TraceId);
ui32 originalIdHash = OriginalId.Hash();
@@ -182,7 +178,7 @@ namespace NKikimr {
OriginalId.BlobSize(), deadline, NKikimrBlobStorage::AsyncRead);
get->Orbit = std::move(Event->Get()->Orbit);
- SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash(), std::move(Event->TraceId));
+ SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash());
Become(&TThis::StateWait);
}
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp
index 581103f6024..e34428b8ede 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp
@@ -80,7 +80,7 @@ namespace NKikimr {
const ui64 bufferSizeBytes = Event->Get()->GetBufferBytes();
auto vMultiPutResult = std::make_unique<TEvBlobStorage::TEvVMultiPutResult>(NKikimrProto::OK, vdisk, cookie,
now, Event->Get()->GetCachedByteSize(), &vMultiPutRecord, SkeletonFrontIDPtr, MultiPutResMsgsPtr,
- nullptr, bufferSizeBytes, std::move(Event->TraceId), IncarnationGuid, TString());
+ nullptr, bufferSizeBytes, IncarnationGuid, TString());
for (ui64 idx = 0; idx < Items.size(); ++idx) {
TItem &result = Items[idx];
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp
index 4fd96a584ee..f28987129d7 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp
@@ -151,8 +151,7 @@ namespace NKikimr::NPrivate {
Y_VERIFY(record.HasCookie());
FoundPartsEvent = std::make_unique<TEvBlobStorage::TEvVPatchFoundParts>(
NKikimrProto::OK, OriginalBlobId, PatchedBlobId, VDiskId, record.GetCookie(), now, ErrorReason, &record,
- SkeletonFrontIDPtr, VPatchFoundPartsMsgsPtr, nullptr,
- std::move(ev->TraceId), IncarnationGuid);
+ SkeletonFrontIDPtr, VPatchFoundPartsMsgsPtr, nullptr, IncarnationGuid);
}
void Bootstrap() {
@@ -425,8 +424,7 @@ namespace NKikimr::NPrivate {
ResultEvent = std::make_unique<TEvBlobStorage::TEvVPatchResult>(
NKikimrProto::OK, TLogoBlobID(OriginalBlobId, OriginalPartId),
TLogoBlobID(PatchedBlobId, PatchedPartId), VDiskId, record.GetCookie(), now,
- &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr,
- std::move(ev->TraceId), IncarnationGuid);
+ &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr, IncarnationGuid);
Sender = ev->Sender;
Cookie = ev->Cookie;
SendVPatchResult(NKikimrProto::ERROR);
@@ -467,8 +465,7 @@ namespace NKikimr::NPrivate {
ResultEvent = std::make_unique<TEvBlobStorage::TEvVPatchResult>(
NKikimrProto::OK, originalPartBlobId, patchedPartBlobId, VDiskId, record.GetCookie(), now,
- &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr,
- std::move(ev->TraceId), IncarnationGuid);
+ &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr, IncarnationGuid);
Sender = ev->Sender;
Cookie = ev->Cookie;
@@ -526,8 +523,7 @@ namespace NKikimr::NPrivate {
NKikimrBlobStorage::TEvVPatchXorDiff &record = ev->Get()->Record;
TInstant now = TActivationContext::Now();
auto resultEvent = std::make_unique<TEvBlobStorage::TEvVPatchXorDiffResult>(
- NKikimrProto::ERROR, now, &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr,
- std::move(ev->TraceId));
+ NKikimrProto::ERROR, now, &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr);
SendVDiskResponse(TActivationContext::AsActorContext(), ev->Sender, resultEvent.release(), ev->Cookie);
}
@@ -550,7 +546,7 @@ namespace NKikimr::NPrivate {
TInstant now = TActivationContext::Now();
std::unique_ptr<TEvBlobStorage::TEvVPatchXorDiffResult> resultEvent = std::make_unique<TEvBlobStorage::TEvVPatchXorDiffResult>(
- NKikimrProto::OK, now, &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr, std::move(ev->TraceId));
+ NKikimrProto::OK, now, &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr);
if (!CheckDiff(xorDiffs, "XorDiff from datapart")) {
for (auto &[diffs, partId, result, sender, cookie] : ReceivedXorDiffs) {
diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp
index ca54eeec88c..cb9f9708a11 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp
@@ -253,8 +253,7 @@ namespace NKikimr {
UNIT_ASSERT(evVGetRange->Record.HasIndexOnly() && evVGetRange->Record.GetIndexOnly());
std::unique_ptr<TEvBlobStorage::TEvVGetResult> evVGetRangeResult = std::make_unique<TEvBlobStorage::TEvVGetResult>(
vGetStatus, testData.VDiskIds[nodeId], testData.Now, evVGetRange->GetCachedByteSize(), &evVGetRange->Record,
- nullptr, nullptr, nullptr, std::move(handle->TraceId), evVGetRange->Record.GetCookie(),
- handle->GetChannel(), 0);
+ nullptr, nullptr, nullptr, evVGetRange->Record.GetCookie(), handle->GetChannel(), 0);
evVGetRangeResult->AddResult(NKikimrProto::OK, TLogoBlobID(testData.OriginalBlobId, 0));
for (ui8 partId : foundParts) {
@@ -407,8 +406,7 @@ namespace NKikimr {
std::unique_ptr<TEvBlobStorage::TEvVGetResult> evVGetResult = std::make_unique<TEvBlobStorage::TEvVGetResult>(
vGetStatus, testData.VDiskIds[nodeId], testData.Now, evVGet->GetCachedByteSize(), &evVGet->Record,
- nullptr, nullptr, nullptr, std::move(vGetHandle->TraceId), evVGet->Record.GetCookie(),
- vGetHandle->GetChannel(), 0);
+ nullptr, nullptr, nullptr, evVGet->Record.GetCookie(), vGetHandle->GetChannel(), 0);
evVGetResult->AddResult(NKikimrProto::OK, blob.BlobId, 0, blob.Buffer.data(), blob.Buffer.size());
std::unique_ptr<IEventHandle> handle = std::make_unique<IEventHandle>(vPatchActorId, edgeActor, evVGetResult.release());
@@ -437,8 +435,7 @@ namespace NKikimr {
TOutOfSpaceStatus oos = TOutOfSpaceStatus(testData.StatusFlags, testData.ApproximateFreeSpaceShare);
std::unique_ptr<TEvBlobStorage::TEvVPutResult> vPutResult = std::make_unique<TEvBlobStorage::TEvVPutResult>(
vPutStatus, blobId, testData.VDiskIds[nodeId], &cookie, oos, testData.Now,
- 0, &record, nullptr, nullptr, nullptr, vPut->GetBufferBytes(), std::move(handle->TraceId),
- 0, "");
+ 0, &record, nullptr, nullptr, nullptr, vPut->GetBufferBytes(), 0, "");
handle = MakeHolder<IEventHandle>(vPatchActorId, edgeActor, vPutResult.release());
runtime.Send(handle.Release());
@@ -773,7 +770,7 @@ namespace NKikimr {
TActorId patchActor = testData.VPatchActorIds[patchedPartId - 1];
auto handle2 = std::make_unique<IEventHandle>(patchActor, edgeActor, handle->Release().Release(), handle->Flags,
- handle->Cookie, nullptr, std::move(handle->TraceId));
+ handle->Cookie, nullptr);
testData.Runtime.Send(handle2.release());
}
}
diff --git a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer.cpp b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer.cpp
index bf0179b4ed9..7cbea4a8fbc 100644
--- a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer.cpp
+++ b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer.cpp
@@ -364,7 +364,7 @@ namespace NKikimr {
data.Info = info;
// create reply
auto result = std::make_unique<TEvBlobStorage::TEvVSyncGuidResult>(NKikimrProto::OK, selfVDisk,
- TAppData::TimeProvider->Now(), nullptr, nullptr, std::move(ev->TraceId), ev->GetChannel());
+ TAppData::TimeProvider->Now(), nullptr, nullptr, ev->GetChannel());
// put reply into the queue and wait until it would be committed
ui64 seqNum = DelayedQueue.WriteRequest(ev->Sender, std::move(result));
// commit
@@ -378,8 +378,7 @@ namespace NKikimr {
auto guid = data.GetGuid();
// create reply
auto result = std::make_unique<TEvBlobStorage::TEvVSyncGuidResult>(NKikimrProto::OK, selfVDisk,
- TAppData::TimeProvider->Now(), guid, state, nullptr, nullptr, std::move(ev->TraceId),
- ev->GetChannel());
+ TAppData::TimeProvider->Now(), guid, state, nullptr, nullptr, ev->GetChannel());
// put reply into the queue and wait until all required writes are committed
DelayedQueue.ReadRequest(ctx, ev->Sender, std::move(result));
}
diff --git a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h
index bf26350068b..1bb55b311f1 100644
--- a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h
+++ b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h
@@ -57,9 +57,8 @@ namespace NKikimr {
TEvLocalSyncDataResult(NKikimrProto::EReplyStatus status, const TInstant &now,
::NMonitoring::TDynamicCounters::TCounterPtr counterPtr,
- NVDiskMon::TLtcHistoPtr histoPtr, NWilson::TTraceId traceId)
- : TEvVResultBase(now, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, counterPtr, histoPtr,
- std::move(traceId))
+ NVDiskMon::TLtcHistoPtr histoPtr)
+ : TEvVResultBase(now, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, counterPtr, histoPtr)
, Status(status)
{}
};
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp
index 9b7dd49ebb9..868d870ac1a 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp
@@ -140,8 +140,7 @@ namespace NKikimr {
auto result = std::make_unique<TEvBlobStorage::TEvVSyncResult>(NKikimrProto::RACE, SelfVDiskId,
TSyncState(), true, SlCtx->VCtx->GetOutOfSpaceState().GetLocalStatusFlags(), now,
- SlCtx->CountersMonGroup.VDiskCheckFailedPtr(), nullptr, std::move(ev->TraceId),
- ev->GetChannel());
+ SlCtx->CountersMonGroup.VDiskCheckFailedPtr(), nullptr, ev->GetChannel());
SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie);
return;
}
@@ -160,7 +159,7 @@ namespace NKikimr {
auto result = std::make_unique<TEvBlobStorage::TEvVSyncResult>(NKikimrProto::BLOCKED, SelfVDiskId,
TSyncState(), true, SlCtx->VCtx->GetOutOfSpaceState().GetLocalStatusFlags(), now,
- SlCtx->CountersMonGroup.DiskLockedPtr(), nullptr, std::move(ev->TraceId), ev->GetChannel());
+ SlCtx->CountersMonGroup.DiskLockedPtr(), nullptr, ev->GetChannel());
SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie);
return;
}
@@ -180,7 +179,7 @@ namespace NKikimr {
TSyncState syncState(VDiskIncarnationGuid, GetDbBirthLsn());
auto result = std::make_unique<TEvBlobStorage::TEvVSyncResult>(status, SelfVDiskId, syncState,
true, SlCtx->VCtx->GetOutOfSpaceState().GetLocalStatusFlags(), now,
- SlCtx->CountersMonGroup.UnequalGuidPtr(), nullptr, std::move(ev->TraceId), ev->GetChannel());
+ SlCtx->CountersMonGroup.UnequalGuidPtr(), nullptr, ev->GetChannel());
SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie);
return;
}
diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp
index 6cbe05c4ec8..5ce14ae5849 100644
--- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp
+++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp
@@ -167,7 +167,7 @@ namespace NKikimr {
auto result = std::make_unique<TEvBlobStorage::TEvVSyncResult>(status, SelfVDiskId,
TSyncState(VDiskIncarnationGuid, lsn), finished, SlCtx->VCtx->GetOutOfSpaceState().GetLocalStatusFlags(),
- Now, SlCtx->IFaceMonGroup.SyncReadResMsgsPtr(), nullptr, std::move(Ev->TraceId), Ev->GetChannel());
+ Now, SlCtx->IFaceMonGroup.SyncReadResMsgsPtr(), nullptr, Ev->GetChannel());
if (DiskReads) {
NKikimrBlobStorage::TEvVSyncResult::TStat *stat = nullptr;
stat = result->Record.MutableStat();
diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto
index 110e5dbe088..ad4ed0a2691 100644
--- a/ydb/core/protos/blobstorage.proto
+++ b/ydb/core/protos/blobstorage.proto
@@ -403,6 +403,8 @@ message TVMultiPutItem {
optional uint64 Cookie = 4;
repeated TEvVPut.TExtraBlockCheck ExtraBlockChecks = 5;
+
+ optional NActorsProto.TTraceId TraceId = 6;
}
message TEvVMultiPut {
diff --git a/ydb/core/yq/libs/audit/CMakeLists.txt b/ydb/core/yq/libs/audit/CMakeLists.txt
index 954f1193322..9bcd97bdd63 100644
--- a/ydb/core/yq/libs/audit/CMakeLists.txt
+++ b/ydb/core/yq/libs/audit/CMakeLists.txt
@@ -11,6 +11,7 @@ add_library(yq-libs-audit)
target_link_libraries(yq-libs-audit PUBLIC
contrib-libs-cxxsupp
yutil
+ cpp-actors-core
)
target_sources(yq-libs-audit PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/audit/yq_audit_service.cpp