diff options
author | alexvru <alexvru@ydb.tech> | 2022-07-28 15:33:09 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-07-28 15:33:09 +0300 |
commit | 0836b1ef7920fc395f848c298b2ea601d83dbab2 (patch) | |
tree | 6c2bb4b2c6dcd32f045c8f8620d68a57e3d1f57a | |
parent | b333b9c0b2519d13f1b00518e4ad398b6c06ace5 (diff) | |
download | ydb-0836b1ef7920fc395f848c298b2ea601d83dbab2.tar.gz |
Mark down DS proxy and VDisk parts to use Wilson tracing
73 files changed, 653 insertions, 478 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_channel.cpp b/library/cpp/actors/interconnect/interconnect_channel.cpp index f839741cb54..fed13946866 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.cpp +++ b/library/cpp/actors/interconnect/interconnect_channel.cpp @@ -18,7 +18,7 @@ namespace NActors { return false; } - const NWilson::TTraceId traceId(event.Span); + auto traceId = event.Span.GetTraceId(); event.Span.EndOk(); LWTRACK(SerializeToPacketEnd, event.Orbit, PeerNodeId, ChannelId, OutputQueueSize, task.GetDataSize()); diff --git a/library/cpp/actors/interconnect/interconnect_channel.h b/library/cpp/actors/interconnect/interconnect_channel.h index 25e996ddcae..8849d19b92c 100644 --- a/library/cpp/actors/interconnect/interconnect_channel.h +++ b/library/cpp/actors/interconnect/interconnect_channel.h @@ -55,12 +55,11 @@ namespace NActors { ~TEventOutputChannel() { } - std::pair<ui32, TEventHolder*> Push(IEventHandle& ev, TInstant now) { + std::pair<ui32, TEventHolder*> Push(IEventHandle& ev) { TEventHolder& event = Pool.Allocate(Queue); const ui32 bytes = event.Fill(ev) + (Params.UseExtendedTraceFmt ? sizeof(TEventDescr2) : sizeof(TEventDescr1)); OutputQueueSize += bytes; - if (auto span = NWilson::TSpan(static_cast<ui8>(15) /*verbosity*/, NWilson::ERelation::ChildOf, - NWilson::TTraceId(ev.TraceId), now, "InInterconnectQueue")) { + if (auto span = NWilson::TSpan(15 /*max verbosity*/, NWilson::TTraceId(ev.TraceId), "Interconnect.Queue")) { event.Span = std::move(span .Attribute("OutputQueueItems", static_cast<i64>(Queue.size())) .Attribute("OutputQueueSize", static_cast<i64>(OutputQueueSize))); diff --git a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp index 0f286153ea5..feb55a16ad6 100644 --- a/library/cpp/actors/interconnect/interconnect_tcp_session.cpp +++ b/library/cpp/actors/interconnect/interconnect_tcp_session.cpp @@ -126,7 +126,7 @@ namespace NActors { auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel); const bool wasWorking = oChannel.IsWorking(); - const auto [dataSize, event] = oChannel.Push(*ev, TActivationContext::Now()); + const auto [dataSize, event] = oChannel.Push(*ev); LWTRACK(ForwardEvent, event->Orbit, Proxy->PeerNodeId, event->Descr.Type, event->Descr.Flags, LWACTORID(event->Descr.Recipient), LWACTORID(event->Descr.Sender), event->Descr.Cookie, event->EventSerializedSize); TotalOutputQueueSize += dataSize; diff --git a/library/cpp/actors/interconnect/packet.h b/library/cpp/actors/interconnect/packet.h index 9161e3af710..c8909c08a76 100644 --- a/library/cpp/actors/interconnect/packet.h +++ b/library/cpp/actors/interconnect/packet.h @@ -138,8 +138,8 @@ struct TEventHolder : TNonCopyable { const TActorId *f = ForwardRecipient ? &ForwardRecipient : nullptr; Span.EndError("nondelivery"); auto ev = Event - ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span) - : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span); + ? std::make_unique<IEventHandle>(r, s, Event.Release(), d.Flags, d.Cookie, f, Span.GetTraceId()) + : std::make_unique<IEventHandle>(d.Type, d.Flags, r, s, std::move(Buffer), d.Cookie, f, Span.GetTraceId()); NActors::TActivationContext::Send(ev->ForwardOnNondelivery(NActors::TEvents::TEvUndelivered::Disconnected, unsure)); } diff --git a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp index 9b47f5b592d..565a511859e 100644 --- a/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp +++ b/library/cpp/actors/interconnect/ut/channel_scheduler_ut.cpp @@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(ChannelScheduler) { auto ev = MakeHolder<IEventHandle>(1, 0, TActorId(), TActorId(), MakeIntrusive<TEventSerializedData>(payload, false), 0); auto& ch = scheduler.GetOutputChannel(channel); const bool wasWorking = ch.IsWorking(); - ch.Push(*ev, TInstant::Now()); + ch.Push(*ev); if (!wasWorking) { scheduler.AddToHeap(ch, 0); } diff --git a/library/cpp/actors/protos/actors.proto b/library/cpp/actors/protos/actors.proto index ad347dc2907..329eb998dfc 100644 --- a/library/cpp/actors/protos/actors.proto +++ b/library/cpp/actors/protos/actors.proto @@ -7,6 +7,10 @@ message TActorId { required fixed64 RawX2 = 2; } +message TTraceId { + optional bytes Data = 1; +} + message TCallbackException { required TActorId ActorId = 1; required string ExceptionMessage = 2; diff --git a/library/cpp/actors/wilson/CMakeLists.txt b/library/cpp/actors/wilson/CMakeLists.txt index 75c7b16dff8..09a555a1317 100644 --- a/library/cpp/actors/wilson/CMakeLists.txt +++ b/library/cpp/actors/wilson/CMakeLists.txt @@ -12,6 +12,7 @@ target_link_libraries(cpp-actors-wilson PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core + cpp-actors-protos actors-wilson-protos ) target_sources(cpp-actors-wilson PRIVATE diff --git a/library/cpp/actors/wilson/wilson_span.cpp b/library/cpp/actors/wilson/wilson_span.cpp index a97184c3f09..c932560254d 100644 --- a/library/cpp/actors/wilson/wilson_span.cpp +++ b/library/cpp/actors/wilson/wilson_span.cpp @@ -53,9 +53,8 @@ namespace NWilson { } void TSpan::Send() { - if (TlsActivationContext) { - TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span))); - } + TActivationContext::Send(new IEventHandle(MakeWilsonUploaderId(), {}, new TEvWilson(&Data->Span))); + Data->Sent = true; } } // NWilson diff --git a/library/cpp/actors/wilson/wilson_span.h b/library/cpp/actors/wilson/wilson_span.h index c2de2f0b68c..3c201abd381 100644 --- a/library/cpp/actors/wilson/wilson_span.h +++ b/library/cpp/actors/wilson/wilson_span.h @@ -1,5 +1,7 @@ #pragma once +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/wilson/protos/trace.pb.h> #include <util/generic/hash.h> #include <util/datetime/cputimer.h> @@ -42,12 +44,17 @@ namespace NWilson { const ui64 StartCycles; const TTraceId TraceId; NTraceProto::Span Span; + bool Sent = false; TData(TInstant startTime, ui64 startCycles, TTraceId traceId) : StartTime(startTime) , StartCycles(startCycles) , TraceId(std::move(traceId)) {} + + ~TData() { + Y_VERIFY_DEBUG(Sent); + } }; std::unique_ptr<TData> Data; @@ -57,94 +64,134 @@ namespace NWilson { TSpan(const TSpan&) = delete; TSpan(TSpan&&) = default; - TSpan(ui8 verbosity, ERelation /*relation*/, TTraceId parentId, TInstant now, std::optional<TString> name) - : Data(parentId ? std::make_unique<TData>(now, GetCycleCount(), parentId.Span(verbosity)) : nullptr) + TSpan(ui8 verbosity, TTraceId parentId, std::optional<TString> name) + : Data(parentId ? std::make_unique<TData>(TInstant::Now(), GetCycleCount(), parentId.Span(verbosity)) : nullptr) { - if (*this) { + if (Y_UNLIKELY(*this)) { if (!parentId.IsRoot()) { Data->Span.set_parent_span_id(parentId.GetSpanIdPtr(), parentId.GetSpanIdSize()); } - Data->Span.set_start_time_unix_nano(now.NanoSeconds()); + Data->Span.set_start_time_unix_nano(Data->StartTime.NanoSeconds()); + Data->Span.set_kind(opentelemetry::proto::trace::v1::Span::SPAN_KIND_INTERNAL); if (name) { Name(std::move(*name)); } + + Attribute("node_id", NActors::TActivationContext::ActorSystem()->NodeId); + } + } + + ~TSpan() { + if (Y_UNLIKELY(*this)) { + EndError("unterminated span"); } } TSpan& operator =(const TSpan&) = delete; - TSpan& operator =(TSpan&&) = default; - operator bool() const { - return static_cast<bool>(Data); + TSpan& operator =(TSpan&& other) { + if (this != &other) { + if (Y_UNLIKELY(*this)) { + EndError("TSpan instance incorrectly overwritten"); + } + Data = std::exchange(other.Data, nullptr); + } + return *this; + } + + explicit operator bool() const { + return Data && !Data->Sent; + } + + TSpan& Relation(ERelation /*relation*/) { + if (Y_UNLIKELY(*this)) { + // update relation in data somehow + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); + } + return *this; } TSpan& Name(TString name) { - if (*this) { + if (Y_UNLIKELY(*this)) { Data->Span.set_name(std::move(name)); + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); } return *this; } TSpan& Attribute(TString name, TAttributeValue value) { - if (*this) { + if (Y_UNLIKELY(*this)) { SerializeKeyValue(std::move(name), std::move(value), Data->Span.add_attributes()); + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); } return *this; } TSpan& Event(TString name, TKeyValueList attributes) { - if (*this) { + if (Y_UNLIKELY(*this)) { auto *event = Data->Span.add_events(); event->set_time_unix_nano(TimeUnixNano()); event->set_name(std::move(name)); for (auto&& [key, value] : attributes) { SerializeKeyValue(std::move(key), std::move(value), event->add_attributes()); } + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); } return *this; } TSpan& Link(const TTraceId& traceId, TKeyValueList attributes) { - if (*this) { + if (Y_UNLIKELY(*this)) { auto *link = Data->Span.add_links(); link->set_trace_id(traceId.GetTraceIdPtr(), traceId.GetTraceIdSize()); link->set_span_id(traceId.GetSpanIdPtr(), traceId.GetSpanIdSize()); for (auto&& [key, value] : attributes) { SerializeKeyValue(std::move(key), std::move(value), link->add_attributes()); } + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); } return *this; } void EndOk() { - if (*this) { + if (Y_UNLIKELY(*this)) { auto *status = Data->Span.mutable_status(); status->set_code(NTraceProto::Status::STATUS_CODE_OK); + End(); + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); } - End(); } void EndError(TString error) { - if (*this) { + if (Y_UNLIKELY(*this)) { auto *status = Data->Span.mutable_status(); status->set_code(NTraceProto::Status::STATUS_CODE_ERROR); status->set_message(std::move(error)); + End(); + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); } - End(); } void End() { - if (*this) { - Data->Span.set_end_time_unix_nano(TimeUnixNano()); + if (Y_UNLIKELY(*this)) { Data->Span.set_trace_id(Data->TraceId.GetTraceIdPtr(), Data->TraceId.GetTraceIdSize()); Data->Span.set_span_id(Data->TraceId.GetSpanIdPtr(), Data->TraceId.GetSpanIdSize()); + Data->Span.set_end_time_unix_nano(TimeUnixNano()); Send(); - Data.reset(); // tracing finished + } else { + Y_VERIFY_DEBUG(!Data, "span has been ended"); } } - operator TTraceId() const { + TTraceId GetTraceId() const { return Data ? TTraceId(Data->TraceId) : TTraceId(); } diff --git a/library/cpp/actors/wilson/wilson_trace.h b/library/cpp/actors/wilson/wilson_trace.h index e3631a2403d..8c1fe05b082 100644 --- a/library/cpp/actors/wilson/wilson_trace.h +++ b/library/cpp/actors/wilson/wilson_trace.h @@ -1,5 +1,8 @@ #pragma once +#include <library/cpp/actors/core/monotonic.h> +#include <library/cpp/actors/protos/actors.pb.h> + #include <library/cpp/string_utils/base64/base64.h> #include <util/stream/output.h> @@ -28,6 +31,11 @@ namespace NWilson { TTraceId(TTrace traceId, ui64 spanId, ui8 verbosity, ui32 timeToLive) : TraceId(traceId) { + if (timeToLive == Max<ui32>()) { + timeToLive = 4095; + } + Y_VERIFY(verbosity <= 15); + Y_VERIFY(timeToLive <= 4095); SpanId = spanId; Verbosity = verbosity; TimeToLive = timeToLive; @@ -86,7 +94,7 @@ namespace NWilson { , Raw(other.Raw) { other.TraceId.fill(0); - other.SpanId = 0; + other.SpanId = 1; // make it explicitly invalid other.Raw = 0; } @@ -95,7 +103,10 @@ namespace NWilson { : TraceId(other.TraceId) , SpanId(other.SpanId) , Raw(other.Raw) - {} + { + // validate trace id only when we are making a copy + other.Validate(); + } TTraceId(const TSerializedTraceId& in) { const char *p = in; @@ -108,6 +119,17 @@ namespace NWilson { Y_VERIFY_DEBUG(p - in == sizeof(TSerializedTraceId)); } + TTraceId(const NActorsProto::TTraceId& pb) + : TTraceId() + { + if (pb.HasData()) { + const auto& data = pb.GetData(); + if (data.size() == sizeof(TSerializedTraceId)) { + *this = *reinterpret_cast<const TSerializedTraceId*>(data.data()); + } + } + } + void Serialize(TSerializedTraceId *out) const { char *p = *out; memcpy(p, TraceId.data(), sizeof(TraceId)); @@ -119,13 +141,21 @@ namespace NWilson { Y_VERIFY_DEBUG(p - *out == sizeof(TSerializedTraceId)); } + void Serialize(NActorsProto::TTraceId *pb) const { + if (*this) { + TSerializedTraceId data; + Serialize(&data); + pb->SetData(reinterpret_cast<const char*>(&data), sizeof(data)); + } + } + TTraceId& operator=(TTraceId&& other) { if (this != &other) { TraceId = other.TraceId; SpanId = other.SpanId; Raw = other.Raw; other.TraceId.fill(0); - other.SpanId = 0; + other.SpanId = 1; // make it explicitly invalid other.Raw = 0; } return *this; @@ -138,11 +168,25 @@ namespace NWilson { return TTraceId(GenerateTraceId(), 0, verbosity, timeToLive); } + static TTraceId NewTraceIdThrottled(ui8 verbosity, ui32 timeToLive, std::atomic<NActors::TMonotonic>& counter, + NActors::TMonotonic now, TDuration periodBetweenSamples) { + static_assert(std::atomic<NActors::TMonotonic>::is_always_lock_free); + for (;;) { + NActors::TMonotonic ts = counter.load(); + if (now < ts) { + return {}; + } else if (counter.compare_exchange_strong(ts, now + periodBetweenSamples)) { + return NewTraceId(verbosity, timeToLive); + } + } + } + static TTraceId NewTraceId() { // NBS stub return TTraceId(); } TTraceId Span(ui8 verbosity) const { + Validate(); return *this && TimeToLive && verbosity <= Verbosity ? TTraceId(TraceId, GenerateSpanId(), Verbosity, TimeToLive - 1) : TTraceId(); @@ -174,10 +218,13 @@ namespace NWilson { const void *GetSpanIdPtr() const { return &SpanId; } static constexpr size_t GetSpanIdSize() { return sizeof(ui64); } + void Validate() const { + Y_VERIFY_DEBUG(*this || !SpanId); + } + // for compatibility with NBS TTraceId Clone() const { return NWilson::TTraceId(*this); } ui64 GetTraceId() const { return 0; } void OutputSpanId(IOutputStream&) const {} }; - } diff --git a/library/cpp/actors/wilson/wilson_uploader.cpp b/library/cpp/actors/wilson/wilson_uploader.cpp index e87776717bb..6b4ef92b812 100644 --- a/library/cpp/actors/wilson/wilson_uploader.cpp +++ b/library/cpp/actors/wilson/wilson_uploader.cpp @@ -1,9 +1,11 @@ #include "wilson_uploader.h" #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> #include <library/cpp/actors/wilson/protos/service.pb.h> #include <library/cpp/actors/wilson/protos/service.grpc.pb.h> #include <util/stream/file.h> +#include <util/string/hex.h> #include <grpc++/grpc++.h> #include <chrono> @@ -32,6 +34,8 @@ namespace NWilson { NServiceProto::ExportTraceServiceResponse Response; grpc::Status Status; + bool WakeupScheduled = false; + public: TWilsonUploader(TString host, ui16 port, TString rootCA) : Host(std::move(host)) @@ -50,6 +54,8 @@ namespace NWilson { .pem_root_certs = TFileInput(RootCA).ReadAll(), })); Stub = NServiceProto::TraceService::NewStub(Channel); + + LOG_INFO_S(*TlsActivationContext, 430, "TWilsonUploader::Bootstrap"); } void Handle(TEvWilson::TPtr ev) { @@ -67,6 +73,17 @@ namespace NWilson { void SendRequest() { Y_VERIFY(!Reader && !Context); Context = std::make_unique<grpc::ClientContext>(); + for (const auto& rs : Request.resource_spans()) { + for (const auto& ss : rs.scope_spans()) { + for (const auto& s : ss.spans()) { + LOG_DEBUG_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, "exporting span" + << " TraceId# " << HexEncode(s.trace_id()) + << " SpanId# " << HexEncode(s.span_id()) + << " ParentSpanId# " << HexEncode(s.parent_span_id()) + << " Name# " << s.name()); + } + } + } Reader = Stub->AsyncExport(Context.get(), std::exchange(Request, {}), &CQ); Reader->Finish(&Response, &Status, nullptr); } @@ -76,18 +93,33 @@ namespace NWilson { void *tag; bool ok; if (CQ.AsyncNext(&tag, &ok, std::chrono::system_clock::now()) == grpc::CompletionQueue::GOT_EVENT) { + if (!Status.ok()) { + LOG_ERROR_S(*TlsActivationContext, 430 /* NKikimrServices::WILSON */, + "failed to commit traces: " << Status.error_message()); + } + Reader.reset(); Context.reset(); if (Request.resource_spans_size()) { SendRequest(); } + } else if (!WakeupScheduled) { + WakeupScheduled = true; + Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup); } } } + void HandleWakeup() { + Y_VERIFY(WakeupScheduled); + WakeupScheduled = false; + CheckIfDone(); + } + STRICT_STFUNC(StateFunc, hFunc(TEvWilson, Handle); + cFunc(TEvents::TSystem::Wakeup, HandleWakeup); ); }; diff --git a/ydb/core/base/wilson.h b/ydb/core/base/wilson.h new file mode 100644 index 00000000000..1d3f7d2680e --- /dev/null +++ b/ydb/core/base/wilson.h @@ -0,0 +1,14 @@ +#pragma once + +namespace NKikimr { + + struct TWilson { + enum { + BlobStorage = 8, // DS proxy and lower levels + DsProxyInternals = 7, + VDiskTopLevel = 6, + VDiskInternals = 5, + }; + }; + +} // NKikimr diff --git a/ydb/core/blobstorage/backpressure/defs.h b/ydb/core/blobstorage/backpressure/defs.h index d179e1ba76f..6fc9da407f3 100644 --- a/ydb/core/blobstorage/backpressure/defs.h +++ b/ydb/core/blobstorage/backpressure/defs.h @@ -10,6 +10,7 @@ #include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h> #include <ydb/core/protos/blobstorage.pb.h> #include <ydb/core/base/interconnect_channels.h> +#include <ydb/core/base/wilson.h> #include <library/cpp/actors/core/interconnect.h> #include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/actors/core/hfunc.h> diff --git a/ydb/core/blobstorage/backpressure/queue.cpp b/ydb/core/blobstorage/backpressure/queue.cpp index cead08e1b85..e768d2d8417 100644 --- a/ydb/core/blobstorage/backpressure/queue.cpp +++ b/ydb/core/blobstorage/backpressure/queue.cpp @@ -202,7 +202,7 @@ void TBlobStorageQueue::SendToVDisk(const TActorContext& ctx, const TActorId& re {"VDiskOrderNumber", vdiskOrderNumber} }}); item.Event.SendToVDisk(ctx, remoteVDisk, item.QueueCookie, item.MsgId, item.SequenceId, sendMeCostSettings, - item.Span, ClientId, item.ProcessingTimer); + item.Span.GetTraceId(), ClientId, item.ProcessingTimer); // update counters as far as item got sent ++NextMsgId; @@ -219,6 +219,8 @@ void TBlobStorageQueue::ReplyWithError(TItem& item, NKikimrProto::EReplyStatus s << " processingTime# " << processingTime); item.Span.EndError(TStringBuilder() << NKikimrProto::EReplyStatus_Name(status) << ": " << errorReason); + item.Span = {}; + ctx.Send(item.Event.GetSender(), item.Event.MakeErrorReply(status, errorReason, QueueDeserializedItems, QueueDeserializedBytes), 0, item.Event.GetCookie()); @@ -249,7 +251,8 @@ bool TBlobStorageQueue::OnResponse(ui64 msgId, ui64 sequenceId, ui64 cookie, TAc it->Event.GetByteSize(), !relevant); InFlightLookup.erase(lookupIt); - it->Span.EndOk(); + auto span = std::exchange(it->Span, {}); + span.EndOk(); EraseItem(Queues.InFlight, it); // unpause execution when InFlight queue gets empty diff --git a/ydb/core/blobstorage/backpressure/queue.h b/ydb/core/blobstorage/backpressure/queue.h index ef0903d4971..a855eef33c9 100644 --- a/ydb/core/blobstorage/backpressure/queue.h +++ b/ydb/core/blobstorage/backpressure/queue.h @@ -59,11 +59,10 @@ class TBlobStorageQueue { const ::NMonitoring::TDynamicCounters::TCounterPtr& serItems, const ::NMonitoring::TDynamicCounters::TCounterPtr& serBytes, const TBSProxyContextPtr& bspctx, ui32 interconnectChannel, - bool local, TInstant now) + bool local) : Queue(EItemQueue::NotSet) , CostEssence(*event->Get()) - , Span(9 /*verbosity*/, NWilson::ERelation::ChildOf, std::move(event->TraceId), now, TStringBuilder() - << "Backpressure(" << TypeName<TEvent>() << ")") + , Span(TWilson::VDiskTopLevel, std::move(event->TraceId), "Backpressure.InFlight") , Event(event, serItems, serBytes, bspctx, interconnectChannel, local) , MsgId(Max<ui64>()) , SequenceId(0) @@ -71,7 +70,11 @@ class TBlobStorageQueue { , QueueCookie(RandomNumber<ui64>()) , Cost(0) , DirtyCost(true) - {} + { + Span + .Attribute("event", TypeName<TEvent>()) + .Attribute("local", local); + } ~TItem() { Y_VERIFY(Queue == EItemQueue::NotSet, "Queue# %" PRIu32, ui32(Queue)); @@ -197,13 +200,13 @@ public: void OnConnect(); template<typename TPtr> - void Enqueue(const TActorContext &ctx, TPtr& event, TInstant deadline, bool local, TInstant now) { + void Enqueue(const TActorContext &ctx, TPtr& event, TInstant deadline, bool local) { Y_UNUSED(ctx); TItemList::iterator newIt; if (Queues.Unused.empty()) { newIt = Queues.Waiting.emplace(Queues.Waiting.end(), event, deadline, - QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local, now); + QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local); ++*QueueSize; } else { newIt = Queues.Unused.begin(); @@ -212,7 +215,7 @@ public: TItem& item = *newIt; item.~TItem(); new(&item) TItem(event, deadline, QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, - InterconnectChannel, local, now); + InterconnectChannel, local); } newIt->Iterator = newIt; diff --git a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp index 84337e1eb4f..43625bd5ff4 100644 --- a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp +++ b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp @@ -201,7 +201,7 @@ private: << " cookie# " << ev->Cookie); if (IsReady()) { - Queue.Enqueue(ctx, ev, deadline, RemoteVDisk.NodeId() == SelfId().NodeId(), TActivationContext::Now()); + Queue.Enqueue(ctx, ev, deadline, RemoteVDisk.NodeId() == SelfId().NodeId()); Pump(ctx); UpdateRequestTrackingStats(ctx); } else { diff --git a/ydb/core/blobstorage/backpressure/ut_client/skeleton_front_mock.h b/ydb/core/blobstorage/backpressure/ut_client/skeleton_front_mock.h index 290c529ea56..66805e3a1dd 100644 --- a/ydb/core/blobstorage/backpressure/ut_client/skeleton_front_mock.h +++ b/ydb/core/blobstorage/backpressure/ut_client/skeleton_front_mock.h @@ -79,7 +79,7 @@ public: Reply(*ev, new TEvBlobStorage::TEvVPutResult(NKikimrProto::NOTREADY, blobId, vdiskId, record.HasCookie() ? &cookie : nullptr, TOutOfSpaceStatus(0, 0), TActivationContext::Now(), ev->Get()->GetCachedByteSize(), - &record, nullptr, nullptr, nullptr, record.GetBuffer().size(), {}, 0, TString())); + &record, nullptr, nullptr, nullptr, record.GetBuffer().size(), 0, TString())); if (record.GetNotifyIfNotReady()) { Notify.insert(ev->Sender); } @@ -91,7 +91,7 @@ public: std::unique_ptr<TEvBlobStorage::TEvVPutResult> reply(new TEvBlobStorage::TEvVPutResult(NKikimrProto::OK, blobId, vdiskId, record.HasCookie() ? &cookie : nullptr, TOutOfSpaceStatus(0, 0), TActivationContext::Now(), ev->Get()->GetCachedByteSize(), - &record, nullptr, nullptr, nullptr, record.GetBuffer().size(), {}, 0, TString())); + &record, nullptr, nullptr, nullptr, record.GetBuffer().size(), 0, TString())); auto *qos = reply->Record.MutableMsgQoS(); LOG_DEBUG_S(*TlsActivationContext, NActorsServices::TEST, "Received " << SingleLineProto(*qos)); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h index 451e7fa3154..78e7039aa67 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy.h @@ -18,6 +18,7 @@ #include <library/cpp/actors/wilson/wilson_span.h> #include <ydb/core/base/appdata.h> #include <ydb/core/base/group_stat.h> +#include <ydb/core/base/wilson.h> #include <library/cpp/containers/stack_vector/stack_vec.h> #include <util/generic/hash_set.h> @@ -146,7 +147,7 @@ public: , Mon(std::move(mon)) , PoolCounters(storagePoolCounters) , LogCtx(logComponent, logAccEnabled) - , Span(8 /*verbosity*/, NWilson::ERelation::ChildOf, std::move(traceId), now, std::move(name)) + , Span(TWilson::BlobStorage, std::move(traceId), std::move(name)) , RestartCounter(restartCounter) , Source(source) , Cookie(cookie) @@ -261,7 +262,7 @@ public: Y_VERIFY_DEBUG(RestartCounter < 100); auto q = self.RestartQuery(RestartCounter + 1); ++*Mon->NodeMon->RestartHisto[Min<size_t>(Mon->NodeMon->RestartHisto.size() - 1, RestartCounter)]; - TActivationContext::Send(new IEventHandle(nodeWardenId, Source, q.release(), 0, Cookie, &proxyId, Span)); + TActivationContext::Send(new IEventHandle(nodeWardenId, Source, q.release(), 0, Cookie, &proxyId, Span.GetTraceId())); PassAway(); return true; } @@ -321,7 +322,7 @@ public: if constexpr (!std::is_same_v<T, TEvBlobStorage::TEvVStatus>) { event->MessageRelevanceTracker = MessageRelevanceTracker; } - const TActorId queueId = GroupQueues->Send(*this, Info->GetTopology(), std::move(event), cookie, Span, + const TActorId queueId = GroupQueues->Send(*this, Info->GetTopology(), std::move(event), cookie, Span.GetTraceId(), timeStatsEnabled); ++RequestsInFlight; } @@ -407,7 +408,8 @@ public: TActorBootstrapped<TDerived>::PassAway(); } - void SendResponse(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats, TActorId source, ui64 cookie) { + void SendResponse(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats, TActorId source, ui64 cookie, + bool term = true) { const TInstant now = TActivationContext::Now(); NKikimrProto::EReplyStatus status; @@ -457,14 +459,16 @@ public: static_cast<TEvBlobStorage::TEvGetResult&>(*ev).Sent = now; } - if (status == NKikimrProto::OK) { - Span.EndOk(); - } else { - Span.EndError(std::move(errorReason)); + if (term) { + if (status == NKikimrProto::OK) { + Span.EndOk(); + } else { + Span.EndError(std::move(errorReason)); + } } // send the reply to original request sender - Derived().Send(source, ev.release(), 0, cookie, Span); + Derived().Send(source, ev.release(), 0, cookie); }; void SendResponse(std::unique_ptr<IEventBase>&& ev, TBlobStorageGroupProxyTimeStats *timeStats = nullptr) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp index 344fde247ef..b093cfb2e3a 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp @@ -347,10 +347,10 @@ void TGroupDiskRequests::AddGet(const ui32 diskOrderNumber, const TLogoBlobID &i void TGroupDiskRequests::AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TString buffer, TDiskPutRequest::EPutReason putReason, bool isHandoff, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, - ui8 blobIdx) { + NWilson::TSpan *span, ui8 blobIdx) { Y_VERIFY(diskOrderNumber < DiskRequestsForOrderNumber.size()); DiskRequestsForOrderNumber[diskOrderNumber].PutsToSend.emplace_back(id, buffer, putReason, isHandoff, - extraBlockChecks, blobIdx); + extraBlockChecks, span, blobIdx); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -367,11 +367,7 @@ void TBlackboard::AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize) { ui64 size = (inSize ? Min(ui64(inSize), maxSize) : maxSize); //Cerr << "size " << size << " shift " << shift << Endl; if (size > 0) { - TBlobState &state = BlobStates[id];; - if (!bool(state.Id)) { - state.Init(id, *Info); - } - state.AddNeeded(shift, size); + (*this)[id].AddNeeded(shift, size); } else { TStringStream str; str << "It is impossible to read 0 bytes! Do not send such requests."; @@ -380,30 +376,18 @@ void TBlackboard::AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize) { } } -void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData, - std::vector<std::pair<ui64, ui32>> *extraBlockChecks) { +void TBlackboard::AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData) { Y_VERIFY(bool(id)); Y_VERIFY(id.PartId() == 0); Y_VERIFY(id.BlobSize() != 0); - TBlobState &state = BlobStates[id]; - if (!state.Id) { - state.Init(id, *Info); - state.ExtraBlockChecks = extraBlockChecks; - } else { - Y_VERIFY(state.ExtraBlockChecks == extraBlockChecks); - } - state.AddPartToPut(partIdx, partData); + (*this)[id].AddPartToPut(partIdx, partData); } void TBlackboard::MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx) { Y_VERIFY(bool(id)); Y_VERIFY(id.PartId() == 0); Y_VERIFY(id.BlobSize() != 0); - TBlobState &state = BlobStates[id];; - if (!bool(state.Id)) { - state.Init(id, *Info); - } - state.MarkBlobReadyToPut(blobIdx); + (*this)[id].MarkBlobReadyToPut(blobIdx); } void TBlackboard::MoveBlobStateToDone(const TLogoBlobID &id) { @@ -581,6 +565,30 @@ void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, T } } +void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, + NWilson::TSpan *span) { + TBlobState& state = (*this)[id]; + if (!state.ExtraBlockChecks) { + state.ExtraBlockChecks = extraBlockChecks; + } else { + Y_VERIFY(state.ExtraBlockChecks == extraBlockChecks); + } + if (!state.Span) { + state.Span = span; + } else { + Y_VERIFY(state.Span == span); + } +} + +TBlobState& TBlackboard::operator [](const TLogoBlobID& id) { + const auto [it, inserted] = BlobStates.try_emplace(id); + TBlobState& state = it->second; + if (inserted) { + state.Init(id, *Info); + } + return state; +} + TString TBlackboard::ToString() const { TStringStream str; str << "{BlobStates size# " << BlobStates.size(); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h index cbffa37eb0c..cfd0eb64999 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h @@ -82,6 +82,7 @@ struct TBlobState { bool IsChanged = false; bool IsDone = false; std::vector<std::pair<ui64, ui32>> *ExtraBlockChecks = nullptr; + NWilson::TSpan *Span = nullptr; void Init(const TLogoBlobID &id, const TBlobStorageGroupInfo &Info); void AddNeeded(ui64 begin, ui64 size); @@ -129,15 +130,17 @@ struct TDiskPutRequest { bool IsHandoff; ui8 BlobIdx; std::vector<std::pair<ui64, ui32>> *ExtraBlockChecks; + NWilson::TSpan *Span; TDiskPutRequest(const TLogoBlobID &id, TString buffer, EPutReason reason, bool isHandoff, - std::vector<std::pair<ui64, ui32>> *extraBlockChecks, ui8 blobIdx = 0) + std::vector<std::pair<ui64, ui32>> *extraBlockChecks, NWilson::TSpan *span, ui8 blobIdx) : Id(id) , Buffer(std::move(buffer)) , Reason(reason) , IsHandoff(isHandoff) , BlobIdx(blobIdx) , ExtraBlockChecks(extraBlockChecks) + , Span(span) {} }; @@ -156,7 +159,7 @@ struct TGroupDiskRequests { void AddGet(const ui32 diskOrderNumber, const TLogoBlobID &id, const ui32 shift, const ui32 size); void AddPut(const ui32 diskOrderNumber, const TLogoBlobID &id, TString buffer, TDiskPutRequest::EPutReason putReason, bool isHandoff, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, - ui8 blobIdx = 0); + NWilson::TSpan *span, ui8 blobIdx); }; struct TBlackboard; @@ -199,7 +202,7 @@ struct TBlackboard { {} void AddNeeded(const TLogoBlobID &id, ui32 inShift, ui32 inSize); - void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData, std::vector<std::pair<ui64, ui32>> *extraBlockChecks); + void AddPartToPut(const TLogoBlobID &id, ui32 partIdx, TString &partData); void MarkBlobReadyToPut(const TLogoBlobID &id, ui8 blobIdx = 0); void MoveBlobStateToDone(const TLogoBlobID &id); void AddResponseData(const TLogoBlobID &id, ui32 orderNumber, ui32 shift, TString &data); @@ -220,6 +223,10 @@ struct TBlackboard { blob.IsChanged = true; } } + + void RegisterBlobForPut(const TLogoBlobID& id, std::vector<std::pair<ui64, ui32>> *extraBlockChecks, NWilson::TSpan *span); + + TBlobState& operator [](const TLogoBlobID& id); }; }//NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp index dad2ecd745f..16a4730e535 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp @@ -602,7 +602,7 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor<TB getRequest->IsInternal = true; getRequest->TabletId = TabletId; getRequest->AcquireBlockedGeneration = true; - bool isSent = SendToBSProxy(SelfId(), Info->GroupID, getRequest.release(), 0, Span); + bool isSent = SendToBSProxy(SelfId(), Info->GroupID, getRequest.release(), 0, Span.GetTraceId()); Y_VERIFY(isSent); TotalSent++; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp index 9729b909811..283f6049d5e 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp @@ -353,7 +353,7 @@ void TGetImpl::PrepareVPuts(TLogContext &logCtx, } bytes += put.Buffer.size(); lastItemCount++; - vMultiPut->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks); + vMultiPut->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks, NWilson::TTraceId()); // FIXME: trace } vMultiPut->Record.SetCookie(TVMultiPutCookie(diskOrderNumber, lastItemCount, VMultiPutRequests)); ++VMultiPutRequests; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp index dbf547ae745..959f64ff16b 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp @@ -2,6 +2,8 @@ namespace NKikimr { + std::atomic<TMonotonic> TBlobStorageGroupProxy::ThrottlingTimestamp; + TBlobStorageGroupProxy::TBlobStorageGroupProxy(TIntrusivePtr<TBlobStorageGroupInfo>&& info, bool forceWaitAllDrives, TIntrusivePtr<TDsProxyNodeMon> &nodeMon, TIntrusivePtr<TStoragePoolCounters>&& storagePoolCounters, const TControlWrapper &enablePutBatching, const TControlWrapper &enableVPatch) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h index 324f7d1d1f7..e03405a42c3 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h @@ -52,6 +52,8 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy> } }; + static std::atomic<TMonotonic> ThrottlingTimestamp; + const ui32 GroupId; TIntrusivePtr<TBlobStorageGroupInfo> Info; std::shared_ptr<TBlobStorageGroupInfo::TTopology> Topology; @@ -219,7 +221,8 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy> if (rate) { const ui64 num = RandomNumber<ui64>(1000000); // in range [0, 1000000) if (num < rate) { - ev->TraceId = NWilson::TTraceId::NewTraceId(15, 4095); + ev->TraceId = NWilson::TTraceId::NewTraceIdThrottled(15, Max<ui32>(), ThrottlingTimestamp, + TMonotonic::Now(), TDuration::Seconds(1)); } } } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp index 80405d83b15..fa59aff5c1e 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_multicollect.cpp @@ -155,7 +155,7 @@ public: R_LOG_DEBUG_S("BPMC3", "SendRequest idx# " << idx << " isLast# " << isLast << " ev# " << ev->ToString()); - SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span); + SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span.GetTraceId()); if (isLast) { CollectRequestsInFlight++; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp index dd4ab92d589..7b73f153a35 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_multiget.cpp @@ -122,7 +122,7 @@ public: void SendRequests() { for (; RequestsInFlight < MaxRequestsInFlight && !PendingGets.empty(); ++RequestsInFlight, PendingGets.pop_front()) { auto& [ev, cookie] = PendingGets.front(); - SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span); + SendToBSProxy(SelfId(), Info->GroupID, ev.release(), cookie, Span.GetTraceId()); } if (!RequestsInFlight && PendingGets.empty()) { auto ev = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, 0, Info->GroupID); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp index b2462144952..9dbba7dd853 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp @@ -172,7 +172,7 @@ public: std::unique_ptr<TEvBlobStorage::TEvPut> put = std::make_unique<TEvBlobStorage::TEvPut>(PatchedId, Buffer, Deadline, NKikimrBlobStorage::AsyncBlob, TEvBlobStorage::TEvPut::TacticDefault); put->Orbit = std::move(Orbit); - Send(ProxyActorId, put.release(), 0, OriginalId.Hash(), Span); + Send(ProxyActorId, put.release(), 0, OriginalId.Hash(), Span.GetTraceId()); } void Handle(TEvBlobStorage::TEvPutResult::TPtr &ev) { @@ -528,9 +528,9 @@ public: NKikimrBlobStorage::AsyncRead); get->Orbit = std::move(Orbit); if (OriginalGroupId == Info->GroupID) { - Send(ProxyActorId, get.release(), 0, PatchedId.Hash(), Span); + Send(ProxyActorId, get.release(), 0, PatchedId.Hash(), Span.GetTraceId()); } else { - SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash(), Span); + SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash(), Span.GetTraceId()); } } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp index e2b717d13f6..db13013f02b 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp @@ -36,38 +36,12 @@ struct TEvAccelerate : public TEventLocal<TEvAccelerate, TEvBlobStorage::EvAccel //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobStorageGroupPutRequest> { - struct TMultiPutItemInfo { - TLogoBlobID BlobId; - TString Buffer; - ui64 BufferSize; - TActorId Recipient; - ui64 Cookie; - NWilson::TTraceId TraceId; - NLWTrace::TOrbit Orbit; - bool Replied = false; - std::vector<std::pair<ui64, ui32>> ExtraBlockChecks; - - TMultiPutItemInfo(TLogoBlobID id, const TString& buffer, TActorId recipient, ui64 cookie, - NWilson::TTraceId traceId, NLWTrace::TOrbit &&orbit, std::vector<std::pair<ui64, ui32>> extraBlockChecks) - : BlobId(id) - , Buffer(buffer) - , BufferSize(buffer.size()) - , Recipient(recipient) - , Cookie(cookie) - , TraceId(std::move(traceId)) - , Orbit(std::move(orbit)) - , ExtraBlockChecks(std::move(extraBlockChecks)) - {} - }; - TPutImpl PutImpl; TRootCause RootCauseTrack; TStackVec<ui64, TypicalDisksInGroup> WaitingVDiskResponseCount; ui64 WaitingVDiskCount = 0; - TBatchedVec<TMultiPutItemInfo> ItemsInfo; - bool IsManyPuts = false; TInstant Deadline; @@ -164,10 +138,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt } WaitingVDiskResponseCount[vdisk]--; - Y_VERIFY(idx < ItemsInfo.size()); - Y_VERIFY(origBlobId == ItemsInfo[idx].BlobId); + Y_VERIFY(idx < PutImpl.Blobs.size()); + Y_VERIFY(origBlobId == PutImpl.Blobs[idx].BlobId); if (TimeStatsEnabled && record.GetMsgQoS().HasExecTimeStats()) { - TimeStats.ApplyPut(ItemsInfo[idx].BufferSize, record.GetMsgQoS().GetExecTimeStats()); + TimeStats.ApplyPut(PutImpl.Blobs[idx].BufferSize, record.GetMsgQoS().GetExecTimeStats()); } Y_VERIFY(record.HasVDiskID()); @@ -314,7 +288,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt } Y_VERIFY(RequestsSent > ResponsesReceived, "RequestsSent# %" PRIu64 " ResponsesReceived# %" PRIu64 " ResponsesSent# %" PRIu64 " BlobsCount# %" PRIu64 " TPutImpl# %s", ui64(RequestsSent), ui64(ResponsesReceived), - (ui64)ResponsesSent, (ui64)ItemsInfo.size(), PutImpl.DumpFullState().data()); + (ui64)ResponsesSent, (ui64)PutImpl.Blobs.size(), PutImpl.DumpFullState().data()); if (!IsAccelerateScheduled && !IsAccelerated) { if (WaitingVDiskCount == 1 && RequestsSent > 1) { @@ -343,10 +317,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt bool ReplyAndDieWithLastResponse(TPutImpl::TPutResultVec &putResults) { for (auto& [blobIdx, result] : putResults) { - Y_VERIFY(ResponsesSent != ItemsInfo.size()); + Y_VERIFY(ResponsesSent != PutImpl.Blobs.size()); SendReply(result, blobIdx); } - if (ResponsesSent == ItemsInfo.size()) { + if (ResponsesSent == PutImpl.Blobs.size()) { PassAway(); return true; } @@ -357,14 +331,14 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt NKikimrProto::EReplyStatus status = putResult->Status; A_LOG_LOG_S(false, status == NKikimrProto::OK ? NLog::PRI_DEBUG : NLog::PRI_NOTICE, "BPP21", "SendReply putResult# " << putResult->ToString() << " ResponsesSent# " << ResponsesSent - << " ItemsInfo.size# " << ItemsInfo.size() - << " Last# " << (ResponsesSent + 1 == ItemsInfo.size() ? "true" : "false")); + << " PutImpl.Blobs.size# " << PutImpl.Blobs.size() + << " Last# " << (ResponsesSent + 1 == PutImpl.Blobs.size() ? "true" : "false")); const TDuration duration = TActivationContext::Now() - StartTime; TLogoBlobID blobId = putResult->Id; TLogoBlobID origBlobId = TLogoBlobID(blobId, 0); - Mon->CountPutPesponseTime(Info->GetDeviceType(), HandleClass, ItemsInfo[blobIdx].BufferSize, duration); + Mon->CountPutPesponseTime(Info->GetDeviceType(), HandleClass, PutImpl.Blobs[blobIdx].BufferSize, duration); *Mon->ActivePutCapacity -= ReportedBytes; - Y_VERIFY(PutImpl.GetHandoffPartsSent() <= Info->Type.TotalPartCount() * MaxHandoffNodes * ItemsInfo.size()); + Y_VERIFY(PutImpl.GetHandoffPartsSent() <= Info->Type.TotalPartCount() * MaxHandoffNodes * PutImpl.Blobs.size()); ++*Mon->HandoffPartsSent[Min(Mon->HandoffPartsSent.size() - 1, (size_t)PutImpl.GetHandoffPartsSent())]; ReportedBytes = 0; const bool success = (status == NKikimrProto::OK); @@ -374,27 +348,32 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt blobId.TabletID(), Info->GroupID, blobId.Channel(), NKikimrBlobStorage::EPutHandleClass_Name(HandleClass), success); ResponsesSent++; - Y_VERIFY(ResponsesSent <= ItemsInfo.size()); - RootCauseTrack.RenderTrack(ItemsInfo[blobIdx].Orbit); - LWTRACK(DSProxyPutReply, ItemsInfo[blobIdx].Orbit); - putResult->Orbit = std::move(ItemsInfo[blobIdx].Orbit); + Y_VERIFY(ResponsesSent <= PutImpl.Blobs.size()); + RootCauseTrack.RenderTrack(PutImpl.Blobs[blobIdx].Orbit); + LWTRACK(DSProxyPutReply, PutImpl.Blobs[blobIdx].Orbit); + putResult->Orbit = std::move(PutImpl.Blobs[blobIdx].Orbit); if (!IsManyPuts) { SendResponse(std::move(putResult), TimeStatsEnabled ? &TimeStats : nullptr); } else { + if (putResult->Status == NKikimrProto::OK) { + PutImpl.Blobs[blobIdx].Span.EndOk(); + } else { + PutImpl.Blobs[blobIdx].Span.EndError(putResult->ErrorReason); + } SendResponse(std::move(putResult), TimeStatsEnabled ? &TimeStats : nullptr, - ItemsInfo[blobIdx].Recipient, ItemsInfo[blobIdx].Cookie); // FIXME about traces - ItemsInfo[blobIdx].Replied = true; + PutImpl.Blobs[blobIdx].Recipient, PutImpl.Blobs[blobIdx].Cookie, false); + PutImpl.Blobs[blobIdx].Replied = true; } } TString BlobIdSequenceToString() const { TStringBuilder blobIdsStr; blobIdsStr << '['; - for (ui64 blobIdx = 0; blobIdx < ItemsInfo.size(); ++blobIdx) { + for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) { if (blobIdx) { blobIdsStr << ' '; } - blobIdsStr << ItemsInfo[blobIdx].BlobId.ToString(); + blobIdsStr << PutImpl.Blobs[blobIdx].BlobId.ToString(); } return blobIdsStr << ']'; } @@ -402,7 +381,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt std::unique_ptr<IEventBase> RestartQuery(ui32 counter) { ++*Mon->NodeMon->RestartPut; auto ev = std::make_unique<TEvBlobStorage::TEvBunchOfEvents>(); - for (auto& item : ItemsInfo) { + for (auto& item : PutImpl.Blobs) { if (item.Replied) { continue; } @@ -419,7 +398,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt 0 /*flags*/, item.Cookie, nullptr /*forwardOnNondelivery*/, - std::move(item.TraceId) + item.Span.GetTraceId() )); put->RestartCounter = counter; } @@ -450,7 +429,7 @@ public: : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId), NKikimrServices::BS_PROXY_PUT, false, latencyQueueKind, now, storagePoolCounters, ev->RestartCounter, "DSProxy.Put") - , PutImpl(info, state, ev, mon, enableRequestMod3x3ForMinLatecy) + , PutImpl(info, state, ev, mon, enableRequestMod3x3ForMinLatecy, source, cookie, Span.GetTraceId()) , WaitingVDiskResponseCount(info->GetTotalVDisksNum()) , Deadline(ev->Deadline) , StartTime(now) @@ -467,11 +446,7 @@ public: if (ev->Orbit.HasShuttles()) { RootCauseTrack.IsOn = true; } - ItemsInfo.emplace_back(ev->Id, ev->Buffer, source, cookie, NWilson::TTraceId(), std::move(ev->Orbit), - std::move(ev->ExtraBlockChecks)); - LWPROBE(DSProxyBlobPutTactics, ItemsInfo[0].BlobId.TabletID(), Info->GroupID, ItemsInfo[0].BlobId.ToString(), - Tactic, NKikimrBlobStorage::EPutHandleClass_Name(HandleClass)); - ReportBytes(ItemsInfo[0].Buffer.capacity() + sizeof(*this)); + ReportBytes(PutImpl.Blobs[0].Buffer.capacity() + sizeof(*this)); RequestBytes = ev->Buffer.size(); RequestHandleClass = HandleClassToHandleClass(HandleClass); @@ -521,21 +496,10 @@ public: if (!msg.ExtraBlockChecks.empty()) { RequireExtraBlockChecks = true; } - ItemsInfo.emplace_back( - msg.Id, - msg.Buffer, - ev->Sender, - ev->Cookie, - std::move(ev->TraceId), - std::move(msg.Orbit), - std::move(msg.ExtraBlockChecks) - ); - LWPROBE(DSProxyBlobPutTactics, ItemsInfo.back().BlobId.TabletID(), Info->GroupID, - ItemsInfo.back().BlobId.ToString(), Tactic, NKikimrBlobStorage::EPutHandleClass_Name(HandleClass)); } RequestBytes = 0; - for (auto &item: ItemsInfo) { + for (auto &item: PutImpl.Blobs) { ReportBytes(item.Buffer.capacity()); RequestBytes += item.BufferSize; } @@ -553,15 +517,15 @@ public: A_LOG_INFO_S("BPP13", "bootstrap" << " ActorId# " << SelfId() << " Group# " << Info->GroupID - << " BlobCount# " << ItemsInfo.size() + << " BlobCount# " << PutImpl.Blobs.size() << " BlobIDs# " << BlobIdSequenceToString() << " HandleClass# " << NKikimrBlobStorage::EPutHandleClass_Name(HandleClass) << " Tactic# " << TEvBlobStorage::TEvPut::TacticName(Tactic) << " Deadline# " << Deadline << " RestartCounter# " << RestartCounter); - for (ui64 blobIdx = 0; blobIdx < ItemsInfo.size(); ++blobIdx) { - LWTRACK(DSProxyPutBootstrapStart, ItemsInfo[blobIdx].Orbit); + for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) { + LWTRACK(DSProxyPutBootstrapStart, PutImpl.Blobs[blobIdx].Orbit); } Become(&TThis::StateWait); @@ -573,15 +537,15 @@ public: const ui32 totalParts = Info->Type.TotalPartCount(); TAutoPtr<TEvResume> resume(new TEvResume()); - resume->PartSets.resize(ItemsInfo.size()); + resume->PartSets.resize(PutImpl.Blobs.size()); - for (ui64 idx = 0; idx < ItemsInfo.size(); ++idx) { - TLogoBlobID blobId = ItemsInfo[idx].BlobId; + for (ui64 idx = 0; idx < PutImpl.Blobs.size(); ++idx) { + TLogoBlobID blobId = PutImpl.Blobs[idx].BlobId; const ui64 partSize = Info->Type.PartSize(blobId); - ui64 bufferSize = ItemsInfo[idx].BufferSize; + ui64 bufferSize = PutImpl.Blobs[idx].BufferSize; - char *data = ItemsInfo[idx].Buffer.Detach(); + char *data = PutImpl.Blobs[idx].Buffer.Detach(); Encrypt(data, data, 0, bufferSize, blobId, *Info); TDataPartSet &partSet = resume->PartSets[idx]; @@ -604,8 +568,8 @@ public: ResumeBootstrap(resume); } else { Send(SelfId(), resume.Release()); - for (ui64 blobIdx = 0; blobIdx < ItemsInfo.size(); ++blobIdx) { - LWTRACK(DSProxyPutPauseBootstrap, ItemsInfo[blobIdx].Orbit); + for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) { + LWTRACK(DSProxyPutPauseBootstrap, PutImpl.Blobs[blobIdx].Orbit); } } @@ -616,8 +580,8 @@ public: void Handle(TEvResume::TPtr &ev) { if (ev->Get()->Count == 0) { // Record only the first resume to keep tracks structure simple - for (ui64 blobIdx = 0; blobIdx < ItemsInfo.size(); ++blobIdx) { - LWTRACK(DSProxyPutResumeBootstrap, ItemsInfo[blobIdx].Orbit); + for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) { + LWTRACK(DSProxyPutResumeBootstrap, PutImpl.Blobs[blobIdx].Orbit); } } ResumeBootstrap(ev->Release()); @@ -662,15 +626,15 @@ public: double waitSec = Timer.PassedReset(); resume->WaitSec += waitSec; - Y_VERIFY(ItemsInfo.size() == resume->PartSets.size()); + Y_VERIFY(PutImpl.Blobs.size() == resume->PartSets.size()); bool splitDone = true; - for (ui64 idx = 0; idx < ItemsInfo.size(); ++idx) { + for (ui64 idx = 0; idx < PutImpl.Blobs.size(); ++idx) { TDataPartSet &partSet = resume->PartSets[idx]; - TString &buffer = ItemsInfo[idx].Buffer; - TLogoBlobID blobId = ItemsInfo[idx].BlobId; + TString &buffer = PutImpl.Blobs[idx].Buffer; + TLogoBlobID blobId = PutImpl.Blobs[idx].BlobId; Info->Type.IncrementalSplitData((TErasureType::ECrcMode)blobId.CrcMode(), buffer, partSet); if (partSet.IsSplitDone()) { - ReportBytes(partSet.MemoryConsumed - ItemsInfo[idx].BufferSize); + ReportBytes(partSet.MemoryConsumed - PutImpl.Blobs[idx].BufferSize); } else { splitDone = false; } @@ -681,8 +645,8 @@ public: LWPROBE(ProxyPutBootstrapPart, RequestBytes, waitSec * 1000.0, splitSec * 1000.0, resume->Count, resume->SplitSec * 1000.0); if (splitDone) { - for (ui64 blobIdx = 0; blobIdx < ItemsInfo.size(); ++blobIdx) { - LWTRACK(DSProxyPutBootstrapDone, ItemsInfo[blobIdx].Orbit, + for (ui64 blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) { + LWTRACK(DSProxyPutBootstrapDone, PutImpl.Blobs[blobIdx].Orbit, RequestBytes, resume->WilsonSec * 1000.0, resume->AllocateSec * 1000.0, resume->WaitSec * 1000.0, resume->SplitSec * 1000.0, resume->Count, blobIdx); } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp index 757b313cfb0..7eaa35790f9 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp @@ -66,7 +66,7 @@ void TPutImpl::PrepareReply(NKikimrProto::EReplyStatus status, TLogContext &logC continue; } - outPutResults.emplace_back(idx, new TEvBlobStorage::TEvPutResult(status, Blobs[idx].Id, StatusFlags, + outPutResults.emplace_back(idx, new TEvBlobStorage::TEvPutResult(status, Blobs[idx].BlobId, StatusFlags, Info->GroupID, ApproximateFreeSpaceShare)); outPutResults.back().second->ErrorReason = errorReason; @@ -87,7 +87,7 @@ void TPutImpl::PrepareReply(TLogContext &logCtx, TString errorReason, for (auto item : finished) { auto &[blobId, state] = *item; const ui64 idx = state.BlobIdx; - Y_VERIFY(blobId == Blobs[idx].Id, "BlobIdx# %" PRIu64 " BlobState# %s Blackboard# %s", + Y_VERIFY(blobId == Blobs[idx].BlobId, "BlobIdx# %" PRIu64 " BlobState# %s Blackboard# %s", idx, state.ToString().c_str(), Blackboard.ToString().c_str()); Y_VERIFY(!IsDone[idx]); Y_VERIFY(state.Status != NKikimrProto::UNKNOWN); @@ -149,7 +149,7 @@ TString TPutImpl::DumpFullState() const { bool TPutImpl::MarkBlobAsSent(ui64 idx) { Y_VERIFY(idx < Blobs.size()); Y_VERIFY(!IsDone[idx]); - Blackboard.MoveBlobStateToDone(Blobs[idx].Id); + Blackboard.MoveBlobStateToDone(Blobs[idx].BlobId); IsDone[idx] = true; DoneBlobs++; return true; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h index b95d0cf3485..2e3ab60fc30 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h @@ -45,11 +45,30 @@ private: const TEvBlobStorage::TEvPut::ETactic Tactic; struct TBlobInfo { - TLogoBlobID Id; + TLogoBlobID BlobId; + TString Buffer; + ui64 BufferSize; + TActorId Recipient; + ui64 Cookie; + NLWTrace::TOrbit Orbit; + bool Replied = false; std::vector<std::pair<ui64, ui32>> ExtraBlockChecks; + NWilson::TSpan Span; + + TBlobInfo(TLogoBlobID id, TString buffer, TActorId recipient, ui64 cookie, NWilson::TTraceId traceId, + NLWTrace::TOrbit&& orbit, std::vector<std::pair<ui64, ui32>> extraBlockChecks, bool single) + : BlobId(id) + , Buffer(std::move(buffer)) + , BufferSize(Buffer.size()) + , Recipient(recipient) + , Cookie(cookie) + , Orbit(std::move(orbit)) + , ExtraBlockChecks(std::move(extraBlockChecks)) + , Span(single ? NWilson::TSpan() : NWilson::TSpan(TWilson::BlobStorage, std::move(traceId), "DSProxy.Put.Blob")) + {} void Output(IOutputStream& s) const { - s << Id; + s << BlobId; if (!ExtraBlockChecks.empty()) { s << "{"; for (auto it = ExtraBlockChecks.begin(); it != ExtraBlockChecks.end(); ++it) { @@ -68,8 +87,11 @@ private: return s.Str(); } }; + TBatchedVec<TBlobInfo> Blobs; + friend class TBlobStorageGroupPutRequest; + TStackVec<bool, MaxBatchedPutRequests * TypicalDisksInSubring> ReceivedVPutResponses; TStackVec<bool, MaxBatchedPutRequests * TypicalDisksInSubring> ReceivedVMultiPutResponses; @@ -82,7 +104,7 @@ private: public: TPutImpl(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, TEvBlobStorage::TEvPut *ev, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, - bool enableRequestMod3x3ForMinLatecy) + bool enableRequestMod3x3ForMinLatecy, TActorId recipient, ui64 cookie, NWilson::TTraceId traceId) : Deadline(ev->Deadline) , Info(info) , Blackboard(info, state, ev->HandleClass, NKikimrBlobStorage::EGetHandleClass::AsyncRead, false) @@ -92,10 +114,13 @@ public: , Mon(mon) , EnableRequestMod3x3ForMinLatecy(enableRequestMod3x3ForMinLatecy) , Tactic(ev->Tactic) - , Blobs({{ev->Id, std::move(ev->ExtraBlockChecks)}}) { - Y_VERIFY(Blobs.size()); - Y_VERIFY(Blobs.size() <= MaxBatchedPutRequests); + Blobs.emplace_back(ev->Id, std::move(ev->Buffer), recipient, cookie, std::move(traceId), std::move(ev->Orbit), + std::move(ev->ExtraBlockChecks), true); + + auto& blob = Blobs.back(); + LWPROBE(DSProxyBlobPutTactics, blob.BlobId.TabletID(), Info->GroupID, blob.BlobId.ToString(), Tactic, + NKikimrBlobStorage::EPutHandleClass_Name(GetPutHandleClass())); } TPutImpl(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state, @@ -113,13 +138,20 @@ public: , Tactic(tactic) { Y_VERIFY(events.size(), "TEvPut vector is empty"); + for (auto &ev : events) { auto& msg = *ev->Get(); Y_VERIFY(msg.HandleClass == putHandleClass); Y_VERIFY(msg.Tactic == tactic); - Blobs.push_back({msg.Id, std::move(msg.ExtraBlockChecks)}); + Blobs.emplace_back(msg.Id, std::move(msg.Buffer), ev->Sender, ev->Cookie, std::move(ev->TraceId), + std::move(msg.Orbit), std::move(msg.ExtraBlockChecks), false); Deadline = Max(Deadline, msg.Deadline); + + auto& blob = Blobs.back(); + LWPROBE(DSProxyBlobPutTactics, blob.BlobId.TabletID(), Info->GroupID, blob.BlobId.ToString(), Tactic, + NKikimrBlobStorage::EPutHandleClass_Name(GetPutHandleClass())); } + Y_VERIFY(Blobs.size()); Y_VERIFY(Blobs.size() <= MaxBatchedPutRequests); } @@ -141,12 +173,13 @@ public: const ui32 totalParts = Info->Type.TotalPartCount(); for (ui64 blobIdx = 0; blobIdx < Blobs.size(); ++blobIdx) { TBlobInfo& blob = Blobs[blobIdx]; + Blackboard.RegisterBlobForPut(blob.BlobId, &blob.ExtraBlockChecks, &blob.Span); for (ui32 i = 0; i < totalParts; ++i) { REQUEST_VALGRIND_CHECK_MEM_IS_DEFINED(partSets[blobIdx].Parts[i].OwnedString.Data(), partSets[blobIdx].Parts[i].OwnedString.Size()); - Blackboard.AddPartToPut(blob.Id, i, partSets[blobIdx].Parts[i].OwnedString, &blob.ExtraBlockChecks); + Blackboard.AddPartToPut(blob.BlobId, i, partSets[blobIdx].Parts[i].OwnedString); } - Blackboard.MarkBlobReadyToPut(blob.Id, blobIdx); + Blackboard.MarkBlobReadyToPut(blob.BlobId, blobIdx); } TPutResultVec putResults; @@ -461,7 +494,9 @@ protected: ++VPutRequests; ReceivedVPutResponses.push_back(false); } else if constexpr (isVMultiPut) { - outVPutEvents.back()->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks); + // this request MUST originate from the TEvPut, so the Span field must be filled in + Y_VERIFY(put.Span); + outVPutEvents.back()->AddVPut(put.Id, put.Buffer, &cookie, put.ExtraBlockChecks, put.Span->GetTraceId()); } if (put.IsHandoff) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp index 299dfa2af7d..c83f46b1080 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_range.cpp @@ -250,7 +250,7 @@ class TBlobStorageGroupRangeRequest : public TBlobStorageGroupRequestActor<TBlob A_LOG_DEBUG_S("DSR08", "sending TEvGet# " << get->ToString()); - SendToBSProxy(SelfId(), Info->GroupID, get.release(), 0, Span); + SendToBSProxy(SelfId(), Info->GroupID, get.release(), 0, Span.GetTraceId()); // switch state Become(&TThis::StateGet); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp index 89304e2cd34..efeedb94465 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp @@ -334,7 +334,7 @@ void TStrategyBase::PreparePutsForPartPlacement(TLogContext &logCtx, TBlobState Y_VERIFY(state.Parts[record.PartIdx].Data.IsMonolith()); groupDiskRequests.AddPut(disk.OrderNumber, partId, state.Parts[record.PartIdx].Data.GetMonolith(), TDiskPutRequest::ReasonInitial, info.Type.IsHandoffInSubgroup(record.VDiskIdx), - state.ExtraBlockChecks, state.BlobIdx); + state.ExtraBlockChecks, state.Span, state.BlobIdx); disk.DiskParts[record.PartIdx].Situation = TBlobState::ESituation::Sent; } } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h index 5ef2c7ddf4d..4442c48218b 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3of4.h @@ -141,6 +141,7 @@ protected: diskIdx == group.DiskIdx[0] ? TDiskPutRequest::ReasonInitial : TDiskPutRequest::ReasonError, diskIdx != group.DiskIdx[0], state.ExtraBlockChecks, + state.Span, state.BlobIdx); s = TBlobState::ESituation::Sent; any |= {&info.GetTopology(), diskIdx}; @@ -174,6 +175,7 @@ protected: handoff ? TDiskPutRequest::ReasonError : TDiskPutRequest::ReasonInitial, handoff, state.ExtraBlockChecks, + state.Span, state.BlobIdx); part.Situation = TBlobState::ESituation::Sent; any |= {&info.GetTopology(), (ui8)diskIdx}; diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp index 74bd0ed7ee8..aaf73779b16 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_patch_ut.cpp @@ -371,7 +371,7 @@ void ConductVPatchStart(TTestBasicRuntime &runtime, const TDSProxyEnv &env, cons UNIT_ASSERT(startRecord.HasCookie()); std::unique_ptr<TEvBlobStorage::TEvVPatchFoundParts> foundParts = std::make_unique<TEvBlobStorage::TEvVPatchFoundParts>( status, args.OriginalId, args.PatchedId, vdisk, startRecord.GetCookie(), now, "", &startRecord, - nullptr, nullptr, nullptr, NWilson::TTraceId(), 0); + nullptr, nullptr, nullptr, 0); for (auto partId : parts) { foundParts->AddPart(partId); } @@ -406,7 +406,7 @@ void ConductVPatchDiff(TTestBasicRuntime &runtime, const TDSProxyEnv &env, const UNIT_ASSERT(diffRecord.HasCookie()); std::unique_ptr<TEvBlobStorage::TEvVPatchResult> result = std::make_unique<TEvBlobStorage::TEvVPatchResult>( resultStatus, args.OriginalId, args.PatchedId, vdisk, diffRecord.GetCookie(), now, - &diffRecord, nullptr, nullptr, nullptr,NWilson::TTraceId(), 0); + &diffRecord, nullptr, nullptr, nullptr, 0); result->SetStatusFlagsAndFreeSpace(args.StatusFlags, args.ApproximateFreeSpaceShare); SendByHandle(runtime, diffEv, std::move(result)); @@ -449,7 +449,7 @@ void ConductVMovedPatch(TTestBasicRuntime &runtime, const TTestArgs &args, EMove TOutOfSpaceStatus oos(args.StatusFlags, args.ApproximateFreeSpaceShare); std::unique_ptr<TEvBlobStorage::TEvVMovedPatchResult> vPatchResult = std::make_unique<TEvBlobStorage::TEvVMovedPatchResult>( resultStatus, args.OriginalId, args.PatchedId, vDiskId, expectedCookie, oos, - TAppData::TimeProvider->Now(), 0, &vPatchRecord, nullptr, nullptr, nullptr, NWilson::TTraceId(), 0, TString()); + TAppData::TimeProvider->Now(), 0, &vPatchRecord, nullptr, nullptr, nullptr, 0, TString()); SendByHandle(runtime, handle, std::move(vPatchResult)); CTEST << "ConductVMovedPatch: Finish\n"; diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp index 48c88d53505..e94b2c398d5 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_put_ut.cpp @@ -66,7 +66,7 @@ void TestPutMaxPartCountOnHandoff(TErasureType::EErasureSpecies erasureSpecies) TEvBlobStorage::TEvPut ev(blobId, data, TInstant::Max(), NKikimrBlobStorage::TabletLog, TEvBlobStorage::TEvPut::TacticDefault); - TPutImpl putImpl(group.GetInfo(), groupQueues, &ev, mon, false); + TPutImpl putImpl(group.GetInfo(), groupQueues, &ev, mon, false, TActorId(), 0, NWilson::TTraceId()); for (ui32 idx = 0; idx < domainCount; ++idx) { group.SetPredictedDelayNs(idx, 1); @@ -362,7 +362,7 @@ struct TTestPutAllOk { TMaybe<TPutImpl> putImpl; TPutImpl::TPutResultVec putResults; if constexpr (IsVPut) { - putImpl.ConstructInPlace(Group.GetInfo(), GroupQueues, events[0]->Get(), Mon, false); + putImpl.ConstructInPlace(Group.GetInfo(), GroupQueues, events[0]->Get(), Mon, false, TActorId(), 0, NWilson::TTraceId()); } else { putImpl.ConstructInPlace(Group.GetInfo(), GroupQueues, events, Mon, NKikimrBlobStorage::TabletLog, TEvBlobStorage::TEvPut::TacticDefault, false); @@ -411,7 +411,7 @@ Y_UNIT_TEST(TestMirror3dcWith3x3MinLatencyMod) { TString data = AlphaData(size); TEvBlobStorage::TEvPut ev(blobId, data, TInstant::Max(), NKikimrBlobStorage::TabletLog, TEvBlobStorage::TEvPut::TacticMinLatency); - TPutImpl putImpl(env.Info, env.GroupQueues, &ev, env.Mon, true); + TPutImpl putImpl(env.Info, env.GroupQueues, &ev, env.Mon, true, TActorId(), 0, NWilson::TTraceId()); TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts; TLogContext logCtx(NKikimrServices::BS_PROXY_PUT, false); diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp b/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp index dcb316ec9c2..397b975979a 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_sequence_ut.cpp @@ -247,7 +247,7 @@ void SendVGetResult(ui32 vDiskIdx, NKikimrProto::EReplyStatus status, ui32 partI TVDiskState *from = &subgroup[vDiskIdx]; std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult(NKikimrProto::OK, from->VDiskId, - TAppData::TimeProvider->Now(), 0, nullptr, nullptr, nullptr, nullptr, NWilson::TTraceId(), {}, 0U, 0U)); + TAppData::TimeProvider->Now(), 0, nullptr, nullptr, nullptr, nullptr, {}, 0U, 0U)); SetPredictedDelaysForAllQueues({}); ui64 queryCookie = from->QueryCookies.size() ? from->QueryCookies[0] : 0; @@ -288,7 +288,7 @@ void SendVGetResult(ui32 blobIdx, ui32 vDiskIdx, NKikimrProto::EReplyStatus stat || status == NKikimrProto::VDISK_ERROR_STATE) { std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult( status, request.VDiskId, TAppData::TimeProvider->Now(), 0, nullptr, nullptr, nullptr, nullptr, - NWilson::TTraceId(), {}, 0U, 0U)); + {}, 0U, 0U)); for (auto it = request.Queries.begin(); it != request.Queries.end(); ++it) { result->AddResult(status, it->LogoBlobId, &it->QueryCookie); } @@ -300,7 +300,7 @@ void SendVGetResult(ui32 blobIdx, ui32 vDiskIdx, NKikimrProto::EReplyStatus stat } else if (status == NKikimrProto::NODATA) { std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult( NKikimrProto::OK, request.VDiskId, TAppData::TimeProvider->Now(), 0, nullptr, - nullptr, nullptr, nullptr, NWilson::TTraceId(), {}, 0U, 0U)); + nullptr, nullptr, nullptr, {}, 0U, 0U)); for (auto it = request.Queries.begin(); it != request.Queries.end(); ++it) { result->AddResult(status, it->LogoBlobId, &it->QueryCookie); TLogoBlobID id(it->LogoBlobId); @@ -314,7 +314,7 @@ void SendVGetResult(ui32 blobIdx, ui32 vDiskIdx, NKikimrProto::EReplyStatus stat } else if (status == NKikimrProto::OK) { std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult( NKikimrProto::OK, request.VDiskId, TAppData::TimeProvider->Now(), 0, nullptr, - nullptr, nullptr, nullptr, NWilson::TTraceId(), {}, 0U, 0U)); + nullptr, nullptr, nullptr, {}, 0U, 0U)); for (auto it = request.Queries.begin(); it != request.Queries.end(); ++it) { TString data; ui32 partIdx = 0; @@ -366,7 +366,7 @@ void SendVPutResultEvent(TTestActorRuntime &runtime, TVDiskState &vdisk, NKikimr std::unique_ptr<TEvBlobStorage::TEvVPutResult> vPutResult(new TEvBlobStorage::TEvVPutResult( status, vdisk.LogoBlobId, vdisk.VDiskId, &vdisk.InnerCookie, TOutOfSpaceStatus(0u, 0.0), TAppData::TimeProvider->Now(), - 0, nullptr, nullptr, nullptr, nullptr, 0, NWilson::TTraceId(), 0, TString())); + 0, nullptr, nullptr, nullptr, nullptr, 0, 0, TString())); vPutResult->Record.MutableMsgQoS()->MutableMsgId()->SetMsgId(vdisk.MsgId); vPutResult->Record.MutableMsgQoS()->MutableMsgId()->SetSequenceId(vdisk.SequenceId); SetPredictedDelaysForAllQueues({}); @@ -1212,7 +1212,7 @@ Y_UNIT_TEST(TestGivenBlock42PutWhenPartialGetThenSingleDiskRequestOk) { std::unique_ptr<TEvBlobStorage::TEvVGetResult> result( new TEvBlobStorage::TEvVGetResult( NKikimrProto::OK, theRequest.VDiskId, TAppData::TimeProvider->Now(), 0, nullptr, - nullptr, nullptr, nullptr, NWilson::TTraceId(), {}, 0U, 0U)); + nullptr, nullptr, nullptr, {}, 0U, 0U)); result->AddResult( NKikimrProto::OK, id, query.Shift, resultData.data(), resultData.size(), &query.QueryCookie); @@ -1241,7 +1241,7 @@ Y_UNIT_TEST(TestGivenBlock42PutWhenPartialGetThenSingleDiskRequestOk) { const auto &request = item.second; std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult( NKikimrProto::RACE, request.VDiskId, TAppData::TimeProvider->Now(), 0, nullptr, - nullptr, nullptr, nullptr, NWilson::TTraceId(), {}, 0U, 0U)); + nullptr, nullptr, nullptr, {}, 0U, 0U)); result->Record.MutableMsgQoS()->MutableMsgId()->SetMsgId(request.MsgId); result->Record.MutableMsgQoS()->MutableMsgId()->SetSequenceId(request.SequenceId); result->Record.SetCookie(request.RecordCookie); @@ -1301,7 +1301,7 @@ Y_UNIT_TEST(TestGivenBlock42Put6PartsOnOneVDiskWhenDiscoverThenRecoverFirst) { TGetRangeQuery &query = req.RangeQueries[0]; std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult( NKikimrProto::OK, req.VDiskId, TAppData::TimeProvider->Now(), 0, nullptr, nullptr, nullptr, - nullptr, NWilson::TTraceId(), {}, 0U, 0U)); + nullptr, {}, 0U, 0U)); TIngress ingress; for (ui32 partIdx = 0; partIdx < 6; ++partIdx) { TLogoBlobID blobPartId(logoblobid, partIdx + 1); @@ -1325,7 +1325,7 @@ Y_UNIT_TEST(TestGivenBlock42Put6PartsOnOneVDiskWhenDiscoverThenRecoverFirst) { //TGetRangeQuery &query = req.RangeQueries[0]; std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult( NKikimrProto::OK, req.VDiskId, TAppData::TimeProvider->Now(), 0, nullptr, nullptr, nullptr, - nullptr, NWilson::TTraceId(), {}, 0U, 0U)); + nullptr, {}, 0U, 0U)); result->Record.MutableMsgQoS()->MutableMsgId()->SetMsgId(req.MsgId); result->Record.MutableMsgQoS()->MutableMsgId()->SetSequenceId(req.SequenceId); runtime.Send( diff --git a/ydb/core/blobstorage/dsproxy/ut/dsproxy_test_state_ut.h b/ydb/core/blobstorage/dsproxy/ut/dsproxy_test_state_ut.h index bc805ac12b7..e6b8d2de143 100644 --- a/ydb/core/blobstorage/dsproxy/ut/dsproxy_test_state_ut.h +++ b/ydb/core/blobstorage/dsproxy/ut/dsproxy_test_state_ut.h @@ -172,7 +172,7 @@ struct TTestState { ui64 *cookie = record.HasCookie() ? &cookieValue : nullptr; std::unique_ptr<TEvBlobStorage::TEvVPutResult> result(new TEvBlobStorage::TEvVPutResult(status, blobId, vDiskId, cookie, TOutOfSpaceStatus(0u, 0.0), TAppData::TimeProvider->Now(), - 0, nullptr, nullptr, nullptr, nullptr, 0, NWilson::TTraceId(), 0, TString())); + 0, nullptr, nullptr, nullptr, nullptr, 0, 0, TString())); return result; } @@ -184,7 +184,7 @@ struct TTestState { ui64 *cookie = record.HasCookie() ? &cookieValue : nullptr; std::unique_ptr<TEvBlobStorage::TEvVMultiPutResult> result( new TEvBlobStorage::TEvVMultiPutResult(status, vDiskId, cookie, TAppData::TimeProvider->Now(), 0, - nullptr, nullptr, nullptr, nullptr, 0, NWilson::TTraceId(), 0, TString())); + nullptr, nullptr, nullptr, nullptr, 0, 0, TString())); result->Record.SetStatusFlags(TOutOfSpaceStatus(0u, 0.0).Flags); return result; } @@ -224,7 +224,7 @@ struct TTestState { TVDiskID vDiskId = VDiskIDFromVDiskID(ev->Record.GetVDiskID()); std::unique_ptr<TEvBlobStorage::TEvVGetResult> result(new TEvBlobStorage::TEvVGetResult( status, vDiskId, TAppData::TimeProvider->Now(), 0, nullptr, - nullptr, nullptr, nullptr, NWilson::TTraceId(), {}, 0U, 0U)); + nullptr, nullptr, nullptr, {}, 0U, 0U)); return result; } diff --git a/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp b/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp index 2e51f3e5bfc..23bbd621bd1 100644 --- a/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/incorrect_queries.cpp @@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(IncorrectQueries) { TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog, false, nullptr)); for(auto [blob, data, status] : blobs) { - static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, nullptr, nullptr); + static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, nullptr, nullptr, NWilson::TTraceId()); } static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->Record = proto; @@ -105,7 +105,7 @@ Y_UNIT_TEST_SUITE(IncorrectQueries) { TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::TabletLog, false, nullptr)); for(auto [blob, data, status] : blobs) { - static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, nullptr, nullptr); + static_cast<TEvBlobStorage::TEvVMultiPut*>(ev.get())->AddVPut(blob, data, nullptr, nullptr, NWilson::TTraceId()); } env.WithQueueId(test.Info->GetVDiskInSubgroup(0, blobs[0].BlobId.Hash()), NKikimrBlobStorage::EVDiskQueueId::PutTabletLog, [&](TActorId queueId) { @@ -528,7 +528,7 @@ Y_UNIT_TEST_SUITE(IncorrectQueries) { if (i % 19 != 18) { ++goodCount; TLogoBlobID blob(i, 1, 0, 0, blobSize, 0, 1); - static_cast<TEvBlobStorage::TEvVMultiPut*>(events[i].get())->AddVPut(blob, data, nullptr, nullptr); + static_cast<TEvBlobStorage::TEvVMultiPut*>(events[i].get())->AddVPut(blob, data, nullptr, nullptr, NWilson::TTraceId()); } } diff --git a/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp b/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp index 931cd7958dd..f60f1616793 100644 --- a/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp +++ b/ydb/core/blobstorage/ut_vdisk/lib/helpers.cpp @@ -448,7 +448,7 @@ class TManyMultiPuts : public TActorBootstrapped<TManyMultiPuts> { TVDiskIdShort mainVDiskId = TIngress::GetMainReplica(&Conf->GroupInfo->GetTopology(), logoBlobID); if (mainVDiskId == VDiskInfo.VDiskID) { ui64 cookieValue = Step; - vMultiPut->AddVPut(logoBlobID, MsgData, &cookieValue, nullptr); + vMultiPut->AddVPut(logoBlobID, MsgData, &cookieValue, nullptr, NWilson::TTraceId()); putCount++; Step++; diff --git a/ydb/core/blobstorage/ut_vdisk/lib/vdisk_mock.cpp b/ydb/core/blobstorage/ut_vdisk/lib/vdisk_mock.cpp index d00daeecd30..9dcec09c1bc 100644 --- a/ydb/core/blobstorage/ut_vdisk/lib/vdisk_mock.cpp +++ b/ydb/core/blobstorage/ut_vdisk/lib/vdisk_mock.cpp @@ -102,7 +102,7 @@ public: auto response = std::make_unique<TEvBlobStorage::TEvVPutResult>(status, id, VDiskIDFromVDiskID(record.GetVDiskID()), record.HasCookie() ? &cookie : nullptr, TOutOfSpaceStatus(0u, 0.0), TAppData::TimeProvider->Now(), (ui32)ev->Get()->GetCachedByteSize(), - &record, nullptr, nullptr, nullptr, 0, NWilson::TTraceId(), 0, errorReason); + &record, nullptr, nullptr, nullptr, 0, 0, errorReason); FinalizeAndSend(std::move(response), ctx, ev->Sender); }; @@ -141,7 +141,7 @@ public: auto response = std::make_unique<TEvBlobStorage::TEvVMultiPutResult>(NKikimrProto::OK, VDiskIDFromVDiskID(record.GetVDiskID()), record.HasCookie() ? &cookie : nullptr, TAppData::TimeProvider->Now(), (ui32)ev->Get()->GetCachedByteSize(), - &record, nullptr, nullptr, nullptr, 0, NWilson::TTraceId(), 0, TString()); + &record, nullptr, nullptr, nullptr, 0, 0, TString()); if (ErrorMode) { response->MakeError(NKikimrProto::ERROR, "error mode", record); LOG_DEBUG(ctx, NActorsServices::TEST, "TEvVMultiPut %s -> %s", ev->Get()->ToString().data(), @@ -179,7 +179,7 @@ public: auto response = std::make_unique<TEvBlobStorage::TEvVGetResult>(NKikimrProto::OK, VDiskIDFromVDiskID(record.GetVDiskID()), TAppData::TimeProvider->Now(), (ui32)ev->Get()->GetCachedByteSize(), - &record, nullptr, nullptr, nullptr, NWilson::TTraceId(), cookie, 0U, 0U); + &record, nullptr, nullptr, nullptr, cookie, 0U, 0U); if (ErrorMode) { response->MakeError(NKikimrProto::ERROR, "error mode", record); @@ -336,7 +336,7 @@ public: TEvBlobStorage::TEvVBlockResult::TTabletActGen actual(record.GetTabletId(), record.GetGeneration()); auto response = std::make_unique<TEvBlobStorage::TEvVBlockResult>(NKikimrProto::OK, &actual, VDiskIDFromVDiskID(record.GetVDiskID()), TAppData::TimeProvider->Now(), - (ui32)ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr, NWilson::TTraceId(), 0); + (ui32)ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr, 0); FinalizeAndSend(std::move(response), ctx, ev->Sender); } @@ -349,11 +349,11 @@ public: if (it != Blocks.end()) { response.reset(new TEvBlobStorage::TEvVGetBlockResult(NKikimrProto::OK, it->first, it->second, VDiskIDFromVDiskID(record.GetVDiskID()), TAppData::TimeProvider->Now(), - ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr, NWilson::TTraceId())); + ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr)); } else { response.reset(new TEvBlobStorage::TEvVGetBlockResult(NKikimrProto::NODATA, record.GetTabletId(), VDiskIDFromVDiskID(record.GetVDiskID()), TAppData::TimeProvider->Now(), - ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr, NWilson::TTraceId())); + ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr)); } FinalizeAndSend(std::move(response), ctx, ev->Sender); @@ -367,7 +367,7 @@ public: auto response = std::make_unique<TEvBlobStorage::TEvVCollectGarbageResult>(NKikimrProto::BLOCKED, record.GetTabletId(), record.GetRecordGeneration(), record.GetChannel(), VDiskIDFromVDiskID(record.GetVDiskID()), TAppData::TimeProvider->Now(), - (ui32)ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr, NWilson::TTraceId(), 0); + (ui32)ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr, 0); FinalizeAndSend(std::move(response), ctx, ev->Sender); return; } @@ -387,8 +387,7 @@ public: auto response = std::make_unique<TEvBlobStorage::TEvVCollectGarbageResult>(NKikimrProto::OK, record.GetTabletId(), record.GetRecordGeneration(), record.GetChannel(), VDiskIDFromVDiskID(record.GetVDiskID()), - TAppData::TimeProvider->Now(), (ui32)ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr, - NWilson::TTraceId(), 0); + TAppData::TimeProvider->Now(), (ui32)ev->Get()->GetCachedByteSize(), &record, nullptr, nullptr, nullptr, 0); FinalizeAndSend(std::move(response), ctx, ev->Sender); } @@ -406,7 +405,7 @@ public: auto response = std::make_unique<TEvBlobStorage::TEvVGetBarrierResult>(NKikimrProto::OK, VDiskIDFromVDiskID(record.GetVDiskID()), TAppData::TimeProvider->Now(), (ui32)ev->Get()->GetCachedByteSize(), - &record, nullptr, nullptr, nullptr, NWilson::TTraceId()); + &record, nullptr, nullptr, nullptr); auto it = Barriers.lower_bound(first); while (it != Barriers.end() && it->first <= last) { diff --git a/ydb/core/blobstorage/vdisk/anubis_osiris/blobstorage_anubis_osiris.h b/ydb/core/blobstorage/vdisk/anubis_osiris/blobstorage_anubis_osiris.h index e44bcae7e7c..1ec81167f1e 100644 --- a/ydb/core/blobstorage/vdisk/anubis_osiris/blobstorage_anubis_osiris.h +++ b/ydb/core/blobstorage/vdisk/anubis_osiris/blobstorage_anubis_osiris.h @@ -83,10 +83,8 @@ namespace NKikimr { TEvAnubisOsirisPutResult(NKikimrProto::EReplyStatus status, const TInstant &now, ::NMonitoring::TDynamicCounters::TCounterPtr counterPtr, - NVDiskMon::TLtcHistoPtr histoPtr, - NWilson::TTraceId traceId) - : TEvVResultBase(now, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, counterPtr, histoPtr, - std::move(traceId)) + NVDiskMon::TLtcHistoPtr histoPtr) + : TEvVResultBase(now, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, counterPtr, histoPtr) , Status(status) {} }; diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.cpp b/ydb/core/blobstorage/vdisk/common/vdisk_events.cpp index f02de6350c3..2a2a8d69b17 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_events.cpp +++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.cpp @@ -9,10 +9,10 @@ namespace NKikimr { const TLogoBlobID &logoBlobId, const TVDiskID &vdisk, const ui64 *cookie, TOutOfSpaceStatus oosStatus, const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVPut *record, const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, const ui64 bufferSizeBytes, NWilson::TTraceId traceId, + const NVDiskMon::TLtcHistoPtr &histoPtr, const ui64 bufferSizeBytes, ui64 incarnationGuid, const TString& errorReason) - : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId), - TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, record, skeletonFrontIDPtr) + : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, + recByteSize, record, skeletonFrontIDPtr) { IncrementSize(bufferSizeBytes); Record.SetStatus(status); diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h index 51040aefacf..82e3dbd8652 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h @@ -355,13 +355,12 @@ namespace NKikimr { TEvVResultBase() = default; TEvVResultBase(const TInstant &now, ui32 channel, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId) + const NVDiskMon::TLtcHistoPtr &histoPtr) : TVDiskNonlocalResultBase(channel) , Start(now) , Size(0) , CounterPtr(counterPtr) , HistoPtr(histoPtr) - , TraceId(std::move(traceId)) {} virtual ~TEvVResultBase() { @@ -394,9 +393,6 @@ namespace NKikimr { ::NMonitoring::TDynamicCounters::TCounterPtr CounterPtr; NVDiskMon::TLtcHistoPtr HistoPtr; bool Finalized = false; - - public: - NWilson::TTraceId TraceId; }; template<typename TEv, typename TRecord /*protobuf record*/, ui32 TEventType> @@ -410,8 +406,8 @@ namespace NKikimr { TEvVResultBasePB() = default; TEvVResultBasePB(const TInstant &now, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel) - : TEvVResultBase(now, channel, counterPtr, histoPtr, std::move(traceId)) + const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel) + : TEvVResultBase(now, channel, counterPtr, histoPtr) {} }; @@ -433,10 +429,9 @@ namespace NKikimr { template<typename TQueryRecord> TEvVResultBaseWithQoSPB(TInstant now, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel, + const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel, ui32 recByteSize, TQueryRecord *queryRecord, const TActorIDPtr &skeletonFrontIDPtr) - : TBase(queryRecord ? GetReceivedTimestamp(queryRecord) : now, - counterPtr, histoPtr, std::move(traceId), channel) + : TBase(queryRecord ? GetReceivedTimestamp(queryRecord) : now, counterPtr, histoPtr, channel) , MsgCtx(queryRecord ? TVMsgContext(recByteSize, queryRecord->GetMsgQoS()) : TVMsgContext()) , SkeletonFrontIDPtr(skeletonFrontIDPtr) { @@ -717,8 +712,7 @@ namespace NKikimr { const ui64 *cookie, TOutOfSpaceStatus oosStatus, const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVPut *queryRecord, const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr, - const ui64 bufferSizeBytes, NWilson::TTraceId traceId, ui64 incarnationGuid, - const TString& errorReason); + const ui64 bufferSizeBytes, ui64 incarnationGuid, const TString& errorReason); TString ToString() const override { return ToString(Record); @@ -838,7 +832,7 @@ namespace NKikimr { TRope GetItemBuffer(ui64 itemIdx) const; void AddVPut(const TLogoBlobID &logoBlobId, const TString &buffer, ui64 *cookie, - std::vector<std::pair<ui64, ui32>> *extraBlockChecks) { + std::vector<std::pair<ui64, ui32>> *extraBlockChecks, NWilson::TTraceId traceId) { NKikimrBlobStorage::TVMultiPutItem *item = Record.AddItems(); LogoBlobIDFromLogoBlobID(logoBlobId, item->MutableBlobID()); item->SetFullDataSize(logoBlobId.BlobSize()); @@ -854,6 +848,9 @@ namespace NKikimr { p->SetGeneration(generation); } } + if (traceId) { + traceId.Serialize(item->MutableTraceId()); + } } bool Validate(TString& errorReason) { @@ -929,10 +926,10 @@ namespace NKikimr { TEvVMultiPutResult(const NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, const ui64 *cookie, const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVMultiPut *record, const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, const ui64 bufferSizeBytes, NWilson::TTraceId traceId, - ui64 incarnationGuid, const TString& errorReason) - : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId), - TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, record, skeletonFrontIDPtr) + const NVDiskMon::TLtcHistoPtr &histoPtr, const ui64 bufferSizeBytes, ui64 incarnationGuid, + const TString& errorReason) + : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, + recByteSize, record, skeletonFrontIDPtr) { IncrementSize(bufferSizeBytes); Record.SetStatus(status); @@ -1265,9 +1262,8 @@ namespace NKikimr { TEvVGetResult(NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVGet *queryRecord, const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr, - NWilson::TTraceId traceId, TMaybe<ui64> cookie, ui32 channel, ui64 incarnationGuid) - : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId), channel, - recByteSize, queryRecord, skeletonFrontIDPtr) + TMaybe<ui64> cookie, ui32 channel, ui64 incarnationGuid) + : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, channel, recByteSize, queryRecord, skeletonFrontIDPtr) { Record.SetStatus(status); VDiskIDFromVDiskID(vdisk, Record.MutableVDiskID()); @@ -1517,9 +1513,8 @@ namespace NKikimr { TOutOfSpaceStatus oosStatus, const TInstant &now, ui32 recByteSize, TReqRecord *record, const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr, - NWilson::TTraceId traceId, ui64 incarnationGuid, const TString& errorReason) - : TBase(now, counterPtr, histoPtr, std::move(traceId), - TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, record, skeletonFrontIDPtr) + ui64 incarnationGuid, const TString& errorReason) + : TBase(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, record, skeletonFrontIDPtr) { this->Record.SetStatus(status); LogoBlobIDFromLogoBlobID(originalBlobId, this->Record.MutableOriginalBlobId()); @@ -1640,9 +1635,9 @@ namespace NKikimr { TOutOfSpaceStatus oosStatus, const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVMovedPatch *record, const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr, - NWilson::TTraceId traceId, ui64 incarnationGuid, const TString& errorReason) + ui64 incarnationGuid, const TString& errorReason) : TBase(status, originalBlobId, patchedBlobId, vdisk, cookie, oosStatus, now, recByteSize, - record, skeletonFrontIDPtr, counterPtr, histoPtr, std::move(traceId), incarnationGuid, errorReason) + record, skeletonFrontIDPtr, counterPtr, histoPtr, incarnationGuid, errorReason) {} }; @@ -1688,9 +1683,9 @@ namespace NKikimr { TOutOfSpaceStatus oosStatus, const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVInplacePatch *record, const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr, - NWilson::TTraceId traceId, ui64 incarnationGuid, const TString& errorReason) + ui64 incarnationGuid, const TString& errorReason) : TBase(status, originalBlobId, patchedBlobId, vdisk, cookie, oosStatus, now, recByteSize, - record, skeletonFrontIDPtr, counterPtr, histoPtr, std::move(traceId), incarnationGuid, errorReason) + record, skeletonFrontIDPtr, counterPtr, histoPtr, incarnationGuid, errorReason) {} }; @@ -1738,9 +1733,9 @@ namespace NKikimr { TEvVBlockResult(NKikimrProto::EReplyStatus status, const TTabletActGen *actual, const TVDiskID &vdisk, const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVBlock *queryRecord, const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui64 incarnationGuid) - : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId), - TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, queryRecord, skeletonFrontIDPtr) + const NVDiskMon::TLtcHistoPtr &histoPtr, ui64 incarnationGuid) + : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, + recByteSize, queryRecord, skeletonFrontIDPtr) { Record.SetStatus(status); if (actual) { @@ -1832,8 +1827,8 @@ namespace NKikimr { const TLogoBlobID &patchedBlobId, const TVDiskID &vDiskId, TMaybe<ui64> cookie, const TInstant &now, const TString &errorReason, NKikimrBlobStorage::TEvVPatchStart *record, const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui64 incarnationGuid) - : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId), + const NVDiskMon::TLtcHistoPtr &histoPtr, ui64 incarnationGuid) + : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, record->GetCachedSize(), record, skeletonFrontIDPtr) { @@ -1996,8 +1991,8 @@ namespace NKikimr { TEvVPatchXorDiffResult(NKikimrProto::EReplyStatus status, TInstant now, NKikimrBlobStorage::TEvVPatchXorDiff *record, const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId) - : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId), + const NVDiskMon::TLtcHistoPtr &histoPtr) + : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, record->GetCachedSize(), record, skeletonFrontIDPtr) { @@ -2030,8 +2025,8 @@ namespace NKikimr { const TLogoBlobID &patchedPartBlobId, const TVDiskID &vDiskId, TMaybe<ui64> cookie, TInstant now, NKikimrBlobStorage::TEvVPatchDiff *record, const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr, - NWilson::TTraceId traceId, ui64 incarnationGuid) - : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId), + ui64 incarnationGuid) + : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, record->GetCachedSize(), record, skeletonFrontIDPtr) { @@ -2109,9 +2104,9 @@ namespace NKikimr { TEvVGetBlockResult(NKikimrProto::EReplyStatus status, ui64 tabletId, const TVDiskID &vdisk, const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVGetBlock *queryRecord, const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId) - : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId), - TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, queryRecord, skeletonFrontIDPtr) + const NVDiskMon::TLtcHistoPtr &histoPtr) + : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, + recByteSize, queryRecord, skeletonFrontIDPtr) { Record.SetStatus(status); Record.SetTabletId(tabletId); @@ -2121,9 +2116,9 @@ namespace NKikimr { TEvVGetBlockResult(NKikimrProto::EReplyStatus status, ui64 tabletId, ui32 generation, const TVDiskID &vdisk, const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVGetBlock *queryRecord, const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId) - : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId), - TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, queryRecord, skeletonFrontIDPtr) + const NVDiskMon::TLtcHistoPtr &histoPtr) + : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, + recByteSize, queryRecord, skeletonFrontIDPtr) { Record.SetStatus(status); Record.SetTabletId(tabletId); @@ -2234,9 +2229,9 @@ namespace NKikimr { const TVDiskID &vdisk, const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVCollectGarbage *queryRecord, const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr, - NWilson::TTraceId traceId, ui64 incarnationGuid) - : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId), - TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, queryRecord, skeletonFrontIDPtr) + ui64 incarnationGuid) + : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, + recByteSize, queryRecord, skeletonFrontIDPtr) { Record.SetStatus(status); Record.SetTabletId(tabletId); @@ -2329,9 +2324,9 @@ namespace NKikimr { TEvVGetBarrierResult(NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, const TInstant &now, ui32 recByteSize, NKikimrBlobStorage::TEvVGetBarrier *queryRecord, const TActorIDPtr &skeletonFrontIDPtr, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId) - : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, std::move(traceId), - TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, recByteSize, queryRecord, skeletonFrontIDPtr) + const NVDiskMon::TLtcHistoPtr &histoPtr) + : TEvVResultBaseWithQoSPB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, + recByteSize, queryRecord, skeletonFrontIDPtr) { Record.SetStatus(status); VDiskIDFromVDiskID(vdisk, Record.MutableVDiskID()); @@ -2498,10 +2493,8 @@ namespace NKikimr { TEvVDbStatResult() = default; TEvVDbStatResult(NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, const TInstant &now, - const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr, - NWilson::TTraceId traceId) - : TEvVResultBasePB(now, counterPtr, histoPtr, std::move(traceId), - TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG) + const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, const NVDiskMon::TLtcHistoPtr &histoPtr) + : TEvVResultBasePB(now, counterPtr, histoPtr, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG) { Record.SetStatus(status); VDiskIDFromVDiskID(vdisk, Record.MutableVDiskID()); @@ -2566,8 +2559,8 @@ namespace NKikimr { // read response TEvVSyncGuidResult(const NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, const TInstant &now, ui64 guid, EState state, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel) - : TEvVResultBasePB(now, counterPtr, histoPtr, std::move(traceId), channel) + const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel) + : TEvVResultBasePB(now, counterPtr, histoPtr, channel) { Record.SetStatus(status); auto guidInfo = Record.MutableReadInfo(); @@ -2579,8 +2572,8 @@ namespace NKikimr { // write or error response TEvVSyncGuidResult(const NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, const TInstant &now, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel) - : TEvVResultBasePB(now, counterPtr, histoPtr, std::move(traceId), channel) + const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel) + : TEvVResultBasePB(now, counterPtr, histoPtr, channel) { Record.SetStatus(status); VDiskIDFromVDiskID(vdisk, Record.MutableVDiskID()); @@ -2625,8 +2618,8 @@ namespace NKikimr { TEvVSyncResult(const NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, const TSyncState &newSyncState, bool finished, NPDisk::TStatusFlags flags, const TInstant &now, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel) - : TEvVResultBasePB(now, counterPtr, histoPtr, std::move(traceId), channel) + const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel) + : TEvVResultBasePB(now, counterPtr, histoPtr, channel) { Record.SetStatus(status); SyncStateFromSyncState(newSyncState, Record.MutableNewSyncState()); @@ -2637,8 +2630,8 @@ namespace NKikimr { TEvVSyncResult(const NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, NPDisk::TStatusFlags flags, const TInstant &now, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel) - : TEvVResultBasePB(now, counterPtr, histoPtr, std::move(traceId), channel) + const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel) + : TEvVResultBasePB(now, counterPtr, histoPtr, channel) { Record.SetStatus(status); VDiskIDFromVDiskID(vdisk, Record.MutableVDiskID()); @@ -2691,8 +2684,8 @@ namespace NKikimr { TEvVSyncFullResult(const NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, const TSyncState &syncState, ui64 cookie, const TInstant &now, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel) - : TEvVResultBasePB(now, counterPtr, histoPtr, std::move(traceId), channel) + const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel) + : TEvVResultBasePB(now, counterPtr, histoPtr, channel) { Record.SetStatus(status); VDiskIDFromVDiskID(vdisk, Record.MutableVDiskID()); @@ -2702,8 +2695,8 @@ namespace NKikimr { TEvVSyncFullResult(const NKikimrProto::EReplyStatus status, const TVDiskID &vdisk, ui64 cookie, const TInstant &now, const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr, - const NVDiskMon::TLtcHistoPtr &histoPtr, NWilson::TTraceId traceId, ui32 channel) - : TEvVResultBasePB(now, counterPtr, histoPtr, std::move(traceId), channel) + const NVDiskMon::TLtcHistoPtr &histoPtr, ui32 channel) + : TEvVResultBasePB(now, counterPtr, histoPtr, channel) { Record.SetStatus(status); VDiskIDFromVDiskID(vdisk, Record.MutableVDiskID()); diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h index 840f5f7da1f..64c92463539 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_private_events.h @@ -61,10 +61,9 @@ namespace NKikimr { const ui64 OrderId; TEvDelLogoBlobDataSyncLogResult(ui64 orderId, const TInstant &now, - ::NMonitoring::TDynamicCounters::TCounterPtr counterPtr, NVDiskMon::TLtcHistoPtr histoPtr, - NWilson::TTraceId traceId) + ::NMonitoring::TDynamicCounters::TCounterPtr counterPtr, NVDiskMon::TLtcHistoPtr histoPtr) : TEvVResultBase(now, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, counterPtr, - histoPtr, std::move(traceId)) + histoPtr) , OrderId(orderId) {} }; diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp b/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp index 6555ca46312..8613e05c0ef 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp +++ b/ydb/core/blobstorage/vdisk/common/vdisk_response.cpp @@ -14,27 +14,23 @@ void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEve } void SendVDiskResponse(const TActorContext &ctx, const TActorId &recipient, IEventBase *ev, ui64 cookie, ui32 channel) { - NWilson::TTraceId traceId; - switch (const ui32 type = ev->Type()) { -#define WILSON_HANDLE_EVENT(T, EV) \ +#define HANDLE_EVENT(T, EV) \ case TEvBlobStorage::T::EventType: { \ TEvBlobStorage::T *event = static_cast<TEvBlobStorage::T *>(ev); \ - traceId = std::move(event->TraceId); \ const double usPerCycle = 1000000.0 / NHPTimer::GetCyclesPerSecond(); \ event->Record.MutableTimestamps()->SetSentByVDiskUs(GetCycleCountFast() * usPerCycle); \ break; \ } - WILSON_HANDLE_EVENT(TEvVPutResult, EvVPutResultSent) - WILSON_HANDLE_EVENT(TEvVMultiPutResult, EvVMultiPutResultSent) - WILSON_HANDLE_EVENT(TEvVGetResult, EvVGetResultSent) + HANDLE_EVENT(TEvVPutResult, EvVPutResultSent) + HANDLE_EVENT(TEvVMultiPutResult, EvVMultiPutResultSent) + HANDLE_EVENT(TEvVGetResult, EvVGetResultSent) -#undef WILSON_HANDLE_EVENT +#undef HANDLE_EVENT } - auto event = std::make_unique<IEventHandle>(recipient, ctx.SelfID, ev, IEventHandle::MakeFlags(channel, 0), cookie, - nullptr, std::move(traceId)); + auto event = std::make_unique<IEventHandle>(recipient, ctx.SelfID, ev, IEventHandle::MakeFlags(channel, 0), cookie); if (TEvVResultBase *base = dynamic_cast<TEvVResultBase *>(ev)) { base->FinalizeAndSend(ctx, std::move(event)); } else { diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp index 6c154281eb8..e306630ee2c 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp @@ -10,6 +10,7 @@ #include <ydb/core/blobstorage/vdisk/common/vdisk_lsnmngr.h> #include <ydb/core/blobstorage/vdisk/common/blobstorage_dblogcutter.h> #include <ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob.h> +#include <ydb/core/base/wilson.h> #include <library/cpp/monlib/service/pages/templates.h> using namespace NKikimrServices; @@ -154,6 +155,7 @@ namespace NKikimr { ui64 WriteId; TDiskPart DiskAddr; static void *Cookie; + NWilson::TSpan Span; friend class TActorBootstrapped<THullHugeBlobWriter>; @@ -186,6 +188,7 @@ namespace NKikimr { "Writer: bootstrap: id# %s chunkId# %u offset# %u storedBlobSize# %u " "writtenSize# %u", HugeSlot.ToString().data(), chunkId, offset, storedBlobSize, writtenSize)); + Span.Event("Send_TEvChunkWrite", NWilson::TKeyValueList{{{"ChunkId", chunkId}, {"Offset", offset}, {"WrittenSize", writtenSize}}}); ctx.Send(HugeKeeperCtx->PDiskCtx->PDiskId, new NPDisk::TEvChunkWrite(HugeKeeperCtx->PDiskCtx->Dsk->Owner, HugeKeeperCtx->PDiskCtx->Dsk->OwnerRound, chunkId, offset, @@ -197,12 +200,16 @@ namespace NKikimr { } void Handle(NPDisk::TEvChunkWriteResult::TPtr &ev, const TActorContext &ctx) { + if (ev->Get()->Status == NKikimrProto::OK) { + Span.EndOk(); + } else { + Span.EndError(TStringBuilder() << NKikimrProto::EReplyStatus_Name(ev->Get()->Status)); + } CHECK_PDISK_RESPONSE(HugeKeeperCtx->VCtx, ev, ctx); ctx.Send(NotifyID, new TEvHullHugeWritten(HugeSlot)); - ctx.Send(HugeKeeperCtx->SkeletonId, - new TEvHullLogHugeBlob(WriteId, Item->LogoBlobId, Item->Ingress, DiskAddr, - Item->IgnoreBlock, Item->SenderId, Item->Cookie, std::move(Item->Result), - &Item->ExtraBlockChecks)); + ctx.Send(HugeKeeperCtx->SkeletonId, new TEvHullLogHugeBlob(WriteId, Item->LogoBlobId, Item->Ingress, DiskAddr, + Item->IgnoreBlock, Item->SenderId, Item->Cookie, std::move(Item->Result), &Item->ExtraBlockChecks), 0, 0, + Span.GetTraceId()); LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix, "Writer: finish: id# %s diskAddr# %s", @@ -228,7 +235,8 @@ namespace NKikimr { const TActorId ¬ifyID, const NHuge::THugeSlot &hugeSlot, std::unique_ptr<TEvHullWriteHugeBlob> item, - ui64 wId) + ui64 wId, + NWilson::TTraceId traceId) : TActorBootstrapped<TThis>() , HugeKeeperCtx(std::move(hugeKeeperCtx)) , NotifyID(notifyID) @@ -236,6 +244,7 @@ namespace NKikimr { , Item(std::move(item)) , WriteId(wId) , DiskAddr() + , Span(TWilson::VDiskInternals, std::move(traceId), "VDisk.HugeBlobKeeper.Write") {} }; @@ -527,18 +536,9 @@ namespace NKikimr { // THullHugeKeeperState //////////////////////////////////////////////////////////////////////////// struct THullHugeKeeperState { - //////////////////////////// TMsgQueue ///////////////////////////////// - struct TDestroy { - static void Destroy(TEvHullWriteHugeBlob *item) { - delete item; - } - }; - typedef TIntrusiveListWithAutoDelete<TEvHullWriteHugeBlob, TDestroy> TMsgQueue; - //////////////////////////// TMsgQueue ///////////////////////////////// - ui64 WaitQueueSize = 0; ui64 WaitQueueByteSize = 0; - TMsgQueue WaitQueue; // Huge blobs for writing are placed here + std::deque<std::unique_ptr<TEvHullWriteHugeBlob::THandle>> WaitQueue; bool Committing = false; ui64 FreeUpToLsn = 0; // last value we got from PDisk @@ -627,27 +627,27 @@ namespace NKikimr { TActiveActors ActiveActors; std::unordered_set<ui32> AllocatingChunkPerSlotSize; - void PutToWaitQueue(std::unique_ptr<TEvHullWriteHugeBlob> item) { + void PutToWaitQueue(std::unique_ptr<TEvHullWriteHugeBlob::THandle> item) { State.WaitQueueSize++; - State.WaitQueueByteSize += item->ByteSize(); - State.WaitQueue.PushBack(item.release()); + State.WaitQueueByteSize += item->Get()->ByteSize(); + State.WaitQueue.push_back(std::move(item)); } - bool ProcessWrite(TEvHullWriteHugeBlob *ev, const TActorContext& ctx) { + bool ProcessWrite(TEvHullWriteHugeBlob::THandle& ev, const TActorContext& ctx, bool fromWaitQueue) { + auto& msg = *ev.Get(); NHuge::THugeSlot hugeSlot; ui32 slotSize; - if (State.Pers->Heap->Allocate(ev->Data.GetSize(), &hugeSlot, &slotSize)) { - if (!ev->Empty()) { // remove item from the WaitQueue + if (State.Pers->Heap->Allocate(msg.Data.GetSize(), &hugeSlot, &slotSize)) { + if (fromWaitQueue) { --State.WaitQueueSize; - State.WaitQueueByteSize -= ev->ByteSize(); - ev->Unlink(); + State.WaitQueueByteSize -= msg.ByteSize(); } const bool inserted = State.Pers->AllocatedSlots.insert(hugeSlot).second; Y_VERIFY(inserted); const ui64 wId = State.LogLsnFifo.Push(HugeKeeperCtx->LsnMngr->GetLsn()); auto aid = ctx.Register(new THullHugeBlobWriter(HugeKeeperCtx, ctx.SelfID, hugeSlot, - std::unique_ptr<TEvHullWriteHugeBlob>(ev), wId)); + std::unique_ptr<TEvHullWriteHugeBlob>(ev.Release().Release()), wId, std::move(ev.TraceId))); ActiveActors.Insert(aid); return true; } else if (AllocatingChunkPerSlotSize.insert(slotSize).second) { @@ -658,7 +658,11 @@ namespace NKikimr { } void ProcessQueue(const TActorContext &ctx) { - State.WaitQueue.ForEach(std::bind(&TThis::ProcessWrite, this, std::placeholders::_1, std::cref(ctx))); + auto it = State.WaitQueue.begin(); + while (it != State.WaitQueue.end() && ProcessWrite(**it, ctx, true)) { + ++it; + } + State.WaitQueue.erase(State.WaitQueue.begin(), it); } void FreeChunks(const TActorContext &ctx) { @@ -739,11 +743,10 @@ namespace NKikimr { //////////// Event Handlers //////////////////////////////////// void Handle(TEvHullWriteHugeBlob::TPtr &ev, const TActorContext &ctx) { - std::unique_ptr<TEvHullWriteHugeBlob> item(ev->Release().Release()); - LOG_DEBUG(ctx, BS_HULLHUGE, - VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix, - "THullHugeKeeper: TEvHullWriteHugeBlob: %s", item->ToString().data())); - if (ProcessWrite(item.get(), ctx)) { + LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix, + "THullHugeKeeper: TEvHullWriteHugeBlob: %s", std::data(ev->Get()->ToString()))); + std::unique_ptr<TEvHullWriteHugeBlob::THandle> item(ev.Release()); + if (ProcessWrite(*item, ctx, false)) { (void)item.release(); } else { PutToWaitQueue(std::move(item)); diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h index 1876855d797..0eaaca14f56 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h @@ -5,13 +5,14 @@ #include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> #include <ydb/core/blobstorage/vdisk/common/vdisk_pdiskctx.h> #include <ydb/core/blobstorage/vdisk/common/vdisk_defrag.h> +#include <library/cpp/actors/wilson/wilson_span.h> namespace NKikimr { //////////////////////////////////////////////////////////////////////////// // TEvHullWriteHugeBlob //////////////////////////////////////////////////////////////////////////// - class TEvHullWriteHugeBlob : public TEventLocal<TEvHullWriteHugeBlob, TEvBlobStorage::EvHullWriteHugeBlob>, public TIntrusiveListItem<TEvHullWriteHugeBlob> { + class TEvHullWriteHugeBlob : public TEventLocal<TEvHullWriteHugeBlob, TEvBlobStorage::EvHullWriteHugeBlob> { public: const TActorId SenderId; const ui64 Cookie; diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp index 58c881fb027..714d0e322ae 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.cpp @@ -150,9 +150,10 @@ namespace NKikimr { IEventBase *msg, const TActorId &recipient, ui64 recipientCookie, + NWilson::TTraceId traceId, ui64 lsn) { - Fields->DelayedResponses.Put(msg, recipient, recipientCookie, lsn); + Fields->DelayedResponses.Put(msg, recipient, recipientCookie, std::move(traceId), lsn); } //////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h index 60c68a51114..938230dce42 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h @@ -84,7 +84,8 @@ namespace NKikimr { // Request from PDisk to cut the recovery log void CutRecoveryLog(const TActorContext &ctx, std::unique_ptr<NPDisk::TEvCutLog> msg); - void PostponeReplyUntilCommitted(IEventBase *msg, const TActorId &recipient, ui64 recipientCookie, ui64 lsn); + void PostponeReplyUntilCommitted(IEventBase *msg, const TActorId &recipient, ui64 recipientCookie, + NWilson::TTraceId traceId, ui64 lsn); //////////////////////////////////////////////////////////////////////// // LogoBlobs @@ -135,7 +136,7 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////// // Blocks //////////////////////////////////////////////////////////////////////// - using TReplySender = std::function<void (const TActorId &, ui64, IEventBase *)>; + using TReplySender = std::function<void (const TActorId &, ui64, NWilson::TTraceId, IEventBase *)>; THullCheckStatus CheckBlockCmdAndAllocLsn( ui64 tabletID, diff --git a/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp.h b/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp.h index 8d65c474c9b..99c87171716 100644 --- a/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp.h +++ b/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp.h @@ -1,6 +1,10 @@ #pragma once + #include "defs.h" +#include <ydb/core/base/wilson.h> +#include <library/cpp/actors/wilson/wilson_span.h> + namespace NKikimr { /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -11,17 +15,23 @@ namespace NKikimr { /////////////////////////////////////////////////////////////////////////////////////////////////////// class TDelayedResponses { public: - using TAction = std::function<void (const TActorId &id, ui64 cookie, IEventBase *msg)>; - - void Put(IEventBase *msg, const TActorId &recipient, ui64 recipientCookie, ui64 lsn) { - Map.emplace(lsn, TValue {recipient, recipientCookie, std::unique_ptr<IEventBase>(msg)}); + using TAction = std::function<void (const TActorId &id, ui64 cookie, NWilson::TTraceId traceId, IEventBase *msg)>; + + void Put(IEventBase *msg, const TActorId &recipient, ui64 recipientCookie, NWilson::TTraceId traceId, ui64 lsn) { + Map.emplace(lsn, TValue{ + recipient, + recipientCookie, + NWilson::TSpan(TWilson::VDiskInternals, std::move(traceId), "VDisk.DelayedResponses.Queue"), + std::unique_ptr<IEventBase>(msg) + }); } void ConfirmLsn(ui64 lsn, const TAction &action) { TMap::iterator it = Map.begin(); while (it != Map.end() && it->first <= lsn) { TValue &v = it->second; - action(v.Recipient, v.RecipientCookie, v.Msg.release()); + v.Span.EndOk(); + action(v.Recipient, v.RecipientCookie, v.Span.GetTraceId(), v.Msg.release()); ++it; } // remove all traversed elements @@ -32,6 +42,7 @@ namespace NKikimr { struct TValue { TActorId Recipient; ui64 RecipientCookie; + NWilson::TSpan Span; std::unique_ptr<IEventBase> Msg; }; diff --git a/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp_ut.cpp b/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp_ut.cpp index f95ace015a2..29fed34e8d0 100644 --- a/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp_ut.cpp +++ b/ydb/core/blobstorage/vdisk/hullop/hullop_delayedresp_ut.cpp @@ -10,7 +10,7 @@ namespace NKikimr { Y_UNIT_TEST(Test) { TVector<ui64> res; - auto action = [&res] (const TActorId &actorId, ui64 cookie, IEventBase *msg) { + auto action = [&res] (const TActorId &actorId, ui64 cookie, NWilson::TTraceId, IEventBase *msg) { Y_UNUSED(actorId); STR << "cookie# " << cookie << "\n"; res.push_back(cookie); @@ -18,12 +18,12 @@ namespace NKikimr { }; auto dr = std::make_unique<TDelayedResponses>(); - dr->Put(nullptr, TActorId(), 1, 500); - dr->Put(nullptr, TActorId(), 2, 500); - dr->Put(nullptr, TActorId(), 3, 501); - dr->Put(nullptr, TActorId(), 4, 500); + dr->Put(nullptr, TActorId(), 1, {}, 500); + dr->Put(nullptr, TActorId(), 2, {}, 500); + dr->Put(nullptr, TActorId(), 3, {}, 501); + dr->Put(nullptr, TActorId(), 4, {}, 500); dr->ConfirmLsn(500, action); - dr->Put(nullptr, TActorId(), 5, 502); + dr->Put(nullptr, TActorId(), 5, {}, 502); dr->ConfirmLsn(501, action); dr->ConfirmLsn(502, action); diff --git a/ydb/core/blobstorage/vdisk/query/query_extr.cpp b/ydb/core/blobstorage/vdisk/query/query_extr.cpp index 6315bd47f28..328f2f207a8 100644 --- a/ydb/core/blobstorage/vdisk/query/query_extr.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_extr.cpp @@ -345,8 +345,8 @@ namespace NKikimr { SendResponseAndDie(ctx, this); } else { ui8 priority = PDiskPriority(); - std::unique_ptr<IActor> a(Batcher.CreateAsyncDataReader(ctx.SelfID, priority, std::move(Result->TraceId), - IsRepl(), TActivationContext::Now())); + std::unique_ptr<IActor> a(Batcher.CreateAsyncDataReader(ctx.SelfID, priority, /*std::move(Result->TraceId)*/ NWilson::TTraceId(), // FIXME: trace + IsRepl())); if (a) { auto aid = ctx.Register(a.release()); ActiveActors.Insert(aid); diff --git a/ydb/core/blobstorage/vdisk/query/query_readactor.cpp b/ydb/core/blobstorage/vdisk/query/query_readactor.cpp index 9a27de1b3f7..664e57d90bb 100644 --- a/ydb/core/blobstorage/vdisk/query/query_readactor.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_readactor.cpp @@ -1,5 +1,6 @@ #include "query_readbatch.h" #include <ydb/core/blobstorage/base/vdisk_priorities.h> +#include <ydb/core/base/wilson.h> #include <library/cpp/actors/wilson/wilson_span.h> #include <util/generic/algorithm.h> @@ -47,7 +48,7 @@ namespace NKikimr { // send request TReplQuoter::QuoteMessage(quoter, std::make_unique<IEventHandle>(Ctx->PDiskCtx->PDiskId, SelfId(), - msg.release(), 0, 0, nullptr, Span), it->Part.Size); + msg.release(), 0, 0, nullptr, Span.GetTraceId()), it->Part.Size); Counter++; } @@ -116,14 +117,13 @@ namespace NKikimr { std::shared_ptr<TReadBatcherResult> result, ui8 priority, NWilson::TTraceId traceId, - bool isRepl, - TInstant now) + bool isRepl) : TActorBootstrapped<TTReadBatcherActor>() , Ctx(ctx) , NotifyID(notifyID) , Result(std::move(result)) , Priority(priority) - , Span(12, NWilson::ERelation::ChildOf, std::move(traceId), now, "VDisk.TReadBatcherActor") + , Span(TWilson::VDiskInternals, std::move(traceId), "VDisk.Query.ReadBatcher") , IsRepl(isRepl) {} }; @@ -134,10 +134,9 @@ namespace NKikimr { std::shared_ptr<TReadBatcherResult> result, ui8 priority, NWilson::TTraceId traceId, - bool isRepl, - TInstant now) + bool isRepl) { - return new TTReadBatcherActor(ctx, notifyID, result, priority, std::move(traceId), isRepl, now); + return new TTReadBatcherActor(ctx, notifyID, result, priority, std::move(traceId), isRepl); } } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/query/query_readactor.h b/ydb/core/blobstorage/vdisk/query/query_readactor.h index daee28aaf6f..614625ee8b3 100644 --- a/ydb/core/blobstorage/vdisk/query/query_readactor.h +++ b/ydb/core/blobstorage/vdisk/query/query_readactor.h @@ -11,8 +11,7 @@ namespace NKikimr { std::shared_ptr<TReadBatcherResult> result, ui8 priority, NWilson::TTraceId traceId, - bool isRepl, - TInstant now); + bool isRepl); } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp index 566af60d43c..8b9ac0caeca 100644 --- a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp @@ -218,7 +218,7 @@ namespace NKikimr { } IActor *TReadBatcher::CreateAsyncDataReader(const TActorId ¬ifyID, ui8 priority, NWilson::TTraceId traceId, - bool isRepl, TInstant now) { + bool isRepl) { if (Result->DiskDataItemPtrs.empty()) return nullptr; else { @@ -239,7 +239,7 @@ namespace NKikimr { PDiskReadBytes += size; } // start reader - return CreateReadBatcherActor(Ctx, notifyID, Result, priority, std::move(traceId), isRepl, now); + return CreateReadBatcherActor(Ctx, notifyID, Result, priority, std::move(traceId), isRepl); } } diff --git a/ydb/core/blobstorage/vdisk/query/query_readbatch.h b/ydb/core/blobstorage/vdisk/query/query_readbatch.h index fba173b8092..b0592a9a202 100644 --- a/ydb/core/blobstorage/vdisk/query/query_readbatch.h +++ b/ydb/core/blobstorage/vdisk/query/query_readbatch.h @@ -329,8 +329,7 @@ namespace NKikimr { // creates an actor for efficient async data reads, returns nullptr // if no read required - IActor *CreateAsyncDataReader(const TActorId ¬ifyID, ui8 priority, NWilson::TTraceId traceId, bool isRepl, - TInstant now); + IActor *CreateAsyncDataReader(const TActorId ¬ifyID, ui8 priority, NWilson::TTraceId traceId, bool isRepl); const TReadBatcherResult &GetResult() const { return *Result; } ui64 GetPDiskReadBytes() const { return PDiskReadBytes; } diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp index f2e2847b698..0e6c0816fe5 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp @@ -295,19 +295,22 @@ namespace NKikimr { } struct TVPutInfo { - TRope Buffer = {}; - TLogoBlobID BlobId = {}; - TIngress Ingress = {}; - TLsnSeg Lsn = {}; + TRope Buffer; + TLogoBlobID BlobId; + TIngress Ingress; + TLsnSeg Lsn; THullCheckStatus HullStatus; bool IsHugeBlob = false; NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> ExtraBlockChecks; + NWilson::TTraceId TraceId; TVPutInfo(TLogoBlobID blobId, TRope &&buffer, - NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> *extraBlockChecks) + NProtoBuf::RepeatedPtrField<NKikimrBlobStorage::TEvVPut::TExtraBlockCheck> *extraBlockChecks, + NWilson::TTraceId traceId) : Buffer(std::move(buffer)) , BlobId(blobId) , HullStatus({NKikimrProto::UNKNOWN, 0, false}) + , TraceId(std::move(traceId)) { ExtraBlockChecks.Swap(extraBlockChecks); } @@ -318,20 +321,9 @@ namespace NKikimr { // batched along with other VDisk log entries on the PDisk } - TLoggedRecVPut* CreateLoggedRec(TLsnSeg seg, bool confirmSyncLogAlso, const TLogoBlobID &id, - const TIngress &ingress, TRope &&buffer, std::unique_ptr<TEvBlobStorage::TEvVPutResult> res, - const TActorId &sender, ui64 cookie) - { - return new TLoggedRecVPut(seg, confirmSyncLogAlso, id, ingress, std::move(buffer), std::move(res), sender, cookie); - } - - TLoggedRecVMultiPutItem* CreateLoggedRec(TLsnSeg seg, bool confirmSyncLogAlso, const TLogoBlobID &id, - const TIngress &ingress, TRope &&buffer, std::unique_ptr<TEvVMultiPutItemResult> res, - const TActorId &sender, ui64 cookie) - { - return new TLoggedRecVMultiPutItem(seg, confirmSyncLogAlso, id, ingress, std::move(buffer), std::move(res), - sender, cookie); - } + template<typename TEvResult> struct TLoggedRecType {}; + template<> struct TLoggedRecType<TEvBlobStorage::TEvVPutResult> { using T = TLoggedRecVPut; }; + template<> struct TLoggedRecType<TEvVMultiPutItemResult> { using T = TLoggedRecVMultiPutItem; }; template <typename TEvResult> std::unique_ptr<NPDisk::TEvLog> CreatePutLogEvent(const TActorContext &ctx, TString evPrefix, NActors::TActorId sender, @@ -362,8 +354,8 @@ namespace NKikimr { UpdatePDiskWriteBytes(dataToWrite.size()); bool confirmSyncLogAlso = static_cast<bool>(syncLogMsg); - intptr_t loggedRecId = LoggedRecsVault.Put( - CreateLoggedRec(seg, confirmSyncLogAlso, id, ingress, std::move(buffer), std::move(result), sender, cookie)); + intptr_t loggedRecId = LoggedRecsVault.Put(new typename TLoggedRecType<TEvResult>::T(seg, confirmSyncLogAlso, + id, ingress, std::move(buffer), std::move(result), sender, cookie, std::move(info.TraceId))); void *loggedRecCookie = reinterpret_cast<void *>(loggedRecId); // create log msg auto logMsg = CreateHullUpdate(HullLogCtx, TLogSignature::SignatureLogoBlobOpt, dataToWrite, @@ -482,7 +474,8 @@ namespace NKikimr { for (ui64 itemIdx = 0; itemIdx < record.ItemsSize(); ++itemIdx) { auto &item = *record.MutableItems(itemIdx); TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(item.GetBlobID()); - putsInfo.emplace_back(blobId, ev->Get()->GetItemBuffer(itemIdx), item.MutableExtraBlockChecks()); + putsInfo.emplace_back(blobId, ev->Get()->GetItemBuffer(itemIdx), item.MutableExtraBlockChecks(), + item.HasTraceId() ? item.GetTraceId() : NWilson::TTraceId()); TVPutInfo &info = putsInfo.back(); try { @@ -545,7 +538,6 @@ namespace NKikimr { std::unique_ptr<NPDisk::TEvMultiLog> evLogs = std::make_unique<NPDisk::TEvMultiLog>(); ui64 cookie = ev->Cookie; - const NWilson::TTraceId traceId(ev->TraceId); IActor* vMultiPutActor = CreateSkeletonVMultiPutActor(SelfId(), statuses, oosStatus, ev, SkeletonFrontIDPtr, IFaceMonGroup->MultiPutResMsgsPtr(), Db->GetVDiskIncarnationGuid()); @@ -567,7 +559,8 @@ namespace NKikimr { if (info.HullStatus.Postponed) { auto result = std::make_unique<TEvVMultiPutItemResult>(info.BlobId, itemIdx, status, errorReason); - Hull->PostponeReplyUntilCommitted(result.release(), vMultiPutActorId, itemIdx, info.HullStatus.Lsn); + Hull->PostponeReplyUntilCommitted(result.release(), vMultiPutActorId, itemIdx, std::move(info.TraceId), + info.HullStatus.Lsn); continue; } @@ -586,14 +579,15 @@ namespace NKikimr { new TEvBlobStorage::TEvVPutResult(status, info.BlobId, SelfVDiskId, &itemIdx, oosStatus, now, vPut.GetCachedByteSize(), &vPut.Record, SkeletonFrontIDPtr, nullptr, VCtx->Histograms.GetHistogram(handleClass), info.Buffer.GetSize(), - NWilson::TTraceId(), Db->GetVDiskIncarnationGuid(), errorReason)); + Db->GetVDiskIncarnationGuid(), errorReason)); if (info.Buffer) { auto hugeWrite = CreateHullWriteHugeBlob(vMultiPutActorId, cookie, ignoreBlock, handleClass, info, std::move(result)); - ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, NWilson::TTraceId(traceId)); + ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, std::move(info.TraceId)); } else { ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, info.BlobId, info.Ingress, TDiskPart(), - ignoreBlock, vMultiPutActorId, cookie, std::move(result), &info.ExtraBlockChecks)); + ignoreBlock, vMultiPutActorId, cookie, std::move(result), &info.ExtraBlockChecks), 0, 0, + std::move(info.TraceId)); } } else { Y_VERIFY(lsnBatch.First <= lsnBatch.Last); @@ -625,7 +619,7 @@ namespace NKikimr { std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status.Status, status.ErrorReason, ev, now, SkeletonFrontIDPtr, SelfVDiskId, Db->GetVDiskIncarnationGuid(), GInfo)); if (status.Postponed) { - Hull->PostponeReplyUntilCommitted(res.release(), ev->Sender, ev->Cookie, status.Lsn); + Hull->PostponeReplyUntilCommitted(res.release(), ev->Sender, ev->Cookie, std::move(ev->TraceId), status.Lsn); } else { SendReply(ctx, std::move(res), ev, BS_VDISK_PUT); } @@ -654,7 +648,7 @@ namespace NKikimr { const TLogoBlobID id = LogoBlobIDFromLogoBlobID(record.GetBlobID()); LWTRACK(VDiskSkeletonVPutRecieved, ev->Get()->Orbit, VCtx->NodeId, VCtx->GroupId, VCtx->Top->GetFailDomainOrderNumber(VCtx->ShortSelfVDisk), id.TabletID(), id.BlobSize()); - TVPutInfo info(id, ev->Get()->GetBuffer(), record.MutableExtraBlockChecks()); + TVPutInfo info(id, ev->Get()->GetBuffer(), record.MutableExtraBlockChecks(), std::move(ev->TraceId)); const ui64 bufSize = info.Buffer.GetSize(); try { @@ -723,10 +717,10 @@ namespace NKikimr { NKikimrBlobStorage::EPutHandleClass handleClass = record.GetHandleClass(); auto hugeWrite = CreateHullWriteHugeBlob(ev->Sender, ev->Cookie, ignoreBlock, handleClass, info, std::move(result)); - ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, std::move(ev->TraceId)); + ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, std::move(info.TraceId)); } else { - ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, info.BlobId, info.Ingress, TDiskPart(), - ignoreBlock, ev->Sender, ev->Cookie, std::move(result), &info.ExtraBlockChecks)); + ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, info.BlobId, info.Ingress, TDiskPart(), ignoreBlock, + ev->Sender, ev->Cookie, std::move(result), &info.ExtraBlockChecks), 0, 0, std::move(info.TraceId)); } } @@ -746,7 +740,7 @@ namespace NKikimr { } if (status.Postponed) { Hull->PostponeReplyUntilCommitted(msg->Result.release(), msg->OrigClient, msg->OrigCookie, - status.Lsn); + std::move(ev->TraceId), status.Lsn); } else { SendVDiskResponse(ctx, msg->OrigClient, msg->Result.release(), msg->OrigCookie); } @@ -777,8 +771,7 @@ namespace NKikimr { UpdatePDiskWriteBytes(dataToWrite.size()); // prepare TLoggedRecVPutHuge bool confirmSyncLogAlso = static_cast<bool>(syncLogMsg); - intptr_t loggedRecId = LoggedRecsVault.Put(new TLoggedRecVPutHuge(seg, confirmSyncLogAlso, - Db->HugeKeeperID, ev)); + intptr_t loggedRecId = LoggedRecsVault.Put(new TLoggedRecVPutHuge(seg, confirmSyncLogAlso, Db->HugeKeeperID, ev)); void *loggedRecCookie = reinterpret_cast<void *>(loggedRecId); // create log msg auto logMsg = CreateHullUpdate(HullLogCtx, TLogSignature::SignatureHugeLogoBlob, dataToWrite, seg, @@ -806,7 +799,7 @@ namespace NKikimr { std::unique_ptr<NSyncLog::TEvSyncLogPut> syncLogMsg( new NSyncLog::TEvSyncLogPut(Db->GType, seg.Point(), msg->Id, msg->Ingress)); std::unique_ptr<TEvDelLogoBlobDataSyncLogResult> result(new TEvDelLogoBlobDataSyncLogResult(msg->OrderId, now, - nullptr, nullptr, NWilson::TTraceId())); + nullptr, nullptr)); bool confirmSyncLogAlso = static_cast<bool>(syncLogMsg); intptr_t loggedRecId = LoggedRecsVault.Put( @@ -870,8 +863,8 @@ namespace NKikimr { auto handleClass = ev->Get()->Record.GetHandleClass(); auto result = std::make_unique<TEvBlobStorage::TEvVGetResult>(NKikimrProto::OK, SelfVDiskId, now, ev->Get()->GetCachedByteSize(), &record, ev->Get()->GetIsLocalMon() ? nullptr : SkeletonFrontIDPtr, - IFaceMonGroup->GetResMsgsPtr(), VCtx->Histograms.GetHistogram(handleClass), std::move(ev->TraceId), - cookie, ev->GetChannel(), Db->GetVDiskIncarnationGuid()); + IFaceMonGroup->GetResMsgsPtr(), VCtx->Histograms.GetHistogram(handleClass), cookie, ev->GetChannel(), + Db->GetVDiskIncarnationGuid()); if (record.GetAcquireBlockedGeneration()) { ui64 tabletId = record.GetTabletId(); if (tabletId) { @@ -950,7 +943,8 @@ namespace NKikimr { if (status != NKikimrProto::OK) { if (postponed) { - Hull->PostponeReplyUntilCommitted(result.release(), ev->Sender, ev->Cookie, postponeUntilLsn); + Hull->PostponeReplyUntilCommitted(result.release(), ev->Sender, ev->Cookie, std::move(ev->TraceId), + postponeUntilLsn); } else { LOG_DEBUG_S(ctx, BS_VDISK_BLOCK, VCtx->VDiskLogPrefix << "TEvVBlockResult: " << result->ToString() << " Marker# BSVS15"); @@ -994,18 +988,18 @@ namespace NKikimr { if (!SelfVDiskId.SameDisk(record.GetVDiskID())) { result = std::make_unique<TEvBlobStorage::TEvVGetBlockResult>(NKikimrProto::RACE, tabletId, SelfVDiskId, now, ev->Get()->GetCachedByteSize(), &ev->Get()->Record, SkeletonFrontIDPtr, - IFaceMonGroup->GetBlockResMsgsPtr(), nullptr, std::move(ev->TraceId)); + IFaceMonGroup->GetBlockResMsgsPtr(), nullptr); } else { ui32 blockedGen = 0; bool isBlocked = Hull->GetBlocked(tabletId, &blockedGen); if (isBlocked) { result = std::make_unique<TEvBlobStorage::TEvVGetBlockResult>(NKikimrProto::OK, tabletId, blockedGen, SelfVDiskId, now, ev->Get()->GetCachedByteSize(), &ev->Get()->Record, SkeletonFrontIDPtr, - IFaceMonGroup->GetBlockResMsgsPtr(), nullptr, std::move(ev->TraceId)); + IFaceMonGroup->GetBlockResMsgsPtr(), nullptr); } else { result = std::make_unique<TEvBlobStorage::TEvVGetBlockResult>(NKikimrProto::NODATA, tabletId, SelfVDiskId, now, ev->Get()->GetCachedByteSize(), &ev->Get()->Record, SkeletonFrontIDPtr, - IFaceMonGroup->GetBlockResMsgsPtr(), nullptr, std::move(ev->TraceId)); + IFaceMonGroup->GetBlockResMsgsPtr(), nullptr); } } @@ -1024,7 +1018,7 @@ namespace NKikimr { std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status.Status, status.ErrorReason, ev, now, SkeletonFrontIDPtr, SelfVDiskId, Db->GetVDiskIncarnationGuid(), GInfo)); if (status.Postponed) { - Hull->PostponeReplyUntilCommitted(res.release(), ev->Sender, ev->Cookie, status.Lsn); + Hull->PostponeReplyUntilCommitted(res.release(), ev->Sender, ev->Cookie, std::move(ev->TraceId), status.Lsn); } else { SendReply(ctx, std::move(res), ev, BS_VDISK_GC); } @@ -1108,7 +1102,7 @@ namespace NKikimr { std::unique_ptr<TEvBlobStorage::TEvVGetBarrierResult> result; result = std::make_unique<TEvBlobStorage::TEvVGetBarrierResult>(NKikimrProto::OK, SelfVDiskId, now, ev->Get()->GetCachedByteSize(), &record, SkeletonFrontIDPtr, - IFaceMonGroup->GetBarrierResMsgsPtr(), nullptr, std::move(ev->TraceId)); + IFaceMonGroup->GetBarrierResMsgsPtr(), nullptr); THullDsSnap fullSnap = Hull->GetIndexSnapshot(); fullSnap.LogoBlobsSnap.Destroy(); fullSnap.BlocksSnap.Destroy(); @@ -1166,7 +1160,7 @@ namespace NKikimr { ReplyError(NKikimrProto::RACE, "group generation mismatch", ev, ctx, now); } else { auto result = std::make_unique<TEvBlobStorage::TEvVDbStatResult>(NKikimrProto::OK, SelfVDiskId, now, - IFaceMonGroup->DbStatResMsgsPtr(), nullptr, std::move(ev->TraceId)); + IFaceMonGroup->DbStatResMsgsPtr(), nullptr); THullDsSnap fullSnap = Hull->GetIndexSnapshot(); IActor *actor = CreateDbStatActor(HullCtx, HugeBlobCtx, ctx, std::move(fullSnap), ctx.SelfID, ev, std::move(result)); @@ -1356,7 +1350,7 @@ namespace NKikimr { void ReplyError(const NKikimrProto::EReplyStatus status, const TString& /*errorReason*/, TEvLocalSyncData::TPtr &ev, const TActorContext &ctx, const TInstant &now) { auto result = std::make_unique<TEvLocalSyncDataResult>(status, now, SyncLogIFaceGroup.LocalSyncResMsgsPtr(), - nullptr, NWilson::TTraceId()); + nullptr); SendReply(ctx, std::move(result), ev, BS_VDISK_OTHER); } @@ -1384,7 +1378,7 @@ namespace NKikimr { #endif std::unique_ptr<TEvLocalSyncDataResult> result( new TEvLocalSyncDataResult(NKikimrProto::OK, now, SyncLogIFaceGroup.LocalSyncResMsgsPtr(), - nullptr, NWilson::TTraceId())); + nullptr)); OverloadHandler->ActualizeWeights(ctx, AllEHullDbTypes); @@ -1427,8 +1421,7 @@ namespace NKikimr { TEvAnubisOsirisPut::TPtr &ev, const TActorContext &ctx, const TInstant &now) { - std::unique_ptr<IEventBase> res(new TEvAnubisOsirisPutResult(status, now, IFaceMonGroup->PutResMsgsPtr(), - nullptr, NWilson::TTraceId())); + std::unique_ptr<IEventBase> res(new TEvAnubisOsirisPutResult(status, now, IFaceMonGroup->PutResMsgsPtr(), nullptr)); SendReply(ctx, std::move(res), ev, BS_VDISK_PUT); } @@ -1451,8 +1444,7 @@ namespace NKikimr { OverloadHandler->ActualizeWeights(ctx, Mask(EHullDbType::LogoBlobs)); std::unique_ptr<TEvAnubisOsirisPutResult> result(new TEvAnubisOsirisPutResult(NKikimrProto::OK, now, - (msg->IsAnubis() ? IFaceMonGroup->AnubisPutResMsgsPtr() : IFaceMonGroup->OsirisPutResMsgsPtr()), - nullptr, NWilson::TTraceId())); + (msg->IsAnubis() ? IFaceMonGroup->AnubisPutResMsgsPtr() : IFaceMonGroup->OsirisPutResMsgsPtr()), nullptr)); // log data TAnubisOsirisPutRecoveryLogRec logRec(*msg); TString data = logRec.Serialize(); @@ -1541,7 +1533,7 @@ namespace NKikimr { auto oosStatus = VCtx->GetOutOfSpaceState().GetGlobalStatusFlags(); auto result = std::make_unique<TEvBlobStorage::TEvVPutResult>(NKikimrProto::OK, id, SelfVDiskId, nullptr, oosStatus, now, 0, nullptr, nullptr, IFaceMonGroup->RecoveredHugeBlobResMsgsPtr(), nullptr, bufSize, - std::move(ev->TraceId), 0, TString()); + 0, TString()); // pass the work to huge blob writer TIngress ingress = *TIngress::CreateIngressWithLocal(VCtx->Top.get(), SelfVDiskId, id); diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h index bd98ea87bdf..f209e77fca5 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h @@ -111,7 +111,7 @@ namespace NKikimr { return std::make_unique<TEvBlobStorage::TEvVMovedPatchResult>(status, originalId, patchedId, vdiskID, cookie, oosStatus, now, ev->Get()->GetCachedByteSize(), &record, skeletonFrontIDPtr, counterPtr, nullptr, - std::move(ev->TraceId), vdiskIncarnationGuid, errorReason); + vdiskIncarnationGuid, errorReason); } static inline std::unique_ptr<TEvBlobStorage::TEvVPatchFoundParts> @@ -128,7 +128,7 @@ namespace NKikimr { const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev); return std::make_unique<TEvBlobStorage::TEvVPatchFoundParts>(status, originalId, patchedId, vdiskID, cookie, now, - errorReason, &record, skeletonFrontIDPtr, counterPtr, nullptr, std::move(ev->TraceId), vdiskIncarnationGuid); + errorReason, &record, skeletonFrontIDPtr, counterPtr, nullptr, vdiskIncarnationGuid); } static inline std::unique_ptr<TEvBlobStorage::TEvVPatchResult> @@ -146,7 +146,7 @@ namespace NKikimr { const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev); auto res = std::make_unique<TEvBlobStorage::TEvVPatchResult>(status, originalId, patchedId, vdiskID, cookie, - now, &record, skeletonFrontIDPtr, counterPtr, nullptr, std::move(ev->TraceId), vdiskIncarnationGuid); + now, &record, skeletonFrontIDPtr, counterPtr, nullptr, vdiskIncarnationGuid); res->SetStatus(status, errorReason); return res; } @@ -160,7 +160,7 @@ namespace NKikimr { auto &record = ev->Get()->Record; const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev); return std::make_unique<TEvBlobStorage::TEvVPatchXorDiffResult>(status, now, &record, skeletonFrontIDPtr, - counterPtr, nullptr, std::move(ev->TraceId)); + counterPtr, nullptr); } static inline std::unique_ptr<TEvBlobStorage::TEvVPutResult> @@ -180,7 +180,7 @@ namespace NKikimr { return std::make_unique<TEvBlobStorage::TEvVPutResult>(status, id, vdiskID, cookie, oosStatus, now, ev->Get()->GetCachedByteSize(), &record, skeletonFrontIDPtr, counterPtr, histoPtr, - bufferSizeBytes, std::move(ev->TraceId), vdiskIncarnationGuid, errorReason); + bufferSizeBytes, vdiskIncarnationGuid, errorReason); } static inline std::unique_ptr<TEvBlobStorage::TEvVBlockResult> @@ -192,7 +192,7 @@ namespace NKikimr { const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev); return std::make_unique<TEvBlobStorage::TEvVBlockResult>(status, actual, vdiskID, now, ev->Get()->GetCachedByteSize(), &ev->Get()->Record, skeletonFrontIDPtr, counterPtr, - nullptr, std::move(ev->TraceId), vdiskIncarnationGuid); + nullptr, vdiskIncarnationGuid); } static inline std::unique_ptr<TEvBlobStorage::TEvVCollectGarbageResult> @@ -207,7 +207,7 @@ namespace NKikimr { const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev); return std::make_unique<TEvBlobStorage::TEvVCollectGarbageResult>(status, tabletId, gen, channel, vdiskID, now, ev->Get()->GetCachedByteSize(), &record, skeletonFrontIDPtr, counterPtr, nullptr, - std::move(ev->TraceId), vdiskIncarnationGuid); + vdiskIncarnationGuid); } //////////////////////////////////////////////////////////////////////////// @@ -272,7 +272,7 @@ namespace NKikimr { const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev); auto result = std::make_unique<TEvBlobStorage::TEvVMultiPutResult>(status, vdiskID, cookie, now, ev->Get()->GetCachedByteSize(), &record, skeletonFrontIDPtr, counterPtr, histoPtr, bufferSizeBytes, - std::move(ev->TraceId), vdiskIncarnationGuid, errorReason); + vdiskIncarnationGuid, errorReason); Y_VERIFY(record.ItemsSize() == statuses.size()); for (ui64 itemIdx = 0; itemIdx < record.ItemsSize(); ++itemIdx) { auto &item = record.GetItems(itemIdx); @@ -317,7 +317,7 @@ namespace NKikimr { const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev); auto result = std::make_unique<TEvBlobStorage::TEvVGetResult>(status, vdiskID, now, ev->Get()->GetCachedByteSize(), &record, skeletonFrontIDPtr, counterPtr, histoPtr, - std::move(ev->TraceId), cookie, ev->GetChannel(), vdiskIncarnationGuid); + cookie, ev->GetChannel(), vdiskIncarnationGuid); // range query if (record.HasRangeQuery()) { const NKikimrBlobStorage::TRangeQuery *q = &record.GetRangeQuery(); @@ -360,7 +360,7 @@ namespace NKikimr { const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev); auto result = std::make_unique<TEvBlobStorage::TEvVGetBlockResult>(status, ev->Get()->Record.GetTabletId(), vdiskID, now, ev->Get()->GetCachedByteSize(), &ev->Get()->Record, skeletonFrontIDPtr, counterPtr, - nullptr, std::move(ev->TraceId)); + nullptr); SetRacingGroupInfo(ev->Get()->Record, result->Record, groupInfo); return result; } @@ -386,8 +386,7 @@ namespace NKikimr { Y_UNUSED(vdiskIncarnationGuid); const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev); auto result = std::make_unique<TEvBlobStorage::TEvVGetBarrierResult>(status,vdiskID, now, - ev->Get()->GetCachedByteSize(), &ev->Get()->Record, skeletonFrontIDPtr, counterPtr, nullptr, - std::move(ev->TraceId)); + ev->Get()->GetCachedByteSize(), &ev->Get()->Record, skeletonFrontIDPtr, counterPtr, nullptr); SetRacingGroupInfo(ev->Get()->Record, result->Record, groupInfo); return result; } @@ -400,8 +399,7 @@ namespace NKikimr { { Y_UNUSED(vdiskIncarnationGuid); const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev); - return std::make_unique<TEvBlobStorage::TEvVDbStatResult>(status, vdiskID, now, counterPtr, nullptr, - std::move(ev->TraceId)); + return std::make_unique<TEvBlobStorage::TEvVDbStatResult>(status, vdiskID, now, counterPtr, nullptr); } static inline std::unique_ptr<IEventBase> @@ -414,7 +412,7 @@ namespace NKikimr { const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev); auto flags = vctx->GetOutOfSpaceState().GetLocalStatusFlags(); return std::make_unique<TEvBlobStorage::TEvVSyncResult>(status, vdiskID, flags, now, counterPtr, nullptr, - std::move(ev->TraceId), ev->GetChannel()); + ev->GetChannel()); } static inline std::unique_ptr<IEventBase> @@ -427,7 +425,7 @@ namespace NKikimr { const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev); ui64 cookie = ev->Get()->Record.GetCookie(); return std::make_unique<TEvBlobStorage::TEvVSyncFullResult>(status, vdiskID, cookie, now, counterPtr, nullptr, - std::move(ev->TraceId), ev->GetChannel()); + ev->GetChannel()); } static inline std::unique_ptr<IEventBase> @@ -439,7 +437,7 @@ namespace NKikimr { Y_UNUSED(vdiskIncarnationGuid); const ::NMonitoring::TDynamicCounters::TCounterPtr &counterPtr = ResultingCounterForEvent(vctx, ev); return std::make_unique<TEvBlobStorage::TEvVSyncGuidResult>(status, vdiskID, now, counterPtr, nullptr, - std::move(ev->TraceId), ev->GetChannel()); + ev->GetChannel()); } } // NErrBuilder diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp index 0d6e09be6a8..9a1ddd7a9e1 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp @@ -19,9 +19,11 @@ #include <ydb/core/util/queue_inplace.h> #include <ydb/core/base/counters.h> +#include <ydb/core/base/wilson.h> #include <ydb/core/node_whiteboard/node_whiteboard.h> #include <library/cpp/monlib/service/pages/templates.h> +#include <library/cpp/actors/wilson/wilson_span.h> #include <util/generic/set.h> #include <util/generic/maybe.h> @@ -114,9 +116,10 @@ namespace NKikimr { , ExtQueueId(extQueueId) , ClientId(clientId) , ActorId(Ev->Sender) - , Span(9 /*verbosity*/, NWilson::ERelation::FollowsFrom, std::move(Ev->TraceId), now, "VDisk.PutInQueue") + , Span(TWilson::VDiskTopLevel, std::move(Ev->TraceId), "VDisk.SkeletonFront.Queue") { Span.Attribute("QueueName", std::move(name)); + Ev->TraceId = Span.GetTraceId(); } }; diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp index 8452a571343..71443d172bc 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_syncfullhandler.cpp @@ -57,8 +57,7 @@ namespace NKikimr { if (!SelfVDiskId.SameGroupAndGeneration(SourceVDisk) || !SelfVDiskId.SameDisk(TargetVDisk)) { auto result = std::make_unique<TEvBlobStorage::TEvVSyncFullResult>(NKikimrProto::ERROR, SelfVDiskId, - Record.GetCookie(), Now, IFaceMonGroup->SyncFullResMsgsPtr(), nullptr, std::move(Ev->TraceId), - Ev->GetChannel()); + Record.GetCookie(), Now, IFaceMonGroup->SyncFullResMsgsPtr(), nullptr, Ev->GetChannel()); SendVDiskResponse(ctx, recipient, result.release(), cookie); Die(ctx); return; @@ -73,7 +72,7 @@ namespace NKikimr { << " Marker# BSVSFH02"); auto result = std::make_unique<TEvBlobStorage::TEvVSyncFullResult>(NKikimrProto::NODATA, SelfVDiskId, TSyncState(Db->GetVDiskIncarnationGuid(), DbBirthLsn), Record.GetCookie(), Now, - IFaceMonGroup->SyncFullResMsgsPtr(), nullptr, std::move(Ev->TraceId), Ev->GetChannel()); + IFaceMonGroup->SyncFullResMsgsPtr(), nullptr, Ev->GetChannel()); SendVDiskResponse(ctx, recipient, result.release(), cookie); Die(ctx); return; @@ -106,7 +105,7 @@ namespace NKikimr { TSyncState newSyncState(Db->GetVDiskIncarnationGuid(), syncedLsn); auto result = std::make_unique<TEvBlobStorage::TEvVSyncFullResult>(NKikimrProto::OK, SelfVDiskId, newSyncState, Record.GetCookie(), Now, IFaceMonGroup->SyncFullResMsgsPtr(), nullptr, - std::move(Ev->TraceId), Ev->GetChannel()); + Ev->GetChannel()); // snapshotLsn is _always_ the last confirmed lsn THullDsSnap fullSnap = Hull->GetIndexSnapshot(); diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp index 474714864b0..277789353c6 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp @@ -2,6 +2,7 @@ #include <ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h> #include <ydb/core/blobstorage/vdisk/common/vdisk_response.h> #include <ydb/core/blobstorage/vdisk/common/circlebufresize.h> +#include <ydb/core/base/wilson.h> namespace NKikimr { @@ -25,7 +26,8 @@ namespace NKikimr { TRope &&buffer, std::unique_ptr<TEvBlobStorage::TEvVPutResult> result, const TActorId &recipient, - ui64 recipientCookie) + ui64 recipientCookie, + NWilson::TTraceId traceId) : ILoggedRec(seg, confirmSyncLogAlso) , Id(id) , Ingress(ingress) @@ -33,6 +35,7 @@ namespace NKikimr { , Result(std::move(result)) , Recipient(recipient) , RecipientCookie(recipientCookie) + , Span(TWilson::VDiskInternals, std::move(traceId), "VDisk.Log.Put") {} void TLoggedRecVPut::Replay(THull &hull, const TActorContext &ctx) { @@ -44,6 +47,7 @@ namespace NKikimr { << " msg# " << Result->ToString() << " Marker# BSVSLR01"); + Span.EndOk(); SendVDiskResponse(ctx, Recipient, Result.release(), RecipientCookie); } @@ -58,7 +62,8 @@ namespace NKikimr { TRope &&buffer, std::unique_ptr<TEvVMultiPutItemResult> result, const TActorId &recipient, - ui64 recipientCookie) + ui64 recipientCookie, + NWilson::TTraceId traceId) : ILoggedRec(seg, confirmSyncLogAlso) , Id(id) , Ingress(ingress) @@ -66,6 +71,7 @@ namespace NKikimr { , Result(std::move(result)) , Recipient(recipient) , RecipientCookie(recipientCookie) + , Span(TWilson::VDiskInternals, std::move(traceId), "VDisk.Log.MultiPutItem") {} void TLoggedRecVMultiPutItem::Replay(THull &hull, const TActorContext &ctx) { @@ -78,6 +84,7 @@ namespace NKikimr { << " msg# " << Result->ToString() << " Marker# BSVSLR02"); + Span.EndOk(); ctx.Send(Recipient, Result.release(), RecipientCookie); } @@ -92,6 +99,7 @@ namespace NKikimr { : ILoggedRec(seg, confirmSyncLogAlso) , HugeKeeperId(hugeKeeperId) , Ev(ev) + , Span(TWilson::VDiskInternals, std::move(Ev->TraceId), "VDisk.Log.PutHuge") {} void TLoggedRecVPutHuge::Replay(THull &hull, const TActorContext &ctx) { @@ -107,6 +115,7 @@ namespace NKikimr { LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_PUT, hull.GetHullCtx()->VCtx->VDiskLogPrefix << "TEvVPut: realtime# false result# " << msg->Result->ToString() << " Marker# BSVSLR03"); + Span.EndOk(); SendVDiskResponse(ctx, msg->OrigClient, msg->Result.release(), msg->OrigCookie); } @@ -132,7 +141,7 @@ namespace NKikimr { {} void TLoggedRecVBlock::Replay(THull &hull, const TActorContext &ctx) { - auto replySender = [&ctx] (const TActorId &id, ui64 cookie, IEventBase *msg) { + auto replySender = [&ctx] (const TActorId &id, ui64 cookie, NWilson::TTraceId, IEventBase *msg) { SendVDiskResponse(ctx, id, msg, cookie); }; @@ -183,7 +192,7 @@ namespace NKikimr { {} void TLoggedRecLocalSyncData::Replay(THull &hull, const TActorContext &ctx) { - auto replySender = [&ctx] (const TActorId &id, ui64 cookie, IEventBase *msg) { + auto replySender = [&ctx] (const TActorId &id, ui64 cookie, NWilson::TTraceId, IEventBase *msg) { SendVDiskResponse(ctx, id, msg, cookie); }; diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h index 34c3c4a5dfb..71d9bbf9a9b 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h @@ -6,6 +6,7 @@ #include <ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h> #include <ydb/core/blobstorage/vdisk/anubis_osiris/blobstorage_anubis_osiris.h> #include <ydb/core/blobstorage/vdisk/repl/blobstorage_repl.h> +#include <library/cpp/actors/wilson/wilson_span.h> namespace NKikimr { @@ -49,7 +50,7 @@ namespace NKikimr { public: TLoggedRecVPut(TLsnSeg seg, bool confirmSyncLogAlso, const TLogoBlobID &id, const TIngress &ingress, TRope &&buffer, std::unique_ptr<TEvBlobStorage::TEvVPutResult> result, const TActorId &recipient, - ui64 recipientCookie); + ui64 recipientCookie, NWilson::TTraceId traceId); void Replay(THull &hull, const TActorContext &ctx) override; private: @@ -59,6 +60,7 @@ namespace NKikimr { std::unique_ptr<TEvBlobStorage::TEvVPutResult> Result; TActorId Recipient; ui64 RecipientCookie; + NWilson::TSpan Span; }; /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -68,7 +70,7 @@ namespace NKikimr { public: TLoggedRecVMultiPutItem(TLsnSeg seg, bool confirmSyncLogAlso, const TLogoBlobID &id, const TIngress &ingress, TRope &&buffer, std::unique_ptr<TEvVMultiPutItemResult> result, const TActorId &recipient, - ui64 recipientCookie); + ui64 recipientCookie, NWilson::TTraceId traceId); void Replay(THull &hull, const TActorContext &ctx) override; private: @@ -78,6 +80,7 @@ namespace NKikimr { std::unique_ptr<TEvVMultiPutItemResult> Result; TActorId Recipient; ui64 RecipientCookie; + NWilson::TSpan Span; }; /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -92,6 +95,7 @@ namespace NKikimr { private: const TActorId HugeKeeperId; TEvHullLogHugeBlob::TPtr Ev; + NWilson::TSpan Span; }; /////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp index fbe53c19680..b9e74248389 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp @@ -3,6 +3,8 @@ #include <ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_hullsatisfactionrank.h> #include <ydb/core/blobstorage/vdisk/hullop/blobstorage_hull.h> #include <ydb/core/util/queue_inplace.h> +#include <ydb/core/base/wilson.h> +#include <library/cpp/actors/wilson/wilson_span.h> namespace NKikimr { @@ -12,13 +14,17 @@ namespace NKikimr { class TEmergencyQueue { struct TItem { std::unique_ptr<IEventHandle> Ev; + NWilson::TSpan Span; TItem() = default; template<typename T> TItem(TAutoPtr<TEventHandle<T>> ev) : Ev(ev.Release()) - {} + , Span(TWilson::VDiskInternals, std::move(Ev->TraceId), "VDisk.Skeleton.EmergencyQueue") + { + Ev->TraceId = Span.GetTraceId(); + } }; // emergency queue of 'put' events @@ -95,6 +101,7 @@ namespace NKikimr { auto item = Queue.Head(); Y_VERIFY(item); TAutoPtr<IEventHandle> ev = item->Ev.release(); + item->Span.EndOk(); Queue.Pop(); switch (ev->GetTypeRewrite()) { case TEvBlobStorage::EvVMovedPatch: { diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp index 34bc4abbc81..3ac5c446dd1 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmovedpatch_actor.cpp @@ -30,7 +30,6 @@ namespace NKikimr { TOutOfSpaceStatus OOSStatus; - NWilson::TTraceId TraceId; NLWTrace::TOrbit Orbit; const ui64 IncarnationGuid; @@ -86,8 +85,7 @@ namespace NKikimr { TInstant now = TAppData::TimeProvider->Now(); auto vMovedPatchResult = std::make_unique<TEvBlobStorage::TEvVMovedPatchResult>(status, OriginalId, PatchedId, vdisk, cookie, OOSStatus, now, Event->Get()->GetCachedByteSize(), &record, - SkeletonFrontIDPtr, MovedPatchResMsgsPtr, nullptr, std::move(TraceId), IncarnationGuid, - ErrorReason); + SkeletonFrontIDPtr, MovedPatchResMsgsPtr, nullptr, IncarnationGuid, ErrorReason); vMovedPatchResult->Orbit = std::move(Orbit); if (status == NKikimrProto::ERROR) { @@ -112,7 +110,6 @@ namespace NKikimr { void Handle(TEvBlobStorage::TEvGetResult::TPtr &ev, const TActorContext &ctx) { TEvBlobStorage::TEvGetResult *result = ev->Get(); Orbit = std::move(result->Orbit); - TraceId = std::move(ev->TraceId); ui32 patchedIdHash = PatchedId.Hash(); @@ -150,13 +147,12 @@ namespace NKikimr { NKikimrBlobStorage::UserData, TEvBlobStorage::TEvPut::TacticDefault); put->Orbit = std::move(Orbit); - SendToBSProxy(SelfId(), PatchedGroupId, put.release(), OriginalId.Hash(), std::move(Event->TraceId)); + SendToBSProxy(SelfId(), PatchedGroupId, put.release(), OriginalId.Hash()); } void Handle(TEvBlobStorage::TEvPutResult::TPtr &ev, const TActorContext &ctx) { TEvBlobStorage::TEvPutResult *result = ev->Get(); Orbit = std::move(result->Orbit); - TraceId = std::move(ev->TraceId); ui32 originalIdHash = OriginalId.Hash(); @@ -182,7 +178,7 @@ namespace NKikimr { OriginalId.BlobSize(), deadline, NKikimrBlobStorage::AsyncRead); get->Orbit = std::move(Event->Get()->Orbit); - SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash(), std::move(Event->TraceId)); + SendToBSProxy(SelfId(), OriginalGroupId, get.release(), PatchedId.Hash()); Become(&TThis::StateWait); } diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp index 581103f6024..e34428b8ede 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vmultiput_actor.cpp @@ -80,7 +80,7 @@ namespace NKikimr { const ui64 bufferSizeBytes = Event->Get()->GetBufferBytes(); auto vMultiPutResult = std::make_unique<TEvBlobStorage::TEvVMultiPutResult>(NKikimrProto::OK, vdisk, cookie, now, Event->Get()->GetCachedByteSize(), &vMultiPutRecord, SkeletonFrontIDPtr, MultiPutResMsgsPtr, - nullptr, bufferSizeBytes, std::move(Event->TraceId), IncarnationGuid, TString()); + nullptr, bufferSizeBytes, IncarnationGuid, TString()); for (ui64 idx = 0; idx < Items.size(); ++idx) { TItem &result = Items[idx]; diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp index 4fd96a584ee..f28987129d7 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp @@ -151,8 +151,7 @@ namespace NKikimr::NPrivate { Y_VERIFY(record.HasCookie()); FoundPartsEvent = std::make_unique<TEvBlobStorage::TEvVPatchFoundParts>( NKikimrProto::OK, OriginalBlobId, PatchedBlobId, VDiskId, record.GetCookie(), now, ErrorReason, &record, - SkeletonFrontIDPtr, VPatchFoundPartsMsgsPtr, nullptr, - std::move(ev->TraceId), IncarnationGuid); + SkeletonFrontIDPtr, VPatchFoundPartsMsgsPtr, nullptr, IncarnationGuid); } void Bootstrap() { @@ -425,8 +424,7 @@ namespace NKikimr::NPrivate { ResultEvent = std::make_unique<TEvBlobStorage::TEvVPatchResult>( NKikimrProto::OK, TLogoBlobID(OriginalBlobId, OriginalPartId), TLogoBlobID(PatchedBlobId, PatchedPartId), VDiskId, record.GetCookie(), now, - &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr, - std::move(ev->TraceId), IncarnationGuid); + &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr, IncarnationGuid); Sender = ev->Sender; Cookie = ev->Cookie; SendVPatchResult(NKikimrProto::ERROR); @@ -467,8 +465,7 @@ namespace NKikimr::NPrivate { ResultEvent = std::make_unique<TEvBlobStorage::TEvVPatchResult>( NKikimrProto::OK, originalPartBlobId, patchedPartBlobId, VDiskId, record.GetCookie(), now, - &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr, - std::move(ev->TraceId), IncarnationGuid); + &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr, IncarnationGuid); Sender = ev->Sender; Cookie = ev->Cookie; @@ -526,8 +523,7 @@ namespace NKikimr::NPrivate { NKikimrBlobStorage::TEvVPatchXorDiff &record = ev->Get()->Record; TInstant now = TActivationContext::Now(); auto resultEvent = std::make_unique<TEvBlobStorage::TEvVPatchXorDiffResult>( - NKikimrProto::ERROR, now, &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr, - std::move(ev->TraceId)); + NKikimrProto::ERROR, now, &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr); SendVDiskResponse(TActivationContext::AsActorContext(), ev->Sender, resultEvent.release(), ev->Cookie); } @@ -550,7 +546,7 @@ namespace NKikimr::NPrivate { TInstant now = TActivationContext::Now(); std::unique_ptr<TEvBlobStorage::TEvVPatchXorDiffResult> resultEvent = std::make_unique<TEvBlobStorage::TEvVPatchXorDiffResult>( - NKikimrProto::OK, now, &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr, std::move(ev->TraceId)); + NKikimrProto::OK, now, &record, SkeletonFrontIDPtr, VPatchResMsgsPtr, nullptr); if (!CheckDiff(xorDiffs, "XorDiff from datapart")) { for (auto &[diffs, partId, result, sender, cookie] : ReceivedXorDiffs) { diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp index ca54eeec88c..cb9f9708a11 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor_ut.cpp @@ -253,8 +253,7 @@ namespace NKikimr { UNIT_ASSERT(evVGetRange->Record.HasIndexOnly() && evVGetRange->Record.GetIndexOnly()); std::unique_ptr<TEvBlobStorage::TEvVGetResult> evVGetRangeResult = std::make_unique<TEvBlobStorage::TEvVGetResult>( vGetStatus, testData.VDiskIds[nodeId], testData.Now, evVGetRange->GetCachedByteSize(), &evVGetRange->Record, - nullptr, nullptr, nullptr, std::move(handle->TraceId), evVGetRange->Record.GetCookie(), - handle->GetChannel(), 0); + nullptr, nullptr, nullptr, evVGetRange->Record.GetCookie(), handle->GetChannel(), 0); evVGetRangeResult->AddResult(NKikimrProto::OK, TLogoBlobID(testData.OriginalBlobId, 0)); for (ui8 partId : foundParts) { @@ -407,8 +406,7 @@ namespace NKikimr { std::unique_ptr<TEvBlobStorage::TEvVGetResult> evVGetResult = std::make_unique<TEvBlobStorage::TEvVGetResult>( vGetStatus, testData.VDiskIds[nodeId], testData.Now, evVGet->GetCachedByteSize(), &evVGet->Record, - nullptr, nullptr, nullptr, std::move(vGetHandle->TraceId), evVGet->Record.GetCookie(), - vGetHandle->GetChannel(), 0); + nullptr, nullptr, nullptr, evVGet->Record.GetCookie(), vGetHandle->GetChannel(), 0); evVGetResult->AddResult(NKikimrProto::OK, blob.BlobId, 0, blob.Buffer.data(), blob.Buffer.size()); std::unique_ptr<IEventHandle> handle = std::make_unique<IEventHandle>(vPatchActorId, edgeActor, evVGetResult.release()); @@ -437,8 +435,7 @@ namespace NKikimr { TOutOfSpaceStatus oos = TOutOfSpaceStatus(testData.StatusFlags, testData.ApproximateFreeSpaceShare); std::unique_ptr<TEvBlobStorage::TEvVPutResult> vPutResult = std::make_unique<TEvBlobStorage::TEvVPutResult>( vPutStatus, blobId, testData.VDiskIds[nodeId], &cookie, oos, testData.Now, - 0, &record, nullptr, nullptr, nullptr, vPut->GetBufferBytes(), std::move(handle->TraceId), - 0, ""); + 0, &record, nullptr, nullptr, nullptr, vPut->GetBufferBytes(), 0, ""); handle = MakeHolder<IEventHandle>(vPatchActorId, edgeActor, vPutResult.release()); runtime.Send(handle.Release()); @@ -773,7 +770,7 @@ namespace NKikimr { TActorId patchActor = testData.VPatchActorIds[patchedPartId - 1]; auto handle2 = std::make_unique<IEventHandle>(patchActor, edgeActor, handle->Release().Release(), handle->Flags, - handle->Cookie, nullptr, std::move(handle->TraceId)); + handle->Cookie, nullptr); testData.Runtime.Send(handle2.release()); } } diff --git a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer.cpp b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer.cpp index bf0179b4ed9..7cbea4a8fbc 100644 --- a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer.cpp +++ b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer.cpp @@ -364,7 +364,7 @@ namespace NKikimr { data.Info = info; // create reply auto result = std::make_unique<TEvBlobStorage::TEvVSyncGuidResult>(NKikimrProto::OK, selfVDisk, - TAppData::TimeProvider->Now(), nullptr, nullptr, std::move(ev->TraceId), ev->GetChannel()); + TAppData::TimeProvider->Now(), nullptr, nullptr, ev->GetChannel()); // put reply into the queue and wait until it would be committed ui64 seqNum = DelayedQueue.WriteRequest(ev->Sender, std::move(result)); // commit @@ -378,8 +378,7 @@ namespace NKikimr { auto guid = data.GetGuid(); // create reply auto result = std::make_unique<TEvBlobStorage::TEvVSyncGuidResult>(NKikimrProto::OK, selfVDisk, - TAppData::TimeProvider->Now(), guid, state, nullptr, nullptr, std::move(ev->TraceId), - ev->GetChannel()); + TAppData::TimeProvider->Now(), guid, state, nullptr, nullptr, ev->GetChannel()); // put reply into the queue and wait until all required writes are committed DelayedQueue.ReadRequest(ctx, ev->Sender, std::move(result)); } diff --git a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h index bf26350068b..1bb55b311f1 100644 --- a/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h +++ b/ydb/core/blobstorage/vdisk/syncer/blobstorage_syncer_localwriter.h @@ -57,9 +57,8 @@ namespace NKikimr { TEvLocalSyncDataResult(NKikimrProto::EReplyStatus status, const TInstant &now, ::NMonitoring::TDynamicCounters::TCounterPtr counterPtr, - NVDiskMon::TLtcHistoPtr histoPtr, NWilson::TTraceId traceId) - : TEvVResultBase(now, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, counterPtr, histoPtr, - std::move(traceId)) + NVDiskMon::TLtcHistoPtr histoPtr) + : TEvVResultBase(now, TInterconnectChannels::IC_BLOBSTORAGE_SMALL_MSG, counterPtr, histoPtr) , Status(status) {} }; diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp index 9b7dd49ebb9..868d870ac1a 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclog.cpp @@ -140,8 +140,7 @@ namespace NKikimr { auto result = std::make_unique<TEvBlobStorage::TEvVSyncResult>(NKikimrProto::RACE, SelfVDiskId, TSyncState(), true, SlCtx->VCtx->GetOutOfSpaceState().GetLocalStatusFlags(), now, - SlCtx->CountersMonGroup.VDiskCheckFailedPtr(), nullptr, std::move(ev->TraceId), - ev->GetChannel()); + SlCtx->CountersMonGroup.VDiskCheckFailedPtr(), nullptr, ev->GetChannel()); SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie); return; } @@ -160,7 +159,7 @@ namespace NKikimr { auto result = std::make_unique<TEvBlobStorage::TEvVSyncResult>(NKikimrProto::BLOCKED, SelfVDiskId, TSyncState(), true, SlCtx->VCtx->GetOutOfSpaceState().GetLocalStatusFlags(), now, - SlCtx->CountersMonGroup.DiskLockedPtr(), nullptr, std::move(ev->TraceId), ev->GetChannel()); + SlCtx->CountersMonGroup.DiskLockedPtr(), nullptr, ev->GetChannel()); SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie); return; } @@ -180,7 +179,7 @@ namespace NKikimr { TSyncState syncState(VDiskIncarnationGuid, GetDbBirthLsn()); auto result = std::make_unique<TEvBlobStorage::TEvVSyncResult>(status, SelfVDiskId, syncState, true, SlCtx->VCtx->GetOutOfSpaceState().GetLocalStatusFlags(), now, - SlCtx->CountersMonGroup.UnequalGuidPtr(), nullptr, std::move(ev->TraceId), ev->GetChannel()); + SlCtx->CountersMonGroup.UnequalGuidPtr(), nullptr, ev->GetChannel()); SendVDiskResponse(ctx, ev->Sender, result.release(), ev->Cookie); return; } diff --git a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp index 6cbe05c4ec8..5ce14ae5849 100644 --- a/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp +++ b/ydb/core/blobstorage/vdisk/synclog/blobstorage_synclogreader.cpp @@ -167,7 +167,7 @@ namespace NKikimr { auto result = std::make_unique<TEvBlobStorage::TEvVSyncResult>(status, SelfVDiskId, TSyncState(VDiskIncarnationGuid, lsn), finished, SlCtx->VCtx->GetOutOfSpaceState().GetLocalStatusFlags(), - Now, SlCtx->IFaceMonGroup.SyncReadResMsgsPtr(), nullptr, std::move(Ev->TraceId), Ev->GetChannel()); + Now, SlCtx->IFaceMonGroup.SyncReadResMsgsPtr(), nullptr, Ev->GetChannel()); if (DiskReads) { NKikimrBlobStorage::TEvVSyncResult::TStat *stat = nullptr; stat = result->Record.MutableStat(); diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto index 110e5dbe088..ad4ed0a2691 100644 --- a/ydb/core/protos/blobstorage.proto +++ b/ydb/core/protos/blobstorage.proto @@ -403,6 +403,8 @@ message TVMultiPutItem { optional uint64 Cookie = 4; repeated TEvVPut.TExtraBlockCheck ExtraBlockChecks = 5; + + optional NActorsProto.TTraceId TraceId = 6; } message TEvVMultiPut { diff --git a/ydb/core/yq/libs/audit/CMakeLists.txt b/ydb/core/yq/libs/audit/CMakeLists.txt index 954f1193322..9bcd97bdd63 100644 --- a/ydb/core/yq/libs/audit/CMakeLists.txt +++ b/ydb/core/yq/libs/audit/CMakeLists.txt @@ -11,6 +11,7 @@ add_library(yq-libs-audit) target_link_libraries(yq-libs-audit PUBLIC contrib-libs-cxxsupp yutil + cpp-actors-core ) target_sources(yq-libs-audit PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/audit/yq_audit_service.cpp |