diff options
author | kruall <kruall@ydb.tech> | 2023-01-19 14:07:16 +0300 |
---|---|---|
committer | kruall <kruall@ydb.tech> | 2023-01-19 14:07:16 +0300 |
commit | 29ff5da49e72743e9b37dbbc20ebf791999301f8 (patch) | |
tree | 3f1d618c7cc747ea4764cddf85cd0e85309aa168 | |
parent | f93954732305ff52058c5ea612f9cd53de09524a (diff) | |
download | ydb-29ff5da49e72743e9b37dbbc20ebf791999301f8.tar.gz |
Add SkeletonTracing,
-rw-r--r-- | ydb/core/blobstorage/vdisk/common/vdisk_events.h | 48 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp | 155 |
2 files changed, 188 insertions, 15 deletions
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h index f5521c25bea..ddd932d94d4 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h @@ -103,6 +103,14 @@ namespace NKikimr { } } + EQueueClientType GetType() const { + return Type; + } + + ui64 GetIdentifier() const { + return Identifier; + } + void Serialize(NKikimrBlobStorage::TMsgQoS *msgQoS) const { switch (Type) { case EQueueClientType::None: @@ -275,6 +283,42 @@ namespace NKikimr { } // NBackpressure + struct TVDiskSkeletonTrace { + static constexpr ui32 BufferSize = 32; + const char * Marks[BufferSize]; + ui32 MarkCount = 0; + std::shared_ptr<TVDiskSkeletonTrace> AdditionalTrace; + + TVDiskSkeletonTrace() = default; + + void Clear() { + MarkCount = 0; + AdditionalTrace.reset(); + } + + void AddMark(const char * const mark) { + if (MarkCount < BufferSize) { + Marks[MarkCount++] = mark; + } + } + + TString ToString() const { + TStringBuilder msg; + msg << "["; + for (ui32 idx = 0; idx < MarkCount; ++idx) { + if (idx) { + msg << ','; + } + msg << '"' << Marks[idx] << '"'; + } + msg << "]"; + if (AdditionalTrace) { + msg << '+' << AdditionalTrace->ToString(); + } + return msg; + } + }; + struct TVMsgContext { const NBackpressure::TQueueClientId ClientId; const ui32 RecByteSize = 0; @@ -1423,6 +1467,7 @@ namespace NKikimr { , TEventWithRelevanceTracker { mutable NLWTrace::TOrbit Orbit; + TVDiskSkeletonTrace *VDiskSkeletonTrace = nullptr; TEvVSpecialPatchBase() = default; @@ -1816,6 +1861,7 @@ namespace NKikimr { , TEventWithRelevanceTracker { mutable NLWTrace::TOrbit Orbit; + TVDiskSkeletonTrace *VDiskSkeletonTrace = nullptr; TEvVPatchStart() = default; @@ -1901,6 +1947,7 @@ namespace NKikimr { , TEventWithRelevanceTracker { mutable NLWTrace::TOrbit Orbit; + TVDiskSkeletonTrace *VDiskSkeletonTrace = nullptr; TEvVPatchDiff() = default; @@ -1975,6 +2022,7 @@ namespace NKikimr { , TEventWithRelevanceTracker { mutable NLWTrace::TOrbit Orbit; + TVDiskSkeletonTrace *VDiskSkeletonTrace = nullptr; TEvVPatchXorDiff() = default; diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp index a9d5dd163da..15864a1231b 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp @@ -18,6 +18,7 @@ #include <ydb/core/blobstorage/backpressure/queue_backpressure_server.h> #include <ydb/core/util/queue_inplace.h> +#include <ydb/core/util/stlog.h> #include <ydb/core/base/counters.h> #include <ydb/core/base/wilson.h> #include <ydb/core/node_whiteboard/node_whiteboard.h> @@ -101,12 +102,15 @@ namespace NKikimr { NBackpressure::TQueueClientId ClientId; TActorId ActorId; NWilson::TSpan Span; + std::unique_ptr<TVDiskSkeletonTrace> Trace; + ui64 Cookie; TRecord() = default; TRecord(std::unique_ptr<IEventHandle> ev, TInstant now, ui32 recByteSize, const NBackpressure::TMessageId &msgId, ui64 cost, TInstant deadline, NKikimrBlobStorage::EVDiskQueueId extQueueId, - const NBackpressure::TQueueClientId& clientId, TString name) + const NBackpressure::TQueueClientId& clientId, TString name, std::unique_ptr<TVDiskSkeletonTrace> &&trace, + ui64 cookie) : Ev(std::move(ev)) , ReceivedTime(now) , Deadline(deadline) @@ -117,6 +121,8 @@ namespace NKikimr { , ClientId(clientId) , ActorId(Ev->Sender) , Span(TWilson::VDiskTopLevel, std::move(Ev->TraceId), "VDisk.SkeletonFront.Queue") + , Trace(std::move(trace)) + , Cookie(cookie) { Span.Attribute("QueueName", std::move(name)); Ev->TraceId = Span.GetTraceId(); @@ -137,6 +143,19 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////////// class TIntQueueClass { using TQueueType = TQueueInplace<TRecord, 4096>; + using TFreeTraceObjectsQueue = TQueueInplace<std::unique_ptr<TVDiskSkeletonTrace>, 4096>; + + struct TMsgInfo { + ui64 MsgId; + TInstant ReceivedTime; + std::unique_ptr<TVDiskSkeletonTrace> VDiskSkeletonTrace; + + TMsgInfo(ui64 msgId, TInstant receivedTime, std::unique_ptr<TVDiskSkeletonTrace> &&trace) + : MsgId(msgId) + , ReceivedTime(receivedTime) + , VDiskSkeletonTrace(std::move(trace)) + {} + }; private: std::unique_ptr<TQueueType, TQueueType::TCleanDestructor> Queue; @@ -148,6 +167,8 @@ namespace NKikimr { ui64 Deadlines; const ui64 MaxInFlightCount; const ui64 MaxInFlightCost; + THashMap<ui64, TMsgInfo> Msgs; + std::unique_ptr<TFreeTraceObjectsQueue, TFreeTraceObjectsQueue::TCleanDestructor> FreeTraceObjects; public: const NKikimrBlobStorage::EVDiskInternalQueueId IntQueueId; const TString Name; @@ -183,6 +204,7 @@ namespace NKikimr { , Deadlines(0) , MaxInFlightCount(maxInFlightCount) , MaxInFlightCost(maxInFlightCost) + , FreeTraceObjects(new TFreeTraceObjectsQueue()) , IntQueueId(intQueueId) , Name(name) , SkeletonFrontInFlightCount(skeletonFrontGroup->GetCounter("SkeletonFront/" + Name + "/InFlightCount", false)) @@ -197,12 +219,23 @@ namespace NKikimr { return Queue->GetSize(); } + std::unique_ptr<TVDiskSkeletonTrace> GetFreeTrace() { + if (std::unique_ptr<TVDiskSkeletonTrace> *ptr = FreeTraceObjects->Head()) { + std::unique_ptr<TVDiskSkeletonTrace> tmp = std::move(*ptr); + FreeTraceObjects->Pop(); + return tmp; + } else { + return std::make_unique<TVDiskSkeletonTrace>(); + } + } + template<typename TFront> void Enqueue(const TActorContext &ctx, ui32 recByteSize, std::unique_ptr<IEventHandle> converted, const NBackpressure::TMessageId &msgId, ui64 cost, const TInstant &deadline, NKikimrBlobStorage::EVDiskQueueId extQueueId, TFront& front, - const NBackpressure::TQueueClientId& clientId) { + const NBackpressure::TQueueClientId& clientId, std::unique_ptr<TVDiskSkeletonTrace> &&trace) { Y_UNUSED(front); + ui64 cookie = converted->Cookie; if (!Queue->Head() && CanSendToSkeleton(cost)) { // send to Skeleton for further processing ctx.ExecutorThread.Send(converted.release()); @@ -213,6 +246,8 @@ namespace NKikimr { ++*SkeletonFrontInFlightCount; *SkeletonFrontInFlightCost += cost; *SkeletonFrontInFlightBytes += recByteSize; + + Msgs.emplace(cookie, TMsgInfo(msgId.MsgId, ctx.Now(), std::move(trace))); } else { // enqueue ++DelayedCount; @@ -223,7 +258,7 @@ namespace NKikimr { TInstant now = TAppData::TimeProvider->Now(); Queue->Push(TRecord(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId, - clientId, Name)); + clientId, Name, std::move(trace), cookie)); } } @@ -270,6 +305,8 @@ namespace NKikimr { ++*SkeletonFrontInFlightCount; *SkeletonFrontInFlightCost += cost; *SkeletonFrontInFlightBytes += recByteSize; + + Msgs.emplace(rec->Cookie, TMsgInfo(rec->MsgId.MsgId, ctx.Now(), std::move(rec->Trace))); } Queue->Pop(); } else { @@ -280,7 +317,7 @@ namespace NKikimr { public: template <class TFront> - void Completed(const TActorContext &ctx, const TVMsgContext &msgCtx, TFront &front) { + void Completed(const TActorContext &ctx, const TVMsgContext &msgCtx, TFront &front, ui64 cookie) { Y_VERIFY(InFlightCount >= 1 && InFlightBytes >= msgCtx.RecByteSize && InFlightCost >= msgCtx.Cost, "IntQueueId# %s InFlightCount# %" PRIu64 " InFlightBytes# %" PRIu64 " InFlightCost# %" PRIu64 " msgCtx# %s Deadlines# %" PRIu64, @@ -296,9 +333,40 @@ namespace NKikimr { *SkeletonFrontInFlightBytes -= msgCtx.RecByteSize; *SkeletonFrontCostProcessed += msgCtx.Cost; + // TODO(kruall): fix it, cookie always must be found + auto it = Msgs.find(cookie); + if (it != Msgs.end()) { + Y_VERIFY_S(it!= Msgs.end(), "cookie# " << cookie); + if (it->second.VDiskSkeletonTrace) { + it->second.VDiskSkeletonTrace->AdditionalTrace = nullptr; + it->second.VDiskSkeletonTrace->MarkCount = 0; + FreeTraceObjects->Push(std::move(it->second.VDiskSkeletonTrace)); + } + Msgs.erase(it); + } + ProcessNext(ctx, front, false); } + bool Sanitize(const TActorContext &ctx, const TString &vDiskLogPrefix) { + bool hasError = false; + TInstant now = ctx.Now(); + for (auto &pair : Msgs) { + const TMsgInfo &msgInfo = pair.second; + TDuration passedTime = now - msgInfo.ReceivedTime; + if (passedTime > TDuration::Minutes(5)) { + hasError = true; + STLOG(PRI_ERROR, NKikimrServices::BS_SKELETON, BSVSF04, + vDiskLogPrefix << " passed more than 5 munites for message in the internal queue", + (MsgId, msgInfo.MsgId),, + (QueueName, Name), + (PassedTimeSeconds, passedTime.Seconds()), + (Trace, (msgInfo.VDiskSkeletonTrace->ToString() ? msgInfo.VDiskSkeletonTrace->ToString() : "None"))); + } + } + return hasError; + } + // enumeration of parameters we take into account to diagnose overloading enum EParam { EInFlightCount, @@ -406,6 +474,8 @@ namespace NKikimr { ::NMonitoring::TDynamicCounters::TCounterPtr SkeletonFrontOverflow; ::NMonitoring::TDynamicCounters::TCounterPtr SkeletonFrontIncorrectMsgId; + ui64 NextInternalId = 0; + THashMap<ui64, ui64> InternalIdToCookie; void NotifyOtherClients(const TActorContext &ctx, const TFeedback &feedback) { for (const auto &x : feedback.second) { @@ -478,6 +548,11 @@ namespace NKikimr { } front.ReplyFunc(std::exchange(converted, nullptr), ctx, status, errorReason, now, feedback.first); } + + if (converted) { + ui64 id = ++NextInternalId; + InternalIdToCookie[id] = std::exchange(const_cast<ui64&>(converted->Cookie), id); + } return converted; } @@ -485,6 +560,7 @@ namespace NKikimr { void DeadlineHappened(const TActorContext &ctx, TRecord *rec, TInstant now, TFront &front) { ++*SkeletonFrontDeadline; auto feedback = QueueBackpressure->Processed(rec->ActorId, rec->MsgId, rec->Cost, now); + ReturnCookie(rec->Ev, false); front.ReplyFunc(std::move(rec->Ev), ctx, NKikimrProto::DEADLINE, "deadline exceeded", now, feedback.first); NotifyOtherClients(ctx, feedback); } @@ -492,6 +568,7 @@ namespace NKikimr { template <class TFront> void DroppedWithError(const TActorContext &ctx, TRecord *rec, TInstant now, TFront &front) { auto feedback = QueueBackpressure->Processed(rec->ActorId, rec->MsgId, rec->Cost, now); + ReturnCookie(rec->Ev, false); front.ReplyFunc(std::move(rec->Ev), ctx, NKikimrProto::ERROR, "error state", now, feedback.first); } @@ -507,7 +584,19 @@ namespace NKikimr { QueueBackpressure->ForEachWindow(callback); } - void Completed(const TActorContext &ctx, const TVMsgContext &msgCtx, IEventBase *ev) { + void ReturnCookie(std::unique_ptr<IEventHandle> &evHandle, bool required) { + if (auto it = InternalIdToCookie.find(evHandle->Cookie); it != InternalIdToCookie.end()) { + const_cast<ui64&>(evHandle->Cookie) = it->second; + InternalIdToCookie.erase(it); + } else { + Y_VERIFY(!required, "Internal error, it can't find internal id"); + } + } + + void Completed(const TActorContext &ctx, const TVMsgContext &msgCtx, std::unique_ptr<IEventHandle> &evHandle) { + IEventBase *ev = evHandle->GetBase(); + ReturnCookie(evHandle, true); + TInstant now = TAppData::TimeProvider->Now(); Y_VERIFY(msgCtx.ActorId); auto feedback = QueueBackpressure->Processed(msgCtx.ActorId, msgCtx.MsgId, msgCtx.Cost, now); @@ -615,6 +704,8 @@ namespace NKikimr { NMonGroup::TVDiskStateGroup VDiskMonGroup; TVDiskIncarnationGuid VDiskIncarnationGuid; bool HasUnreadableBlobs = false; + TInstant LastSanitizeTime = TInstant::Zero(); + TInstant LastSanitizeWithErrorTime = TInstant::Zero(); //////////////////////////////////////////////////////////////////////// // NOTIFICATIONS @@ -718,6 +809,31 @@ namespace NKikimr { Become(&TThis::StateLocalRecoveryInProgress); } + bool Sanitize(const TActorContext &ctx) { + TInstant now = ctx.Now(); + if (now - LastSanitizeTime < TDuration::Minutes(5)) { + return false; + } + //if (now - LastSanitizeWithErrorTime > TDuration::Hours(1)) { + // return false; + //} + LastSanitizeTime = now; + auto intQueues = { + IntQueueAsyncGets.get(), IntQueueFastGets.get(), + IntQueueDiscover.get(), IntQueueLowGets.get(), + IntQueueLogPuts.get(), IntQueueHugePutsForeground.get(), + IntQueueHugePutsBackground.get() + }; + bool hasError = false; + for (auto intQueue : intQueues) { + hasError |= intQueue->Sanitize(ctx, VCtx->VDiskLogPrefix); + } + if (hasError) { + LastSanitizeWithErrorTime = now; + } + return hasError; + } + void Handle(TEvFrontRecoveryStatus::TPtr &ev, const TActorContext &ctx) { const auto &msg = ev->Get(); VDiskIncarnationGuid = msg->VDiskIncarnationGuid; @@ -1024,6 +1140,12 @@ namespace NKikimr { } } + template <typename TEv> + static constexpr bool IsPatchEvent = std::is_same_v<TEv, TEvBlobStorage::TEvVMovedPatch> + || std::is_same_v<TEv, TEvBlobStorage::TEvVPatchStart> + || std::is_same_v<TEv, TEvBlobStorage::TEvVPatchDiff> + || std::is_same_v<TEv, TEvBlobStorage::TEvVPatchXorDiff>; + template <class TEventPtr> void HandleRequestWithQoS(const TActorContext &ctx, TEventPtr &ev, const char *msgName, ui64 cost, TIntQueueClass &intQueue) { @@ -1057,9 +1179,17 @@ namespace NKikimr { std::unique_ptr<IEventHandle> event = extQueue.Enqueue(ctx, std::unique_ptr<IEventHandle>( ev->Forward(SkeletonId).Release()), msgId, cost, *this, clientId); if (event) { + std::unique_ptr<TVDiskSkeletonTrace> trace; + if constexpr (IsPatchEvent<std::decay_t<decltype(*ev->Get())>>) { + trace = intQueue.GetFreeTrace(); + event->Get<std::decay_t<decltype(*ev->Get())>>()->VDiskSkeletonTrace = trace.get(); + } // good, enqueue it in intQueue - intQueue.Enqueue(ctx, recByteSize, std::move(event), msgId, cost, deadline, extQueueId, *this, clientId); + intQueue.Enqueue(ctx, recByteSize, std::move(event), msgId, cost, + deadline, extQueueId, *this, clientId, std::move(trace)); } + + Sanitize(ctx); } bool Compatible(NKikimrBlobStorage::EVDiskQueueId extId, NKikimrBlobStorage::EVDiskInternalQueueId intId) { @@ -1387,10 +1517,11 @@ namespace NKikimr { void Handle(TEvVDiskRequestCompleted::TPtr &ev, const TActorContext &ctx) { const TVMsgContext &msgCtx = ev->Get()->Ctx; std::unique_ptr<IEventHandle> event = std::move(ev->Get()->Event); + ui64 id = event->Cookie; TExtQueueClass &extQueue = GetExtQueue(msgCtx.ExtQueueId); - extQueue.Completed(ctx, msgCtx, event->GetBase()); + extQueue.Completed(ctx, msgCtx, event); TIntQueueClass &intQueue = GetIntQueue(msgCtx.IntQueueId); - intQueue.Completed(ctx, msgCtx, *this); + intQueue.Completed(ctx, msgCtx, *this, id); TActivationContext::Send(event.release()); } @@ -1703,13 +1834,7 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Events checking: RACE and access control - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - template <typename TEv> - static constexpr bool IsPatchEvent = std::is_same_v<TEv, TEvBlobStorage::TEvVMovedPatch> - || std::is_same_v<TEv, TEvBlobStorage::TEvVPatchStart> - || std::is_same_v<TEv, TEvBlobStorage::TEvVPatchDiff> - || std::is_same_v<TEv, TEvBlobStorage::TEvVPatchXorDiff>; + /////////////////////////////////////////////////////////////////////////////////////////////////////////////// template <typename TEv> static constexpr bool IsWithoutQoS = std::is_same_v<TEv, TEvBlobStorage::TEvVStatus> |