aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkruall <kruall@ydb.tech>2023-01-19 14:07:16 +0300
committerkruall <kruall@ydb.tech>2023-01-19 14:07:16 +0300
commit29ff5da49e72743e9b37dbbc20ebf791999301f8 (patch)
tree3f1d618c7cc747ea4764cddf85cd0e85309aa168
parentf93954732305ff52058c5ea612f9cd53de09524a (diff)
downloadydb-29ff5da49e72743e9b37dbbc20ebf791999301f8.tar.gz
Add SkeletonTracing,
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_events.h48
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp155
2 files changed, 188 insertions, 15 deletions
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
index f5521c25bea..ddd932d94d4 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
@@ -103,6 +103,14 @@ namespace NKikimr {
}
}
+ // Returns the value of the Type member (the kind of queue client this id describes).
+ EQueueClientType GetType() const {
+ return Type;
+ }
+
+ // Returns the value of the Identifier member as-is.
+ ui64 GetIdentifier() const {
+ return Identifier;
+ }
+
void Serialize(NKikimrBlobStorage::TMsgQoS *msgQoS) const {
switch (Type) {
case EQueueClientType::None:
@@ -275,6 +283,42 @@ namespace NKikimr {
} // NBackpressure
+ // Small fixed-capacity list of textual marks, optionally chained to another trace.
+ //
+ // Marks are kept as raw `const char *` values: AddMark() stores only the pointer,
+ // and ToString() reads the pointed-to text later, so every mark must stay valid
+ // for as long as it can still be printed.
+ struct TVDiskSkeletonTrace {
+     // Maximum number of marks a single trace object can hold.
+     static constexpr ui32 BufferSize = 32;
+     // Only the first MarkCount entries are meaningful.
+     const char * Marks[BufferSize];
+     ui32 MarkCount = 0;
+     // Optional continuation; ToString() appends it after a '+'.
+     std::shared_ptr<TVDiskSkeletonTrace> AdditionalTrace;
+
+     TVDiskSkeletonTrace() = default;
+
+     // Forgets all recorded marks and releases the chained trace.
+     void Clear() {
+         AdditionalTrace.reset();
+         MarkCount = 0;
+     }
+
+     // Appends a mark; once BufferSize marks are stored further marks are silently ignored.
+     void AddMark(const char * const mark) {
+         if (MarkCount >= BufferSize) {
+             return;
+         }
+         Marks[MarkCount] = mark;
+         ++MarkCount;
+     }
+
+     // Renders the marks as ["a","b",...]; when a chained trace is present,
+     // its rendering follows after a '+'.
+     TString ToString() const {
+         TStringBuilder out;
+         out << "[";
+         for (ui32 pos = 0; pos != MarkCount; ++pos) {
+             if (pos != 0) {
+                 out << ',';
+             }
+             out << '"' << Marks[pos] << '"';
+         }
+         out << "]";
+         if (AdditionalTrace) {
+             out << '+' << AdditionalTrace->ToString();
+         }
+         return out;
+     }
+ };
+
+
struct TVMsgContext {
const NBackpressure::TQueueClientId ClientId;
const ui32 RecByteSize = 0;
@@ -1423,6 +1467,7 @@ namespace NKikimr {
, TEventWithRelevanceTracker
{
mutable NLWTrace::TOrbit Orbit;
+ TVDiskSkeletonTrace *VDiskSkeletonTrace = nullptr;
TEvVSpecialPatchBase() = default;
@@ -1816,6 +1861,7 @@ namespace NKikimr {
, TEventWithRelevanceTracker
{
mutable NLWTrace::TOrbit Orbit;
+ TVDiskSkeletonTrace *VDiskSkeletonTrace = nullptr;
TEvVPatchStart() = default;
@@ -1901,6 +1947,7 @@ namespace NKikimr {
, TEventWithRelevanceTracker
{
mutable NLWTrace::TOrbit Orbit;
+ TVDiskSkeletonTrace *VDiskSkeletonTrace = nullptr;
TEvVPatchDiff() = default;
@@ -1975,6 +2022,7 @@ namespace NKikimr {
, TEventWithRelevanceTracker
{
mutable NLWTrace::TOrbit Orbit;
+ TVDiskSkeletonTrace *VDiskSkeletonTrace = nullptr;
TEvVPatchXorDiff() = default;
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
index a9d5dd163da..15864a1231b 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
@@ -18,6 +18,7 @@
#include <ydb/core/blobstorage/backpressure/queue_backpressure_server.h>
#include <ydb/core/util/queue_inplace.h>
+#include <ydb/core/util/stlog.h>
#include <ydb/core/base/counters.h>
#include <ydb/core/base/wilson.h>
#include <ydb/core/node_whiteboard/node_whiteboard.h>
@@ -101,12 +102,15 @@ namespace NKikimr {
NBackpressure::TQueueClientId ClientId;
TActorId ActorId;
NWilson::TSpan Span;
+ std::unique_ptr<TVDiskSkeletonTrace> Trace;
+ ui64 Cookie;
TRecord() = default;
TRecord(std::unique_ptr<IEventHandle> ev, TInstant now, ui32 recByteSize, const NBackpressure::TMessageId &msgId,
ui64 cost, TInstant deadline, NKikimrBlobStorage::EVDiskQueueId extQueueId,
- const NBackpressure::TQueueClientId& clientId, TString name)
+ const NBackpressure::TQueueClientId& clientId, TString name, std::unique_ptr<TVDiskSkeletonTrace> &&trace,
+ ui64 cookie)
: Ev(std::move(ev))
, ReceivedTime(now)
, Deadline(deadline)
@@ -117,6 +121,8 @@ namespace NKikimr {
, ClientId(clientId)
, ActorId(Ev->Sender)
, Span(TWilson::VDiskTopLevel, std::move(Ev->TraceId), "VDisk.SkeletonFront.Queue")
+ , Trace(std::move(trace))
+ , Cookie(cookie)
{
Span.Attribute("QueueName", std::move(name));
Ev->TraceId = Span.GetTraceId();
@@ -137,6 +143,19 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////
class TIntQueueClass {
using TQueueType = TQueueInplace<TRecord, 4096>;
+ using TFreeTraceObjectsQueue = TQueueInplace<std::unique_ptr<TVDiskSkeletonTrace>, 4096>;
+
+ // Value type of the Msgs map: what is remembered about one message
+ // while it is tracked there.
+ struct TMsgInfo {
+ // Message id (the callers in this file pass msgId.MsgId).
+ ui64 MsgId;
+ // Timestamp supplied by the creator; the callers in this file pass ctx.Now()
+ // at the moment the entry is added to Msgs.
+ TInstant ReceivedTime;
+ // Trace object owned on behalf of the message; may be null.
+ std::unique_ptr<TVDiskSkeletonTrace> VDiskSkeletonTrace;
+
+ // Takes ownership of `trace` (moved from the caller's unique_ptr).
+ TMsgInfo(ui64 msgId, TInstant receivedTime, std::unique_ptr<TVDiskSkeletonTrace> &&trace)
+ : MsgId(msgId)
+ , ReceivedTime(receivedTime)
+ , VDiskSkeletonTrace(std::move(trace))
+ {}
+ };
private:
std::unique_ptr<TQueueType, TQueueType::TCleanDestructor> Queue;
@@ -148,6 +167,8 @@ namespace NKikimr {
ui64 Deadlines;
const ui64 MaxInFlightCount;
const ui64 MaxInFlightCost;
+ THashMap<ui64, TMsgInfo> Msgs;
+ std::unique_ptr<TFreeTraceObjectsQueue, TFreeTraceObjectsQueue::TCleanDestructor> FreeTraceObjects;
public:
const NKikimrBlobStorage::EVDiskInternalQueueId IntQueueId;
const TString Name;
@@ -183,6 +204,7 @@ namespace NKikimr {
, Deadlines(0)
, MaxInFlightCount(maxInFlightCount)
, MaxInFlightCost(maxInFlightCost)
+ , FreeTraceObjects(new TFreeTraceObjectsQueue())
, IntQueueId(intQueueId)
, Name(name)
, SkeletonFrontInFlightCount(skeletonFrontGroup->GetCounter("SkeletonFront/" + Name + "/InFlightCount", false))
@@ -197,12 +219,23 @@ namespace NKikimr {
return Queue->GetSize();
}
+ // Hands out a trace object: takes the one at the head of FreeTraceObjects
+ // when the queue is not empty, otherwise allocates a fresh one.
+ std::unique_ptr<TVDiskSkeletonTrace> GetFreeTrace() {
+     std::unique_ptr<TVDiskSkeletonTrace> *head = FreeTraceObjects->Head();
+     if (!head) {
+         return std::make_unique<TVDiskSkeletonTrace>();
+     }
+     // Move the object out before popping the slot that held it.
+     std::unique_ptr<TVDiskSkeletonTrace> reused = std::move(*head);
+     FreeTraceObjects->Pop();
+     return reused;
+ }
+
template<typename TFront>
void Enqueue(const TActorContext &ctx, ui32 recByteSize, std::unique_ptr<IEventHandle> converted,
const NBackpressure::TMessageId &msgId, ui64 cost, const TInstant &deadline,
NKikimrBlobStorage::EVDiskQueueId extQueueId, TFront& front,
- const NBackpressure::TQueueClientId& clientId) {
+ const NBackpressure::TQueueClientId& clientId, std::unique_ptr<TVDiskSkeletonTrace> &&trace) {
Y_UNUSED(front);
+ ui64 cookie = converted->Cookie;
if (!Queue->Head() && CanSendToSkeleton(cost)) {
// send to Skeleton for further processing
ctx.ExecutorThread.Send(converted.release());
@@ -213,6 +246,8 @@ namespace NKikimr {
++*SkeletonFrontInFlightCount;
*SkeletonFrontInFlightCost += cost;
*SkeletonFrontInFlightBytes += recByteSize;
+
+ Msgs.emplace(cookie, TMsgInfo(msgId.MsgId, ctx.Now(), std::move(trace)));
} else {
// enqueue
++DelayedCount;
@@ -223,7 +258,7 @@ namespace NKikimr {
TInstant now = TAppData::TimeProvider->Now();
Queue->Push(TRecord(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId,
- clientId, Name));
+ clientId, Name, std::move(trace), cookie));
}
}
@@ -270,6 +305,8 @@ namespace NKikimr {
++*SkeletonFrontInFlightCount;
*SkeletonFrontInFlightCost += cost;
*SkeletonFrontInFlightBytes += recByteSize;
+
+ Msgs.emplace(rec->Cookie, TMsgInfo(rec->MsgId.MsgId, ctx.Now(), std::move(rec->Trace)));
}
Queue->Pop();
} else {
@@ -280,7 +317,7 @@ namespace NKikimr {
public:
template <class TFront>
- void Completed(const TActorContext &ctx, const TVMsgContext &msgCtx, TFront &front) {
+ void Completed(const TActorContext &ctx, const TVMsgContext &msgCtx, TFront &front, ui64 cookie) {
Y_VERIFY(InFlightCount >= 1 && InFlightBytes >= msgCtx.RecByteSize && InFlightCost >= msgCtx.Cost,
"IntQueueId# %s InFlightCount# %" PRIu64 " InFlightBytes# %" PRIu64
" InFlightCost# %" PRIu64 " msgCtx# %s Deadlines# %" PRIu64,
@@ -296,9 +333,40 @@ namespace NKikimr {
*SkeletonFrontInFlightBytes -= msgCtx.RecByteSize;
*SkeletonFrontCostProcessed += msgCtx.Cost;
+ // TODO(kruall): fix it, cookie always must be found
+ auto it = Msgs.find(cookie);
+ if (it != Msgs.end()) {
+ Y_VERIFY_S(it!= Msgs.end(), "cookie# " << cookie);
+ if (it->second.VDiskSkeletonTrace) {
+ it->second.VDiskSkeletonTrace->AdditionalTrace = nullptr;
+ it->second.VDiskSkeletonTrace->MarkCount = 0;
+ FreeTraceObjects->Push(std::move(it->second.VDiskSkeletonTrace));
+ }
+ Msgs.erase(it);
+ }
+
ProcessNext(ctx, front, false);
}
+ // Walks every entry of Msgs and logs an error for each message whose
+ // ReceivedTime is more than 5 minutes behind ctx.Now().
+ //
+ // ctx            - actor context, used only to obtain the current time.
+ // vDiskLogPrefix - prefix put in front of the log message.
+ // Returns true when at least one such message was found.
+ bool Sanitize(const TActorContext &ctx, const TString &vDiskLogPrefix) {
+     bool hasError = false;
+     const TInstant now = ctx.Now();
+     for (const auto &pair : Msgs) {
+         const TMsgInfo &msgInfo = pair.second;
+         const TDuration passedTime = now - msgInfo.ReceivedTime;
+         if (passedTime > TDuration::Minutes(5)) {
+             hasError = true;
+             // VDiskSkeletonTrace is null for messages that were enqueued without a trace,
+             // so test the pointer itself before calling ToString() on it.
+             STLOG(PRI_ERROR, NKikimrServices::BS_SKELETON, BSVSF04,
+                 vDiskLogPrefix << " passed more than 5 minutes for message in the internal queue",
+                 (MsgId, msgInfo.MsgId),
+                 (QueueName, Name),
+                 (PassedTimeSeconds, passedTime.Seconds()),
+                 (Trace, (msgInfo.VDiskSkeletonTrace ? msgInfo.VDiskSkeletonTrace->ToString() : "None")));
+         }
+     }
+     return hasError;
+ }
+
+
// enumeration of parameters we take into account to diagnose overloading
enum EParam {
EInFlightCount,
@@ -406,6 +474,8 @@ namespace NKikimr {
::NMonitoring::TDynamicCounters::TCounterPtr SkeletonFrontOverflow;
::NMonitoring::TDynamicCounters::TCounterPtr SkeletonFrontIncorrectMsgId;
+ ui64 NextInternalId = 0;
+ THashMap<ui64, ui64> InternalIdToCookie;
void NotifyOtherClients(const TActorContext &ctx, const TFeedback &feedback) {
for (const auto &x : feedback.second) {
@@ -478,6 +548,11 @@ namespace NKikimr {
}
front.ReplyFunc(std::exchange(converted, nullptr), ctx, status, errorReason, now, feedback.first);
}
+
+ if (converted) {
+ ui64 id = ++NextInternalId;
+ InternalIdToCookie[id] = std::exchange(const_cast<ui64&>(converted->Cookie), id);
+ }
return converted;
}
@@ -485,6 +560,7 @@ namespace NKikimr {
void DeadlineHappened(const TActorContext &ctx, TRecord *rec, TInstant now, TFront &front) {
++*SkeletonFrontDeadline;
auto feedback = QueueBackpressure->Processed(rec->ActorId, rec->MsgId, rec->Cost, now);
+ ReturnCookie(rec->Ev, false);
front.ReplyFunc(std::move(rec->Ev), ctx, NKikimrProto::DEADLINE, "deadline exceeded", now, feedback.first);
NotifyOtherClients(ctx, feedback);
}
@@ -492,6 +568,7 @@ namespace NKikimr {
+ // Drops a queued record with an error reply: reports it as processed to
+ // QueueBackpressure, puts the original cookie back on the event and answers
+ // through front.ReplyFunc with NKikimrProto::ERROR / "error state".
template <class TFront>
void DroppedWithError(const TActorContext &ctx, TRecord *rec, TInstant now, TFront &front) {
auto feedback = QueueBackpressure->Processed(rec->ActorId, rec->MsgId, rec->Cost, now);
+ // required == false: ReturnCookie leaves the cookie untouched if no mapping is found.
+ ReturnCookie(rec->Ev, false);
front.ReplyFunc(std::move(rec->Ev), ctx, NKikimrProto::ERROR, "error state", now, feedback.first);
}
@@ -507,7 +584,19 @@ namespace NKikimr {
QueueBackpressure->ForEachWindow(callback);
}
- void Completed(const TActorContext &ctx, const TVMsgContext &msgCtx, IEventBase *ev) {
+ // Swaps the internal id currently stored in evHandle->Cookie back to the value
+ // remembered in InternalIdToCookie and removes that mapping.
+ //
+ // required - when true, a missing mapping is a fatal error (Y_VERIFY);
+ //            when false, a missing mapping leaves the event untouched.
+ void ReturnCookie(std::unique_ptr<IEventHandle> &evHandle, bool required) {
+     auto entry = InternalIdToCookie.find(evHandle->Cookie);
+     if (entry == InternalIdToCookie.end()) {
+         Y_VERIFY(!required, "Internal error, it can't find internal id");
+         return;
+     }
+     // Cookie is written through const_cast, exactly as it was when the internal id was installed.
+     const_cast<ui64&>(evHandle->Cookie) = entry->second;
+     InternalIdToCookie.erase(entry);
+ }
+
+
+ void Completed(const TActorContext &ctx, const TVMsgContext &msgCtx, std::unique_ptr<IEventHandle> &evHandle) {
+ IEventBase *ev = evHandle->GetBase();
+ ReturnCookie(evHandle, true);
+
TInstant now = TAppData::TimeProvider->Now();
Y_VERIFY(msgCtx.ActorId);
auto feedback = QueueBackpressure->Processed(msgCtx.ActorId, msgCtx.MsgId, msgCtx.Cost, now);
@@ -615,6 +704,8 @@ namespace NKikimr {
NMonGroup::TVDiskStateGroup VDiskMonGroup;
TVDiskIncarnationGuid VDiskIncarnationGuid;
bool HasUnreadableBlobs = false;
+ TInstant LastSanitizeTime = TInstant::Zero();
+ TInstant LastSanitizeWithErrorTime = TInstant::Zero();
////////////////////////////////////////////////////////////////////////
// NOTIFICATIONS
@@ -718,6 +809,31 @@ namespace NKikimr {
Become(&TThis::StateLocalRecoveryInProgress);
}
+ // Runs the per-queue Sanitize() check over the internal queues listed below,
+ // but not more often than once per 5 minutes (tracked by LastSanitizeTime).
+ //
+ // Returns false when the check was skipped because of the throttle; otherwise
+ // returns true if at least one queue's Sanitize() returned true, in which case
+ // LastSanitizeWithErrorTime is updated as well.
+ bool Sanitize(const TActorContext &ctx) {
+     const TInstant now = ctx.Now();
+     const bool checkedRecently = now - LastSanitizeTime < TDuration::Minutes(5);
+     if (checkedRecently) {
+         return false;
+     }
+     // Disabled check, kept as it was:
+     //if (now - LastSanitizeWithErrorTime > TDuration::Hours(1)) {
+     //    return false;
+     //}
+     LastSanitizeTime = now;
+
+     const auto queues = {
+         IntQueueAsyncGets.get(), IntQueueFastGets.get(),
+         IntQueueDiscover.get(), IntQueueLowGets.get(),
+         IntQueueLogPuts.get(), IntQueueHugePutsForeground.get(),
+         IntQueueHugePutsBackground.get()
+     };
+
+     // Every queue is checked; there is deliberately no early exit on the first error.
+     bool anyQueueFailed = false;
+     for (auto *queue : queues) {
+         if (queue->Sanitize(ctx, VCtx->VDiskLogPrefix)) {
+             anyQueueFailed = true;
+         }
+     }
+
+     if (anyQueueFailed) {
+         LastSanitizeWithErrorTime = now;
+     }
+     return anyQueueFailed;
+ }
+
+
void Handle(TEvFrontRecoveryStatus::TPtr &ev, const TActorContext &ctx) {
const auto &msg = ev->Get();
VDiskIncarnationGuid = msg->VDiskIncarnationGuid;
@@ -1024,6 +1140,12 @@ namespace NKikimr {
}
}
+ // Compile-time flag: true exactly when TEv is one of the four patch events
+ // listed below (TEvVMovedPatch, TEvVPatchStart, TEvVPatchDiff, TEvVPatchXorDiff).
+ template <typename TEv>
+ static constexpr bool IsPatchEvent = std::is_same_v<TEv, TEvBlobStorage::TEvVMovedPatch>
+ || std::is_same_v<TEv, TEvBlobStorage::TEvVPatchStart>
+ || std::is_same_v<TEv, TEvBlobStorage::TEvVPatchDiff>
+ || std::is_same_v<TEv, TEvBlobStorage::TEvVPatchXorDiff>;
+
template <class TEventPtr>
void HandleRequestWithQoS(const TActorContext &ctx, TEventPtr &ev, const char *msgName, ui64 cost,
TIntQueueClass &intQueue) {
@@ -1057,9 +1179,17 @@ namespace NKikimr {
std::unique_ptr<IEventHandle> event = extQueue.Enqueue(ctx, std::unique_ptr<IEventHandle>(
ev->Forward(SkeletonId).Release()), msgId, cost, *this, clientId);
if (event) {
+ std::unique_ptr<TVDiskSkeletonTrace> trace;
+ if constexpr (IsPatchEvent<std::decay_t<decltype(*ev->Get())>>) {
+ trace = intQueue.GetFreeTrace();
+ event->Get<std::decay_t<decltype(*ev->Get())>>()->VDiskSkeletonTrace = trace.get();
+ }
// good, enqueue it in intQueue
- intQueue.Enqueue(ctx, recByteSize, std::move(event), msgId, cost, deadline, extQueueId, *this, clientId);
+ intQueue.Enqueue(ctx, recByteSize, std::move(event), msgId, cost,
+ deadline, extQueueId, *this, clientId, std::move(trace));
}
+
+ Sanitize(ctx);
}
bool Compatible(NKikimrBlobStorage::EVDiskQueueId extId, NKikimrBlobStorage::EVDiskInternalQueueId intId) {
@@ -1387,10 +1517,11 @@ namespace NKikimr {
+ // Handles completion of a request: notifies the external and the internal queue
+ // named in the message context and then sends the carried event on.
void Handle(TEvVDiskRequestCompleted::TPtr &ev, const TActorContext &ctx) {
const TVMsgContext &msgCtx = ev->Get()->Ctx;
std::unique_ptr<IEventHandle> event = std::move(ev->Get()->Event);
+ // Capture the cookie first: extQueue.Completed() receives the handle by reference and
+ // restores the original cookie on it, while intQueue.Completed() looks the message up
+ // by the value the event carries at this point.
+ ui64 id = event->Cookie;
TExtQueueClass &extQueue = GetExtQueue(msgCtx.ExtQueueId);
- extQueue.Completed(ctx, msgCtx, event->GetBase());
+ extQueue.Completed(ctx, msgCtx, event);
TIntQueueClass &intQueue = GetIntQueue(msgCtx.IntQueueId);
- intQueue.Completed(ctx, msgCtx, *this);
+ intQueue.Completed(ctx, msgCtx, *this, id);
TActivationContext::Send(event.release());
}
@@ -1703,13 +1834,7 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Events checking: RACE and access control
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- template <typename TEv>
- static constexpr bool IsPatchEvent = std::is_same_v<TEv, TEvBlobStorage::TEvVMovedPatch>
- || std::is_same_v<TEv, TEvBlobStorage::TEvVPatchStart>
- || std::is_same_v<TEv, TEvBlobStorage::TEvVPatchDiff>
- || std::is_same_v<TEv, TEvBlobStorage::TEvVPatchXorDiff>;
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////////
template <typename TEv>
static constexpr bool IsWithoutQoS = std::is_same_v<TEv, TEvBlobStorage::TEvVStatus>