aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: Sergey Belyakov <serg-belyakov@ydb.tech> 2024-01-31 11:49:30 +0300
committer: GitHub <noreply@github.com> 2024-01-31 11:49:30 +0300
commit: 3b1a363cecc7a179a26519f4c3f3964c43183c2b (patch)
tree: 3da07d6c9098757c19a2d703b30c841df2e18eb5
parent: 2e3d80efffa46aa70783fdb1bcedac569156d995 (diff)
download: ydb-3b1a363cecc7a179a26519f4c3f3964c43183c2b.tar.gz
Add light indicator for bursts to BsCostModel, #1336 (#1337)
* Move TLight to core/util, add BurstDetector to TBsCostTracker * Add UT * Count PDisk responses, fix UT build * Update BurstDetector * Fix UT
-rw-r--r-- ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h 265
-rw-r--r-- ydb/core/blobstorage/ut_blobstorage/monitoring.cpp 208
-rw-r--r-- ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.cpp 2
-rw-r--r-- ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h 41
-rw-r--r-- ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h 2
-rw-r--r-- ydb/core/blobstorage/vdisk/scrub/scrub_actor_pdisk.cpp 2
-rw-r--r-- ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp 1
-rw-r--r-- ydb/core/util/hp_timer_helpers.h 64
-rw-r--r-- ydb/core/util/light.h 213
9 files changed, 462 insertions, 336 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h
index 621d28c7d7..664451321e 100644
--- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h
+++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h
@@ -4,6 +4,7 @@
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
#include <ydb/core/mon/mon.h>
#include <ydb/core/protos/node_whiteboard.pb.h>
+#include <ydb/core/util/light.h>
#include <library/cpp/bucket_quoter/bucket_quoter.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
@@ -14,270 +15,6 @@ namespace NKikimr {
struct TPDiskConfig;
-inline NHPTimer::STime HPNow() {
- NHPTimer::STime ret;
- GetTimeFast(&ret);
- return ret;
-}
-
-inline double HPSecondsFloat(i64 cycles) {
- if (cycles > 0) {
- return double(cycles) / NHPTimer::GetClockRate();
- } else {
- return 0.0;
- }
-}
-
-inline double HPMilliSecondsFloat(i64 cycles) {
- if (cycles > 0) {
- return double(cycles) * 1000.0 / NHPTimer::GetClockRate();
- } else {
- return 0;
- }
-}
-
-inline ui64 HPMilliSeconds(i64 cycles) {
- return (ui64)HPMilliSecondsFloat(cycles);
-}
-
-inline ui64 HPMicroSecondsFloat(i64 cycles) {
- if (cycles > 0) {
- return double(cycles) * 1000000.0 / NHPTimer::GetClockRate();
- } else {
- return 0;
- }
-}
-
-inline ui64 HPMicroSeconds(i64 cycles) {
- return (ui64)HPMicroSecondsFloat(cycles);
-}
-
-inline ui64 HPNanoSeconds(i64 cycles) {
- if (cycles > 0) {
- return ui64(double(cycles) * 1000000000.0 / NHPTimer::GetClockRate());
- } else {
- return 0;
- }
-}
-
-inline ui64 HPCyclesNs(ui64 ns) {
- return ui64(NHPTimer::GetClockRate() * double(ns) / 1000000000.0);
-}
-
-inline ui64 HPCyclesUs(ui64 us) {
- return ui64(NHPTimer::GetClockRate() * double(us) / 1000000.0);
-}
-
-inline ui64 HPCyclesMs(ui64 ms) {
- return ui64(NHPTimer::GetClockRate() * double(ms) / 1000.0);
-}
-
-class TLightBase {
-protected:
- TString Name;
- ::NMonitoring::TDynamicCounters::TCounterPtr State; // Current state (0=OFF=green, 1=ON=red)
- ::NMonitoring::TDynamicCounters::TCounterPtr Count; // Number of switches to ON state
- ::NMonitoring::TDynamicCounters::TCounterPtr RedMs; // Time elapsed in ON state
- ::NMonitoring::TDynamicCounters::TCounterPtr GreenMs; // Time elapsed in OFF state
-private:
- ui64 RedCycles = 0;
- ui64 GreenCycles = 0;
- NHPTimer::STime AdvancedTill = 0;
- NHPTimer::STime LastNow = 0;
- ui64 UpdateThreshold = 0;
-public:
- void Initialize(TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& name) {
- Name = name;
- State = counters->GetCounter(name + "_state");
- Count = counters->GetCounter(name + "_count", true);
- RedMs = counters->GetCounter(name + "_redMs", true);
- GreenMs = counters->GetCounter(name + "_greenMs", true);
- UpdateThreshold = HPCyclesMs(100);
- AdvancedTill = Now();
- }
-
- void Initialize(TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& countName,
- const TString& redMsName,const TString& greenMsName) {
- Count = counters->GetCounter(countName, true);
- RedMs = counters->GetCounter(redMsName, true);
- GreenMs = counters->GetCounter(greenMsName, true);
- UpdateThreshold = HPCyclesMs(100);
- AdvancedTill = Now();
- }
-
- ui64 GetCount() const {
- return *Count;
- }
-
- ui64 GetRedMs() const {
- return *RedMs;
- }
-
- ui64 GetGreenMs() const {
- return *GreenMs;
- }
-protected:
- void Modify(bool state, bool prevState) {
- if (state && !prevState) { // Switched to ON state
- if (State) {
- *State = true;
- }
- (*Count)++;
- return;
- }
- if (!state && prevState) { // Switched to OFF state
- if (State) {
- *State = false;
- }
- return;
- }
- }
-
- void Advance(bool state, NHPTimer::STime now) {
- if (now == AdvancedTill) {
- return;
- }
- Elapsed(state, now - AdvancedTill);
- if (RedCycles > UpdateThreshold) {
- *RedMs += CutMs(RedCycles);
- }
- if (GreenCycles > UpdateThreshold) {
- *GreenMs += CutMs(GreenCycles);
- }
- AdvancedTill = now;
- }
-
- NHPTimer::STime Now() {
- // Avoid time going backwards
- NHPTimer::STime now = HPNow();
- if (now < LastNow) {
- now = LastNow;
- }
- LastNow = now;
- return now;
- }
-private:
- void Elapsed(bool state, ui64 cycles) {
- if (state) {
- RedCycles += cycles;
- } else {
- GreenCycles += cycles;
- }
- }
-
- ui64 CutMs(ui64& src) {
- ui64 ms = HPMilliSeconds(src);
- ui64 cycles = HPCyclesMs(ms);
- src -= cycles;
- return ms;
- }
-};
-
-// Thread-safe light
-class TLight : public TLightBase {
-private:
- struct TItem {
- bool State;
- bool Filled;
- TItem(bool state = false, bool filled = false)
- : State(state)
- , Filled(filled)
- {}
- };
-
- // Cyclic buffer to enforce event ordering by seqno
- TSpinLock Lock;
- size_t HeadIdx = 0; // Index of current state
- size_t FilledCount = 0;
- ui16 Seqno = 0; // Current seqno
- TStackVec<TItem, 32> Data; // In theory should have not more than thread count items
-public:
- TLight() {
- InitData();
- }
-
- void Set(bool state, ui16 seqno) {
- TGuard<TSpinLock> g(Lock);
- Push(state, seqno);
- bool prevState;
- // Note that 'state' variable is being reused
- NHPTimer::STime now = Now();
- while (Pop(state, prevState)) {
- Modify(state, prevState);
- Advance(prevState, now);
- }
- }
-
- void Update() {
- TGuard<TSpinLock> g(Lock);
- Advance(Data[HeadIdx].State, Now());
- }
-
-private:
- void InitData(bool state = false, bool filled = false) {
- Data.clear();
- Data.emplace_back(state, filled);
- Data.resize(32);
- HeadIdx = 0;
- }
-
- void Push(bool state, ui16 seqno) {
- FilledCount++;
- if (FilledCount == 1) { // First event must initialize seqno
- Seqno = seqno;
- InitData(state, true);
- if (state) {
- Modify(true, false);
- }
- return;
- }
- Y_ABORT_UNLESS(seqno != Seqno, "ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d",
- (int)Seqno, (int)seqno, (int)state, (int)CountFilled());
- ui16 diff = seqno;
- diff -= Seqno; // Underflow is fine
- size_t size = Data.size();
- if (size <= diff) { // Buffer is full -- extend and move wrapped part
- Data.resize(size * 2);
- for (size_t i = 0; i < HeadIdx; i++) {
- Data[size + i] = Data[i];
- Data[i].Filled = false;
- }
- }
- TItem& item = Data[(HeadIdx + diff) % Data.size()];
- Y_ABORT_UNLESS(!item.Filled, "ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d",
- (int)Seqno, (int)seqno, (int)state, (int)CountFilled());
- item.Filled = true;
- item.State = state;
- }
-
- bool Pop(bool& state, bool& prevState) {
- size_t nextIdx = (HeadIdx + 1) % Data.size();
- TItem& head = Data[HeadIdx];
- TItem& next = Data[nextIdx];
- if (!head.Filled || !next.Filled) {
- return false;
- }
- state = next.State;
- prevState = head.State;
- head.Filled = false;
- HeadIdx = nextIdx;
- Seqno++; // Overflow is fine
- FilledCount--;
- if (FilledCount == 1 && Data.size() > 32) {
- InitData(state, true);
- }
- return true;
- }
-
- size_t CountFilled() const {
- size_t ret = 0;
- for (const TItem& item : Data) {
- ret += item.Filled;
- }
- return ret;
- }
-};
-
class TBurstmeter {
private:
TBucketQuoter<i64, TSpinLock, THPTimerUs> Bucket;
diff --git a/ydb/core/blobstorage/ut_blobstorage/monitoring.cpp b/ydb/core/blobstorage/ut_blobstorage/monitoring.cpp
index 2af16506ab..f3976518b4 100644
--- a/ydb/core/blobstorage/ut_blobstorage/monitoring.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/monitoring.cpp
@@ -14,9 +14,17 @@ TString MakeData(ui32 dataSize) {
template <typename TDerived>
class TInflightActor : public TActorBootstrapped<TDerived> {
public:
- TInflightActor(ui32 requests, ui32 inflight)
- : RequestCount(requests)
- , RequestInflight(inflight)
+ struct TSettings {
+ ui32 Requests;
+ ui32 MaxInFlight;
+ TDuration Delay = TDuration::Zero();
+ };
+
+public:
+ TInflightActor(TSettings settings)
+ : RequestsToSend(settings.Requests)
+ , RequestInFlight(settings.MaxInFlight)
+ , Settings(settings)
{}
virtual ~TInflightActor() = default;
@@ -29,11 +37,18 @@ public:
}
protected:
- void SendRequests() {
- while (RequestInflight > 0 && RequestCount > 0) {
- RequestInflight--;
- RequestCount--;
- SendRequest();
+ // Sends requests while both the in-flight budget (RequestInFlight) and the number of requests
+ // left to send (RequestsToSend) are positive, keeping at least Settings.Delay between sends.
+ // When the delay has not elapsed yet, a TEvWakeup is scheduled for the remaining time and the
+ // method returns; the derived actors visible in this file route TEvWakeup back to this method.
+ void ScheduleRequests() {
+ while (RequestInFlight > 0 && RequestsToSend > 0) {
+ TMonotonic now = TMonotonic::Now();
+ TDuration timePassed = now - LastTs;
+ if (timePassed >= Settings.Delay) {
+ LastTs = now;
+ RequestInFlight--;
+ RequestsToSend--;
+ SendRequest();
+ } else {
+ TActorBootstrapped<TDerived>::Schedule(Settings.Delay - timePassed, new TEvents::TEvWakeup);
+ // Stop here: staying in the loop would spin on the clock and queue a new wakeup
+ // event on every iteration until the delay elapses.
+ return;
+ }
}
}
@@ -43,89 +58,110 @@ protected:
} else {
Fails++;
}
- ++RequestInflight;
- SendRequests();
+ ++RequestInFlight;
+ ScheduleRequests();
}
virtual void BootstrapImpl(const TActorContext &ctx) = 0;
virtual void SendRequest() = 0;
protected:
- ui32 RequestCount;
- ui32 RequestInflight;
+ ui32 RequestsToSend;
+ ui32 RequestInFlight;
ui32 GroupId;
+ TMonotonic LastTs;
+ TSettings Settings;
public:
ui32 OKs = 0;
ui32 Fails = 0;
};
-template <typename TInflightActor>
-void Test(const TBlobStorageGroupInfo::TTopology& topology, TInflightActor* actor) {
- const ui32 groupSize = topology.TotalVDisks;
- const auto& groupErasure = topology.GType;
- TEnvironmentSetup env{{
+// Sums the value of `counter` taken from the "vdisks" service counters, iterating over every vslot
+// of `baseConfig` (using the counters of the node that hosts it) and over every order number in
+// [0, groupSize). The counter is looked up under the subgroup path
+// storagePool / group / orderNumber / pdisk / media="rot" / subsystem.
+// `derivative` is forwarded to GetCounter as is.
+// NOTE(review): the label padding ("0" + order number, "00000" + pdisk id) is hard-coded; presumably
+// it only matches the registered labels for single-digit order numbers and 4-digit pdisk ids -- confirm.
+ui64 AggregateVDiskCounters(std::unique_ptr<TEnvironmentSetup>& env, const NKikimrBlobStorage::TBaseConfig& baseConfig,
+ TString storagePool, ui32 groupSize, ui32 groupId, const std::vector<ui32>& pdiskLayout, TString subsystem,
+ TString counter, bool derivative = false) {
+ ui64 ctr = 0;
+
+ for (const auto& vslot : baseConfig.GetVSlot()) {
+ auto* appData = env->Runtime->GetNode(vslot.GetVSlotId().GetNodeId())->AppData.get();
+ for (ui32 i = 0; i < groupSize; ++i) {
+ ctr += GetServiceCounters(appData->Counters, "vdisks")->
+ GetSubgroup("storagePool", storagePool)->
+ GetSubgroup("group", std::to_string(groupId))->
+ GetSubgroup("orderNumber", "0" + std::to_string(i))->
+ GetSubgroup("pdisk", "00000" + std::to_string(pdiskLayout[i]))->
+ GetSubgroup("media", "rot")->
+ GetSubgroup("subsystem", subsystem)->
+ GetCounter(counter, derivative)->Val();
+ }
+ }
+ return ctr;
+}
+
+// Builds the test environment for `topology` and reports its layout through the out-parameters:
+// - groupSize / groupType are copied from topology.TotalVDisks / topology.GType;
+// - env is reset to a new TEnvironmentSetup with NodeCount = groupSize and Erasure = groupType,
+// then a box and a pool are created and 30 seconds are simulated;
+// - baseConfig receives the base config returned by a QueryBaseConfig request (exactly one group
+// is asserted);
+// - groupId is the id of that group;
+// - pdiskLayout is resized to groupSize and, for every vslot of the group, holds the PDisk id at
+// the index given by the vslot's order number in the topology.
+void SetupEnv(const TBlobStorageGroupInfo::TTopology& topology, std::unique_ptr<TEnvironmentSetup>& env,
+ NKikimrBlobStorage::TBaseConfig& baseConfig, ui32& groupSize, TBlobStorageGroupType& groupType,
+ ui32& groupId, std::vector<ui32>& pdiskLayout) {
+ groupSize = topology.TotalVDisks;
+ groupType = topology.GType;
+ env.reset(new TEnvironmentSetup({
.NodeCount = groupSize,
- .Erasure = groupErasure,
- }};
+ .Erasure = groupType,
+ }));
- env.CreateBoxAndPool(1, 1);
- env.Sim(TDuration::Seconds(30));
+ env->CreateBoxAndPool(1, 1);
+ env->Sim(TDuration::Seconds(30));
NKikimrBlobStorage::TConfigRequest request;
request.AddCommand()->MutableQueryBaseConfig();
- auto response = env.Invoke(request);
+ auto response = env->Invoke(request);
- const auto& baseConfig = response.GetStatus(0).GetBaseConfig();
+ baseConfig = response.GetStatus(0).GetBaseConfig();
UNIT_ASSERT_VALUES_EQUAL(baseConfig.GroupSize(), 1);
- ui32 groupId = baseConfig.GetGroup(0).GetGroupId();
- std::vector<ui32> pdiskIds(groupSize);
+ groupId = baseConfig.GetGroup(0).GetGroupId();
+ pdiskLayout.resize(groupSize);
for (const auto& vslot : baseConfig.GetVSlot()) {
const auto& vslotId = vslot.GetVSlotId();
ui32 orderNumber = topology.GetOrderNumber(TVDiskIdShort(vslot.GetFailRealmIdx(), vslot.GetFailDomainIdx(), vslot.GetVDiskIdx()));
if (vslot.GetGroupId() == groupId) {
- pdiskIds[orderNumber] = vslotId.GetPDiskId();
+ pdiskLayout[orderNumber] = vslotId.GetPDiskId();
}
}
+}
+
+template <typename TInflightActor>
+void TestDSProxyAndVDiskEqualCost(const TBlobStorageGroupInfo::TTopology& topology, TInflightActor* actor) {
+ std::unique_ptr<TEnvironmentSetup> env;
+ NKikimrBlobStorage::TBaseConfig baseConfig;
+ ui32 groupSize;
+ TBlobStorageGroupType groupType;
+ ui32 groupId;
+ std::vector<ui32> pdiskLayout;
+ SetupEnv(topology, env, baseConfig, groupSize, groupType, groupId, pdiskLayout);
ui64 dsproxyCost = 0;
ui64 vdiskCost = 0;
- auto vdisksTotal = [&](TString subsystem, TString counter, bool derivative = false) {
- ui64 ctr = 0;
- for (const auto& vslot : baseConfig.GetVSlot()) {
- auto* appData = env.Runtime->GetNode(vslot.GetVSlotId().GetNodeId())->AppData.get();
- for (ui32 i = 0; i < groupSize; ++i) {
- ctr += GetServiceCounters(appData->Counters, "vdisks")->
- GetSubgroup("storagePool", env.StoragePoolName)->
- GetSubgroup("group", std::to_string(groupId))->
- GetSubgroup("orderNumber", "0" + std::to_string(i))->
- GetSubgroup("pdisk", "00000" + std::to_string(pdiskIds[i]))->
- GetSubgroup("media", "rot")->
- GetSubgroup("subsystem", subsystem)->
- GetCounter(counter, derivative)->Val();
- }
- }
- return ctr;
- };
auto updateCounters = [&]() {
+ dsproxyCost = 0;
+
for (const auto& vslot : baseConfig.GetVSlot()) {
- auto* appData = env.Runtime->GetNode(vslot.GetVSlotId().GetNodeId())->AppData.get();
+ auto* appData = env->Runtime->GetNode(vslot.GetVSlotId().GetNodeId())->AppData.get();
dsproxyCost += GetServiceCounters(appData->Counters, "dsproxynode")->
GetSubgroup("subsystem", "request")->
- GetSubgroup("storagePool", env.StoragePoolName)->
+ GetSubgroup("storagePool", env->StoragePoolName)->
GetCounter("DSProxyDiskCostNs")->Val();
}
- vdiskCost = vdisksTotal("cost", "SkeletonFrontUserCostNs");
+ vdiskCost = AggregateVDiskCounters(env, baseConfig, env->StoragePoolName, groupSize, groupId,
+ pdiskLayout, "cost", "SkeletonFrontUserCostNs");
};
updateCounters();
UNIT_ASSERT_VALUES_EQUAL(dsproxyCost, vdiskCost);
actor->SetGroupId(groupId);
- env.Runtime->Register(actor, 1);
- env.Sim(TDuration::Minutes(15));
+ env->Runtime->Register(actor, 1);
+ env->Sim(TDuration::Minutes(15));
updateCounters();
@@ -138,19 +174,21 @@ void Test(const TBlobStorageGroupInfo::TTopology& topology, TInflightActor* acto
if constexpr(VERBOSE) {
Cerr << str.Str() << Endl;
+ // env->Runtime->GetAppData()->Counters->OutputPlainText(Cerr);
}
UNIT_ASSERT_VALUES_EQUAL_C(dsproxyCost, vdiskCost, str.Str());
}
class TInflightActorPut : public TInflightActor<TInflightActorPut> {
public:
- TInflightActorPut(ui32 requests, ui32 inflight, ui32 dataSize = 1024)
- : TInflightActor(requests, inflight)
+ TInflightActorPut(TSettings settings, ui32 dataSize = 1024)
+ : TInflightActor(settings)
, DataSize(dataSize)
{}
STRICT_STFUNC(StateWork,
- cFunc(TEvBlobStorage::TEvStatusResult::EventType, SendRequests);
+ cFunc(TEvBlobStorage::TEvStatusResult::EventType, ScheduleRequests);
+ cFunc(TEvents::TEvWakeup::EventType, ScheduleRequests);
hFunc(TEvBlobStorage::TEvPutResult, Handle);
)
@@ -164,7 +202,7 @@ public:
protected:
virtual void SendRequest() override {
TString data = MakeData(DataSize);
- auto ev = new TEvBlobStorage::TEvPut(TLogoBlobID(1, 1, 1, 10, DataSize, RequestCount + 1),
+ auto ev = new TEvBlobStorage::TEvPut(TLogoBlobID(1, 1, 1, 10, DataSize, RequestsToSend + 1),
data, TInstant::Max(), NKikimrBlobStorage::UserData);
SendToBSProxy(SelfId(), GroupId, ev, 0);
}
@@ -184,8 +222,8 @@ Y_UNIT_TEST(Test##requestType##erasure##Requests##requests##Inflight##inflight)
ui32 realms = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 1; \
ui32 domains = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 8; \
TBlobStorageGroupInfo::TTopology topology(groupType, realms, domains, 1, true); \
- auto actor = new TInflightActor##requestType(requests, inflight); \
- Test(topology, actor); \
+ auto actor = new TInflightActor##requestType({requests, inflight}); \
+ TestDSProxyAndVDiskEqualCost(topology, actor); \
}
#define MAKE_TEST_W_DATASIZE(erasure, requestType, requests, inflight, dataSize) \
@@ -194,19 +232,20 @@ Y_UNIT_TEST(Test##requestType##erasure##Requests##requests##Inflight##inflight##
ui32 realms = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 1; \
ui32 domains = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 8; \
TBlobStorageGroupInfo::TTopology topology(groupType, realms, domains, 1, true); \
- auto actor = new TInflightActor##requestType(requests, inflight, dataSize); \
- Test(topology, actor); \
+ auto actor = new TInflightActor##requestType({requests, inflight}, dataSize); \
+ TestDSProxyAndVDiskEqualCost(topology, actor); \
}
class TInflightActorGet : public TInflightActor<TInflightActorGet> {
public:
- TInflightActorGet(ui32 requests, ui32 inflight, ui32 dataSize = 1024)
- : TInflightActor(requests, inflight)
+ TInflightActorGet(TSettings settings, ui32 dataSize = 1024)
+ : TInflightActor(settings)
, DataSize(dataSize)
{}
STRICT_STFUNC(StateWork,
- cFunc(TEvBlobStorage::TEvPutResult::EventType, SendRequests);
+ cFunc(TEvBlobStorage::TEvPutResult::EventType, ScheduleRequests);
+ cFunc(TEvents::TEvWakeup::EventType, ScheduleRequests);
hFunc(TEvBlobStorage::TEvGetResult, Handle);
)
@@ -236,8 +275,8 @@ private:
class TInflightActorPatch : public TInflightActor<TInflightActorPatch> {
public:
- TInflightActorPatch(ui32 requests, ui32 inflight, ui32 dataSize = 1024)
- : TInflightActor(requests, inflight)
+ TInflightActorPatch(TSettings settings, ui32 dataSize = 1024)
+ : TInflightActor(settings)
, DataSize(dataSize)
{}
@@ -248,7 +287,7 @@ public:
virtual void BootstrapImpl(const TActorContext&/* ctx*/) override {
TString data = MakeData(DataSize);
- for (ui32 i = 0; i < RequestInflight; ++i) {
+ for (ui32 i = 0; i < RequestInFlight; ++i) {
TLogoBlobID blobId(1, 1, 1, 10, DataSize, 1 + i);
auto ev = new TEvBlobStorage::TEvPut(blobId, data, TInstant::Max());
SendToBSProxy(SelfId(), GroupId, ev, 0);
@@ -263,7 +302,7 @@ protected:
TLogoBlobID newId(1, 1, oldId.Step() + 1, 10, DataSize, oldId.Cookie());
Y_ABORT_UNLESS(TEvBlobStorage::TEvPatch::GetBlobIdWithSamePlacement(oldId, &newId, BlobIdMask, GroupId, GroupId));
TArrayHolder<TEvBlobStorage::TEvPatch::TDiff> diffs(new TEvBlobStorage::TEvPatch::TDiff[1]);
- char c = 'a' + RequestCount % 26;
+ char c = 'a' + RequestsToSend % 26;
diffs[0].Set(TString(DataSize, c), 0);
auto ev = new TEvBlobStorage::TEvPatch(GroupId, oldId, newId, BlobIdMask, std::move(diffs), 1, TInstant::Max());
SendToBSProxy(SelfId(), GroupId, ev, 0);
@@ -277,8 +316,8 @@ protected:
void Handle(TEvBlobStorage::TEvPutResult::TPtr res) {
Blobs.push_back(res->Get()->Id);
- if (++BlobsWritten == RequestInflight) {
- SendRequests();
+ if (++BlobsWritten == RequestInFlight) {
+ ScheduleRequests();
}
}
@@ -368,3 +407,42 @@ Y_UNIT_TEST_SUITE(CostMetricsPatchBlock4Plus2) {
MAKE_TEST_W_DATASIZE(4Plus2Block, Patch, 100, 10, 1000);
MAKE_TEST_W_DATASIZE(4Plus2Block, Patch, 10000, 100, 1000);
}
+
+// Sets up an environment for `topology`, registers `actor` on node 1, simulates 15 seconds and then
+// checks the aggregated "BurstDetector_redMs" counter of the "advancedCost" subsystem:
+// it must be positive when `burstExpected` is true and exactly zero otherwise.
+template <typename TInflightActor>
+void TestBurst(const TBlobStorageGroupInfo::TTopology& topology, TInflightActor* actor, bool burstExpected) {
+ std::unique_ptr<TEnvironmentSetup> env;
+ NKikimrBlobStorage::TBaseConfig baseConfig;
+ ui32 groupSize;
+ TBlobStorageGroupType groupType;
+ ui32 groupId;
+ std::vector<ui32> pdiskLayout;
+ SetupEnv(topology, env, baseConfig, groupSize, groupType, groupId, pdiskLayout);
+
+ actor->SetGroupId(groupId);
+ env->Runtime->Register(actor, 1);
+ env->Sim(TDuration::Seconds(15));
+
+ ui64 redMs = AggregateVDiskCounters(env, baseConfig, env->StoragePoolName, groupSize, groupId,
+ pdiskLayout, "advancedCost", "BurstDetector_redMs");
+
+ if (burstExpected) {
+ UNIT_ASSERT_GT(redMs, 0);
+ } else {
+ // Compare against the number zero rather than against the bool flag, so the assertion
+ // states the expected counter value and prints a meaningful message on failure.
+ UNIT_ASSERT_VALUES_EQUAL(redMs, 0);
+ }
+}
+
+// Defines a unit test named Test<requestType><testType><erasure>.
+// The test builds a topology for Erasure<erasure> (3 realms x 3 domains for mirror-3-dc, otherwise
+// 1 realm x 8 domains), creates a TInflightActor<requestType> with settings
+// {requests, inflight, delay} and a data size of 1_KB, and passes it to TestBurst together with
+// `burstExpected`.
+#define MAKE_BURST_TEST(testType, erasure, requestType, requests, inflight, delay, burstExpected) \
+Y_UNIT_TEST(Test##requestType##testType##erasure) { \
+ auto groupType = TBlobStorageGroupType::Erasure##erasure; \
+ ui32 realms = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 1; \
+ ui32 domains = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 8; \
+ TBlobStorageGroupInfo::TTopology topology(groupType, realms, domains, 1, true); \
+ auto* actor = new TInflightActor##requestType({requests, inflight, delay}, 1_KB); \
+ TestBurst(topology, actor, burstExpected); \
+}
+
+// Burst detector tests for 4Plus2Block with Put requests:
+// - Evenly: 100000 requests, at most 1 in flight, 1 microsecond between sends, no burst expected;
+// - Burst: 100000 requests, in-flight limit of 1000000, no delay, burst expected.
+Y_UNIT_TEST_SUITE(BurstDetection) {
+ MAKE_BURST_TEST(Evenly, 4Plus2Block, Put, 100000, 1, TDuration::MicroSeconds(1), false);
+ MAKE_BURST_TEST(Burst, 4Plus2Block, Put, 100000, 1000000, TDuration::Zero(), true);
+}
diff --git a/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.cpp b/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.cpp
index 689c577bec..6ebab77acd 100644
--- a/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.cpp
+++ b/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.cpp
@@ -38,7 +38,9 @@ TBsCostTracker::TBsCostTracker(const TBlobStorageGroupType& groupType, NPDisk::E
, ScrubDiskCost(CostCounters->GetCounter("ScrubDiskCost", true))
, DefragDiskCost(CostCounters->GetCounter("DefragDiskCost", true))
, InternalDiskCost(CostCounters->GetCounter("InternalDiskCost", true))
+ , Bucket(1'000'000'000, BucketCapacity)
{
+ BurstDetector.Initialize(CostCounters, "BurstDetector");
switch (GroupType.GetErasure()) {
case TBlobStorageGroupType::ErasureMirror3dc:
CostModel = std::make_unique<TBsCostModelMirror3dc>(diskType);
diff --git a/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h b/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h
index 27b9f9e9c7..6f84bd229d 100644
--- a/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h
+++ b/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h
@@ -5,8 +5,10 @@
#include "vdisk_events.h"
#include "vdisk_handle_class.h"
+#include <library/cpp/bucket_quoter/bucket_quoter.h>
#include <util/system/compiler.h>
#include <ydb/core/blobstorage/base/blobstorage_events.h>
+#include <ydb/core/util/light.h>
namespace NKikimr {
@@ -262,8 +264,7 @@ class TBsCostTracker {
private:
TBlobStorageGroupType GroupType;
std::unique_ptr<TBsCostModelBase> CostModel;
-
- const TIntrusivePtr<::NMonitoring::TDynamicCounters> CostCounters;
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> CostCounters;
::NMonitoring::TDynamicCounters::TCounterPtr UserDiskCost;
::NMonitoring::TDynamicCounters::TCounterPtr CompactionDiskCost;
@@ -271,6 +272,11 @@ private:
::NMonitoring::TDynamicCounters::TCounterPtr DefragDiskCost;
::NMonitoring::TDynamicCounters::TCounterPtr InternalDiskCost;
+ TBucketQuoter<i64, TSpinLock, THPTimerUs> Bucket;
+ static constexpr ui64 BucketCapacity = 1'000'000'000;
+ TLight BurstDetector;
+ std::atomic<ui64> SeqnoBurstDetector = 0;
+
public:
TBsCostTracker(const TBlobStorageGroupType& groupType, NPDisk::EDeviceType diskType,
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters);
@@ -296,38 +302,59 @@ public:
}
}
+ // Charges `cost` to Bucket, then tells BurstDetector whether the bucket currently has nothing
+ // available, tagging the report with the next value of SeqnoBurstDetector.
+ void CountRequest(ui64 cost) {
+ Bucket.Use(cost);
+ const bool bucketExhausted = !Bucket.IsAvail();
+ const auto seqno = SeqnoBurstDetector.fetch_add(1);
+ BurstDetector.Set(bucketExhausted, seqno);
+ }
+
public:
+ // Adds the cost computed by GetCost(ev) to UserDiskCost and passes the same cost to CountRequest.
template<class TEvent>
void CountUserRequest(const TEvent& ev) {
- *UserDiskCost += GetCost(ev);
+ ui64 cost = GetCost(ev);
+ *UserDiskCost += cost;
+ CountRequest(cost);
}
+ // Adds an already computed `cost` to UserDiskCost and passes it to CountRequest.
void CountUserCost(ui64 cost) {
*UserDiskCost += cost;
+ CountRequest(cost);
}
+ // Adds the cost computed by GetCost(ev) to CompactionDiskCost and passes it to CountRequest.
template<class TEvent>
void CountCompactionRequest(const TEvent& ev) {
- *CompactionDiskCost += GetCost(ev);
+ ui64 cost = GetCost(ev);
+ *CompactionDiskCost += cost;
+ CountRequest(cost);
}
template<class TEvent>
void CountScrubRequest(const TEvent& ev) {
- *UserDiskCost += GetCost(ev);
+ ui64 cost = GetCost(ev);
+ *UserDiskCost += cost;
+ CountRequest(cost);
}
+ // Adds the cost computed by GetCost(ev) to DefragDiskCost and passes it to CountRequest.
template<class TEvent>
void CountDefragRequest(const TEvent& ev) {
- *DefragDiskCost += GetCost(ev);
+ ui64 cost = GetCost(ev);
+ *DefragDiskCost += cost;
+ CountRequest(cost);
}
+ // Adds the cost computed by GetCost(ev) to InternalDiskCost and passes it to CountRequest.
template<class TEvent>
void CountInternalRequest(const TEvent& ev) {
- *InternalDiskCost += GetCost(ev);
+ ui64 cost = GetCost(ev);
+ *InternalDiskCost += cost;
+ CountRequest(cost);
}
+ // Adds an already computed `cost` to InternalDiskCost and passes it to CountRequest.
void CountInternalCost(ui64 cost) {
*InternalDiskCost += cost;
+ CountRequest(cost);
+ }
+
+ // Reports the current bucket state (nothing available => ON) to BurstDetector without charging
+ // any cost; called by the PDisk response handlers touched in this commit.
+ void CountPDiskResponse() {
+ BurstDetector.Set(!Bucket.IsAvail(), SeqnoBurstDetector.fetch_add(1));
}
};
diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h
index e208983b08..c4ab516044 100644
--- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h
+++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h
@@ -170,6 +170,7 @@ namespace NKikimr {
// the same logic for every yard response: apply response and restart main cycle
void HandleYardResponse(NPDisk::TEvChunkReadResult::TPtr& ev, const TActorContext &ctx) {
--PendingResponses;
+ HullCtx->VCtx->CostTracker->CountPDiskResponse();
if (ev->Get()->Status != NKikimrProto::CORRUPTED) {
CHECK_PDISK_RESPONSE(HullCtx->VCtx, ev, ctx);
}
@@ -202,6 +203,7 @@ namespace NKikimr {
void HandleYardResponse(NPDisk::TEvChunkWriteResult::TPtr& ev, const TActorContext &ctx) {
--PendingResponses;
+ HullCtx->VCtx->CostTracker->CountPDiskResponse();
CHECK_PDISK_RESPONSE(HullCtx->VCtx, ev, ctx);
if (FinalizeIfAborting(ctx)) {
return;
diff --git a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_pdisk.cpp b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_pdisk.cpp
index ef68ca83e0..e477ce788f 100644
--- a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_pdisk.cpp
+++ b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_pdisk.cpp
@@ -11,6 +11,7 @@ namespace NKikimr {
Send(ScrubCtx->PDiskCtx->PDiskId, msg.release());
CurrentState = TStringBuilder() << "reading data from " << part.ToString();
auto res = WaitForPDiskEvent<NPDisk::TEvChunkReadResult>();
+ ScrubCtx->VCtx->CostTracker->CountPDiskResponse();
auto *m = res->Get();
Y_VERIFY_S(m->Status == NKikimrProto::OK || m->Status == NKikimrProto::CORRUPTED,
"Status# " << NKikimrProto::EReplyStatus_Name(m->Status));
@@ -41,6 +42,7 @@ namespace NKikimr {
Send(ScrubCtx->PDiskCtx->PDiskId, msg.release());
CurrentState = TStringBuilder() << "writing index to " << part.ToString();
auto res = WaitForPDiskEvent<NPDisk::TEvChunkWriteResult>();
+ ScrubCtx->VCtx->CostTracker->CountPDiskResponse();
Y_ABORT_UNLESS(res->Get()->Status == NKikimrProto::OK); // FIXME: good logic
}
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
index 238a69e391..2447d28501 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
@@ -1632,6 +1632,7 @@ namespace NKikimr {
extQueue.Completed(ctx, msgCtx, event);
TIntQueueClass &intQueue = GetIntQueue(msgCtx.IntQueueId);
intQueue.Completed(ctx, msgCtx, *this, id);
+ VCtx->CostTracker->CountPDiskResponse();
TActivationContext::Send(event.release());
}
diff --git a/ydb/core/util/hp_timer_helpers.h b/ydb/core/util/hp_timer_helpers.h
new file mode 100644
index 0000000000..e168b33ce8
--- /dev/null
+++ b/ydb/core/util/hp_timer_helpers.h
@@ -0,0 +1,64 @@
+#pragma once
+
+#include <util/system/hp_timer.h>
+#include <ydb/library/actors/util/datetime.h>
+
+// Helpers converting between NHPTimer cycle counts and time units.
+// The include guard above is required: the header defines inline functions and is meant to be
+// pulled in from several other headers, so a second inclusion must be a no-op.
+
+namespace NKikimr {
+
+// Returns the current timer value as filled in by GetTimeFast.
+inline NHPTimer::STime HPNow() {
+ NHPTimer::STime ret;
+ GetTimeFast(&ret);
+ return ret;
+}
+
+// Cycles -> seconds; non-positive cycle counts give 0.
+inline double HPSecondsFloat(i64 cycles) {
+ if (cycles > 0) {
+ return double(cycles) / NHPTimer::GetClockRate();
+ } else {
+ return 0.0;
+ }
+}
+
+// Cycles -> milliseconds; non-positive cycle counts give 0.
+inline double HPMilliSecondsFloat(i64 cycles) {
+ if (cycles > 0) {
+ return double(cycles) * 1000.0 / NHPTimer::GetClockRate();
+ } else {
+ return 0;
+ }
+}
+
+// Cycles -> whole milliseconds (fractional part dropped by the cast).
+inline ui64 HPMilliSeconds(i64 cycles) {
+ return (ui64)HPMilliSecondsFloat(cycles);
+}
+
+// Cycles -> microseconds; non-positive cycle counts give 0.
+// NOTE(review): despite the "Float" suffix the return type is ui64, so the value is already
+// truncated here, unlike HPSecondsFloat / HPMilliSecondsFloat -- confirm whether double was intended.
+inline ui64 HPMicroSecondsFloat(i64 cycles) {
+ if (cycles > 0) {
+ return double(cycles) * 1000000.0 / NHPTimer::GetClockRate();
+ } else {
+ return 0;
+ }
+}
+
+// Cycles -> whole microseconds.
+inline ui64 HPMicroSeconds(i64 cycles) {
+ return (ui64)HPMicroSecondsFloat(cycles);
+}
+
+// Cycles -> whole nanoseconds; non-positive cycle counts give 0.
+inline ui64 HPNanoSeconds(i64 cycles) {
+ if (cycles > 0) {
+ return ui64(double(cycles) * 1000000000.0 / NHPTimer::GetClockRate());
+ } else {
+ return 0;
+ }
+}
+
+// Nanoseconds -> cycles.
+inline ui64 HPCyclesNs(ui64 ns) {
+ return ui64(NHPTimer::GetClockRate() * double(ns) / 1000000000.0);
+}
+
+// Microseconds -> cycles.
+inline ui64 HPCyclesUs(ui64 us) {
+ return ui64(NHPTimer::GetClockRate() * double(us) / 1000000.0);
+}
+
+// Milliseconds -> cycles.
+inline ui64 HPCyclesMs(ui64 ms) {
+ return ui64(NHPTimer::GetClockRate() * double(ms) / 1000.0);
+}
+
+} // namespace NKikimr
diff --git a/ydb/core/util/light.h b/ydb/core/util/light.h
new file mode 100644
index 0000000000..10a698a45e
--- /dev/null
+++ b/ydb/core/util/light.h
@@ -0,0 +1,213 @@
+#pragma once
+
+#include <ydb/core/mon/mon.h>
+#include <library/cpp/containers/stack_vector/stack_vec.h>
+
+#include "hp_timer_helpers.h"
+
+namespace NKikimr {
+
+// Two-state indicator (OFF=green / ON=red) that publishes, through monitoring counters, the number
+// of switches to ON and the time spent in each state. Elapsed time is accumulated in timer cycles
+// and moved to the millisecond counters once more than UpdateThreshold cycles have been collected.
+// This class takes no locks itself.
+class TLightBase {
+protected:
+ TString Name;
+ ::NMonitoring::TDynamicCounters::TCounterPtr State; // Current state (0=OFF=green, 1=ON=red)
+ ::NMonitoring::TDynamicCounters::TCounterPtr Count; // Number of switches to ON state
+ ::NMonitoring::TDynamicCounters::TCounterPtr RedMs; // Time elapsed in ON state
+ ::NMonitoring::TDynamicCounters::TCounterPtr GreenMs; // Time elapsed in OFF state
+private:
+ ui64 RedCycles = 0;
+ ui64 GreenCycles = 0;
+ NHPTimer::STime AdvancedTill = 0;
+ NHPTimer::STime LastNow = 0;
+ ui64 UpdateThreshold = 0;
+public:
+ // Binds the light to counters "<name>_state", "<name>_count", "<name>_redMs" and
+ // "<name>_greenMs" and starts time accounting from the current moment.
+ void Initialize(TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& name) {
+ Name = name;
+ State = counters->GetCounter(name + "_state");
+ Count = counters->GetCounter(name + "_count", true);
+ RedMs = counters->GetCounter(name + "_redMs", true);
+ GreenMs = counters->GetCounter(name + "_greenMs", true);
+ UpdateThreshold = HPCyclesMs(100);
+ AdvancedTill = Now();
+ }
+
+ // Same as above, but with explicitly named counters; the State counter is left unset.
+ void Initialize(TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& countName,
+ const TString& redMsName,const TString& greenMsName) {
+ Count = counters->GetCounter(countName, true);
+ RedMs = counters->GetCounter(redMsName, true);
+ GreenMs = counters->GetCounter(greenMsName, true);
+ UpdateThreshold = HPCyclesMs(100);
+ AdvancedTill = Now();
+ }
+
+ ui64 GetCount() const {
+ return *Count;
+ }
+
+ ui64 GetRedMs() const {
+ return *RedMs;
+ }
+
+ ui64 GetGreenMs() const {
+ return *GreenMs;
+ }
+protected:
+ // Applies a state transition: mirrors the new state into the State counter (when it is set)
+ // and counts every OFF -> ON switch. Does nothing when the state did not change.
+ void Modify(bool state, bool prevState) {
+ if (state == prevState) {
+ return;
+ }
+ if (State) {
+ *State = state;
+ }
+ if (state) {
+ (*Count)++;
+ }
+ }
+
+ // Attributes the time between AdvancedTill and `now` to `state` and flushes whole
+ // milliseconds to the counters once the accumulated cycles exceed UpdateThreshold.
+ void Advance(bool state, NHPTimer::STime now) {
+ if (now != AdvancedTill) {
+ Elapsed(state, now - AdvancedTill);
+ if (RedCycles > UpdateThreshold) {
+ *RedMs += CutMs(RedCycles);
+ }
+ if (GreenCycles > UpdateThreshold) {
+ *GreenMs += CutMs(GreenCycles);
+ }
+ AdvancedTill = now;
+ }
+ }
+
+ // Current timer value, clamped so that it never goes below the previously returned one.
+ NHPTimer::STime Now() {
+ const NHPTimer::STime current = HPNow();
+ LastNow = (current < LastNow) ? LastNow : current;
+ return LastNow;
+ }
+private:
+ // Adds `cycles` to the accumulator of the given state.
+ void Elapsed(bool state, ui64 cycles) {
+ ui64& accumulator = state ? RedCycles : GreenCycles;
+ accumulator += cycles;
+ }
+
+ // Removes the whole milliseconds contained in `src` (in cycles) and returns their number;
+ // the sub-millisecond remainder stays in `src`.
+ ui64 CutMs(ui64& src) {
+ const ui64 wholeMs = HPMilliSeconds(src);
+ src -= HPCyclesMs(wholeMs);
+ return wholeMs;
+ }
+};
+
+// Thread-safe light
+// Light guarded by a spin lock. Events carry a 16-bit sequence number and may arrive out of order;
+// they are parked in a cyclic buffer and applied strictly in seqno order.
+class TLight : public TLightBase {
+private:
+ struct TItem {
+ bool State;
+ bool Filled;
+ TItem(bool state = false, bool filled = false)
+ : State(state)
+ , Filled(filled)
+ {}
+ };
+
+ // Cyclic buffer to enforce event ordering by seqno
+ TSpinLock Lock;
+ size_t HeadIdx = 0; // Index of current state
+ size_t FilledCount = 0;
+ ui16 Seqno = 0; // Current seqno
+ TStackVec<TItem, 32> Data; // In theory should have not more than thread count items
+public:
+ TLight() {
+ InitData();
+ }
+
+ // Registers the event (state, seqno) and applies every event that has become contiguous
+ // with the head of the buffer.
+ void Set(bool state, ui16 seqno) {
+ TGuard<TSpinLock> g(Lock);
+ Push(state, seqno);
+ bool prevState;
+ // Note that 'state' variable is being reused
+ NHPTimer::STime now = Now();
+ while (Pop(state, prevState)) {
+ Modify(state, prevState);
+ Advance(prevState, now);
+ }
+ }
+
+ // Accounts the time elapsed so far to the current state without changing it.
+ void Update() {
+ TGuard<TSpinLock> g(Lock);
+ Advance(Data[HeadIdx].State, Now());
+ }
+
+private:
+ // Resets the buffer to 32 slots with the given item in slot 0, which becomes the head.
+ void InitData(bool state = false, bool filled = false) {
+ Data.clear();
+ Data.emplace_back(state, filled);
+ Data.resize(32);
+ HeadIdx = 0;
+ }
+
+ // Stores the event in the slot that corresponds to its distance from the head seqno.
+ void Push(bool state, ui16 seqno) {
+ FilledCount++;
+ if (FilledCount == 1) { // First event must initialize seqno
+ Seqno = seqno;
+ InitData(state, true);
+ if (state) {
+ Modify(true, false);
+ }
+ return;
+ }
+ Y_ABORT_UNLESS(seqno != Seqno, "ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d",
+ (int)Seqno, (int)seqno, (int)state, (int)CountFilled());
+ ui16 diff = seqno;
+ diff -= Seqno; // Underflow is fine
+ size_t size = Data.size();
+ if (size <= diff) { // Buffer is too small -- extend and move wrapped part
+ // One doubling is not always enough: if the event is 2 * size or more ahead of the head,
+ // the modulo below would wrap it onto a slot that belongs to a different seqno.
+ // Grow until the slot for `diff` really exists.
+ size_t newSize = size;
+ while (newSize <= diff) {
+ newSize *= 2;
+ }
+ Data.resize(newSize);
+ for (size_t i = 0; i < HeadIdx; i++) {
+ Data[size + i] = Data[i];
+ Data[i].Filled = false;
+ }
+ }
+ TItem& item = Data[(HeadIdx + diff) % Data.size()];
+ Y_ABORT_UNLESS(!item.Filled, "ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d",
+ (int)Seqno, (int)seqno, (int)state, (int)CountFilled());
+ item.Filled = true;
+ item.State = state;
+ }
+
+ // Advances the head by one event if the next slot is filled. On success `state` receives the
+ // new state and `prevState` the state that was current before the step.
+ bool Pop(bool& state, bool& prevState) {
+ size_t nextIdx = (HeadIdx + 1) % Data.size();
+ TItem& head = Data[HeadIdx];
+ TItem& next = Data[nextIdx];
+ if (!head.Filled || !next.Filled) {
+ return false;
+ }
+ state = next.State;
+ prevState = head.State;
+ head.Filled = false;
+ HeadIdx = nextIdx;
+ Seqno++; // Overflow is fine
+ FilledCount--;
+ if (FilledCount == 1 && Data.size() > 32) {
+ // Only the head is left: shrink an extended buffer back to its initial size
+ InitData(state, true);
+ }
+ return true;
+ }
+
+ // Number of filled slots, used in abort messages only.
+ size_t CountFilled() const {
+ size_t ret = 0;
+ for (const TItem& item : Data) {
+ ret += item.Filled;
+ }
+ return ret;
+ }
+};
+
+} // namespace NKikimr