diff options
author | Sergey Belyakov <serg-belyakov@ydb.tech> | 2024-01-31 11:49:30 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-31 11:49:30 +0300 |
commit | 3b1a363cecc7a179a26519f4c3f3964c43183c2b (patch) | |
tree | 3da07d6c9098757c19a2d703b30c841df2e18eb5 | |
parent | 2e3d80efffa46aa70783fdb1bcedac569156d995 (diff) | |
download | ydb-3b1a363cecc7a179a26519f4c3f3964c43183c2b.tar.gz |
Add light indicator for bursts to BsCostModel, #1336 (#1337)
* Move TLight to core/util, add BurstDetector to TBsCostTracker
* Add UT
* Count PDisk responses, fix UT build
* Update BurstDetector
* Fix UT
-rw-r--r-- | ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h | 265 | ||||
-rw-r--r-- | ydb/core/blobstorage/ut_blobstorage/monitoring.cpp | 208 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h | 41 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h | 2 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/scrub/scrub_actor_pdisk.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp | 1 | ||||
-rw-r--r-- | ydb/core/util/hp_timer_helpers.h | 64 | ||||
-rw-r--r-- | ydb/core/util/light.h | 213 |
9 files changed, 462 insertions, 336 deletions
diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h index 621d28c7d7..664451321e 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_mon.h @@ -4,6 +4,7 @@ #include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h> #include <ydb/core/mon/mon.h> #include <ydb/core/protos/node_whiteboard.pb.h> +#include <ydb/core/util/light.h> #include <library/cpp/bucket_quoter/bucket_quoter.h> #include <library/cpp/containers/stack_vector/stack_vec.h> @@ -14,270 +15,6 @@ namespace NKikimr { struct TPDiskConfig; -inline NHPTimer::STime HPNow() { - NHPTimer::STime ret; - GetTimeFast(&ret); - return ret; -} - -inline double HPSecondsFloat(i64 cycles) { - if (cycles > 0) { - return double(cycles) / NHPTimer::GetClockRate(); - } else { - return 0.0; - } -} - -inline double HPMilliSecondsFloat(i64 cycles) { - if (cycles > 0) { - return double(cycles) * 1000.0 / NHPTimer::GetClockRate(); - } else { - return 0; - } -} - -inline ui64 HPMilliSeconds(i64 cycles) { - return (ui64)HPMilliSecondsFloat(cycles); -} - -inline ui64 HPMicroSecondsFloat(i64 cycles) { - if (cycles > 0) { - return double(cycles) * 1000000.0 / NHPTimer::GetClockRate(); - } else { - return 0; - } -} - -inline ui64 HPMicroSeconds(i64 cycles) { - return (ui64)HPMicroSecondsFloat(cycles); -} - -inline ui64 HPNanoSeconds(i64 cycles) { - if (cycles > 0) { - return ui64(double(cycles) * 1000000000.0 / NHPTimer::GetClockRate()); - } else { - return 0; - } -} - -inline ui64 HPCyclesNs(ui64 ns) { - return ui64(NHPTimer::GetClockRate() * double(ns) / 1000000000.0); -} - -inline ui64 HPCyclesUs(ui64 us) { - return ui64(NHPTimer::GetClockRate() * double(us) / 1000000.0); -} - -inline ui64 HPCyclesMs(ui64 ms) { - return ui64(NHPTimer::GetClockRate() * double(ms) / 1000.0); -} - -class TLightBase { -protected: - TString Name; - ::NMonitoring::TDynamicCounters::TCounterPtr State; // Current state (0=OFF=green, 1=ON=red) - ::NMonitoring::TDynamicCounters::TCounterPtr Count; // Number of switches to ON state - ::NMonitoring::TDynamicCounters::TCounterPtr RedMs; // Time elapsed in ON state - ::NMonitoring::TDynamicCounters::TCounterPtr GreenMs; // Time elapsed in OFF state -private: - ui64 RedCycles = 0; - ui64 GreenCycles = 0; - NHPTimer::STime AdvancedTill = 0; - NHPTimer::STime LastNow = 0; - ui64 UpdateThreshold = 0; -public: - void Initialize(TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& name) { - Name = name; - State = counters->GetCounter(name + "_state"); - Count = counters->GetCounter(name + "_count", true); - RedMs = counters->GetCounter(name + "_redMs", true); - GreenMs = counters->GetCounter(name + "_greenMs", true); - UpdateThreshold = HPCyclesMs(100); - AdvancedTill = Now(); - } - - void Initialize(TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& countName, - const TString& redMsName,const TString& greenMsName) { - Count = counters->GetCounter(countName, true); - RedMs = counters->GetCounter(redMsName, true); - GreenMs = counters->GetCounter(greenMsName, true); - UpdateThreshold = HPCyclesMs(100); - AdvancedTill = Now(); - } - - ui64 GetCount() const { - return *Count; - } - - ui64 GetRedMs() const { - return *RedMs; - } - - ui64 GetGreenMs() const { - return *GreenMs; - } -protected: - void Modify(bool state, bool prevState) { - if (state && !prevState) { // Switched to ON state - if (State) { - *State = true; - } - (*Count)++; - return; - } - if (!state && prevState) { // Switched to OFF state - if (State) { - *State = false; - } - return; - } - } - - void Advance(bool state, NHPTimer::STime now) { - if (now == AdvancedTill) { - return; - } - Elapsed(state, now - AdvancedTill); - if (RedCycles > UpdateThreshold) { - *RedMs += CutMs(RedCycles); - } - if (GreenCycles > UpdateThreshold) { - *GreenMs += CutMs(GreenCycles); - } - AdvancedTill = now; - } - - NHPTimer::STime Now() { - // Avoid time going backwards - NHPTimer::STime now = HPNow(); - if (now < LastNow) { - now = LastNow; - } - LastNow = now; - return now; - } -private: - void Elapsed(bool state, ui64 cycles) { - if (state) { - RedCycles += cycles; - } else { - GreenCycles += cycles; - } - } - - ui64 CutMs(ui64& src) { - ui64 ms = HPMilliSeconds(src); - ui64 cycles = HPCyclesMs(ms); - src -= cycles; - return ms; - } -}; - -// Thread-safe light -class TLight : public TLightBase { -private: - struct TItem { - bool State; - bool Filled; - TItem(bool state = false, bool filled = false) - : State(state) - , Filled(filled) - {} - }; - - // Cyclic buffer to enforce event ordering by seqno - TSpinLock Lock; - size_t HeadIdx = 0; // Index of current state - size_t FilledCount = 0; - ui16 Seqno = 0; // Current seqno - TStackVec<TItem, 32> Data; // In theory should have not more than thread count items -public: - TLight() { - InitData(); - } - - void Set(bool state, ui16 seqno) { - TGuard<TSpinLock> g(Lock); - Push(state, seqno); - bool prevState; - // Note that 'state' variable is being reused - NHPTimer::STime now = Now(); - while (Pop(state, prevState)) { - Modify(state, prevState); - Advance(prevState, now); - } - } - - void Update() { - TGuard<TSpinLock> g(Lock); - Advance(Data[HeadIdx].State, Now()); - } - -private: - void InitData(bool state = false, bool filled = false) { - Data.clear(); - Data.emplace_back(state, filled); - Data.resize(32); - HeadIdx = 0; - } - - void Push(bool state, ui16 seqno) { - FilledCount++; - if (FilledCount == 1) { // First event must initialize seqno - Seqno = seqno; - InitData(state, true); - if (state) { - Modify(true, false); - } - return; - } - Y_ABORT_UNLESS(seqno != Seqno, "ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d", - (int)Seqno, (int)seqno, (int)state, (int)CountFilled()); - ui16 diff = seqno; - diff -= Seqno; // Underflow is fine - size_t size = Data.size(); - if (size <= diff) { // Buffer is full -- extend and move wrapped part - Data.resize(size * 2); - for (size_t i = 0; i < HeadIdx; i++) { - Data[size + i] = Data[i]; - Data[i].Filled = false; - } - } - TItem& item = Data[(HeadIdx + diff) % Data.size()]; - Y_ABORT_UNLESS(!item.Filled, "ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d", - (int)Seqno, (int)seqno, (int)state, (int)CountFilled()); - item.Filled = true; - item.State = state; - } - - bool Pop(bool& state, bool& prevState) { - size_t nextIdx = (HeadIdx + 1) % Data.size(); - TItem& head = Data[HeadIdx]; - TItem& next = Data[nextIdx]; - if (!head.Filled || !next.Filled) { - return false; - } - state = next.State; - prevState = head.State; - head.Filled = false; - HeadIdx = nextIdx; - Seqno++; // Overflow is fine - FilledCount--; - if (FilledCount == 1 && Data.size() > 32) { - InitData(state, true); - } - return true; - } - - size_t CountFilled() const { - size_t ret = 0; - for (const TItem& item : Data) { - ret += item.Filled; - } - return ret; - } -}; - class TBurstmeter { private: TBucketQuoter<i64, TSpinLock, THPTimerUs> Bucket; diff --git a/ydb/core/blobstorage/ut_blobstorage/monitoring.cpp b/ydb/core/blobstorage/ut_blobstorage/monitoring.cpp index 2af16506ab..f3976518b4 100644 --- a/ydb/core/blobstorage/ut_blobstorage/monitoring.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/monitoring.cpp @@ -14,9 +14,17 @@ TString MakeData(ui32 dataSize) { template <typename TDerived> class TInflightActor : public TActorBootstrapped<TDerived> { public: - TInflightActor(ui32 requests, ui32 inflight) - : RequestCount(requests) - , RequestInflight(inflight) + struct TSettings { + ui32 Requests; + ui32 MaxInFlight; + TDuration Delay = TDuration::Zero(); + }; + +public: + TInflightActor(TSettings settings) + : RequestsToSend(settings.Requests) + , RequestInFlight(settings.MaxInFlight) + , Settings(settings) {} virtual ~TInflightActor() = default; @@ -29,11 +37,18 @@ public: } protected: - void SendRequests() { - while (RequestInflight > 0 && RequestCount > 0) { - RequestInflight--; - RequestCount--; - SendRequest(); + void ScheduleRequests() { + while (RequestInFlight > 0 && RequestsToSend > 0) { + TMonotonic now = TMonotonic::Now(); + TDuration timePassed = now - LastTs; + if (timePassed >= Settings.Delay) { + LastTs = now; + RequestInFlight--; + RequestsToSend--; + SendRequest(); + } else { + TActorBootstrapped<TDerived>::Schedule(Settings.Delay - timePassed, new TEvents::TEvWakeup); + } } } @@ -43,89 +58,110 @@ protected: } else { Fails++; } - ++RequestInflight; - SendRequests(); + ++RequestInFlight; + ScheduleRequests(); } virtual void BootstrapImpl(const TActorContext &ctx) = 0; virtual void SendRequest() = 0; protected: - ui32 RequestCount; - ui32 RequestInflight; + ui32 RequestsToSend; + ui32 RequestInFlight; ui32 GroupId; + TMonotonic LastTs; + TSettings Settings; public: ui32 OKs = 0; ui32 Fails = 0; }; -template <typename TInflightActor> -void Test(const TBlobStorageGroupInfo::TTopology& topology, TInflightActor* actor) { - const ui32 groupSize = topology.TotalVDisks; - const auto& groupErasure = topology.GType; - TEnvironmentSetup env{{ +ui64 AggregateVDiskCounters(std::unique_ptr<TEnvironmentSetup>& env, const NKikimrBlobStorage::TBaseConfig& baseConfig, + TString storagePool, ui32 groupSize, ui32 groupId, const std::vector<ui32>& pdiskLayout, TString subsystem, + TString counter, bool derivative = false) { + ui64 ctr = 0; + + for (const auto& vslot : baseConfig.GetVSlot()) { + auto* appData = env->Runtime->GetNode(vslot.GetVSlotId().GetNodeId())->AppData.get(); + for (ui32 i = 0; i < groupSize; ++i) { + ctr += GetServiceCounters(appData->Counters, "vdisks")-> + GetSubgroup("storagePool", storagePool)-> + GetSubgroup("group", std::to_string(groupId))-> + GetSubgroup("orderNumber", "0" + std::to_string(i))-> + GetSubgroup("pdisk", "00000" + std::to_string(pdiskLayout[i]))-> + GetSubgroup("media", "rot")-> + GetSubgroup("subsystem", subsystem)-> + GetCounter(counter, derivative)->Val(); + } + } + return ctr; +}; + +void SetupEnv(const TBlobStorageGroupInfo::TTopology& topology, std::unique_ptr<TEnvironmentSetup>& env, + NKikimrBlobStorage::TBaseConfig& baseConfig, ui32& groupSize, TBlobStorageGroupType& groupType, + ui32& groupId, std::vector<ui32>& pdiskLayout) { + groupSize = topology.TotalVDisks; + groupType = topology.GType; + env.reset(new TEnvironmentSetup({ .NodeCount = groupSize, - .Erasure = groupErasure, - }}; + .Erasure = groupType, + })); - env.CreateBoxAndPool(1, 1); - env.Sim(TDuration::Seconds(30)); + env->CreateBoxAndPool(1, 1); + env->Sim(TDuration::Seconds(30)); NKikimrBlobStorage::TConfigRequest request; request.AddCommand()->MutableQueryBaseConfig(); - auto response = env.Invoke(request); + auto response = env->Invoke(request); - const auto& baseConfig = response.GetStatus(0).GetBaseConfig(); + baseConfig = response.GetStatus(0).GetBaseConfig(); UNIT_ASSERT_VALUES_EQUAL(baseConfig.GroupSize(), 1); - ui32 groupId = baseConfig.GetGroup(0).GetGroupId(); - std::vector<ui32> pdiskIds(groupSize); + groupId = baseConfig.GetGroup(0).GetGroupId(); + pdiskLayout.resize(groupSize); for (const auto& vslot : baseConfig.GetVSlot()) { const auto& vslotId = vslot.GetVSlotId(); ui32 orderNumber = topology.GetOrderNumber(TVDiskIdShort(vslot.GetFailRealmIdx(), vslot.GetFailDomainIdx(), vslot.GetVDiskIdx())); if (vslot.GetGroupId() == groupId) { - pdiskIds[orderNumber] = vslotId.GetPDiskId(); + pdiskLayout[orderNumber] = vslotId.GetPDiskId(); } } +} + +template <typename TInflightActor> +void TestDSProxyAndVDiskEqualCost(const TBlobStorageGroupInfo::TTopology& topology, TInflightActor* actor) { + std::unique_ptr<TEnvironmentSetup> env; + NKikimrBlobStorage::TBaseConfig baseConfig; + ui32 groupSize; + TBlobStorageGroupType groupType; + ui32 groupId; + std::vector<ui32> pdiskLayout; + SetupEnv(topology, env, baseConfig, groupSize, groupType, groupId, pdiskLayout); ui64 dsproxyCost = 0; ui64 vdiskCost = 0; - auto vdisksTotal = [&](TString subsystem, TString counter, bool derivative = false) { - ui64 ctr = 0; - for (const auto& vslot : baseConfig.GetVSlot()) { - auto* appData = env.Runtime->GetNode(vslot.GetVSlotId().GetNodeId())->AppData.get(); - for (ui32 i = 0; i < groupSize; ++i) { - ctr += GetServiceCounters(appData->Counters, "vdisks")-> - GetSubgroup("storagePool", env.StoragePoolName)-> - GetSubgroup("group", std::to_string(groupId))-> - GetSubgroup("orderNumber", "0" + std::to_string(i))-> - GetSubgroup("pdisk", "00000" + std::to_string(pdiskIds[i]))-> - GetSubgroup("media", "rot")-> - GetSubgroup("subsystem", subsystem)-> - GetCounter(counter, derivative)->Val(); - } - } - return ctr; - }; auto updateCounters = [&]() { + dsproxyCost = 0; + for (const auto& vslot : baseConfig.GetVSlot()) { - auto* appData = env.Runtime->GetNode(vslot.GetVSlotId().GetNodeId())->AppData.get(); + auto* appData = env->Runtime->GetNode(vslot.GetVSlotId().GetNodeId())->AppData.get(); dsproxyCost += GetServiceCounters(appData->Counters, "dsproxynode")-> GetSubgroup("subsystem", "request")-> - GetSubgroup("storagePool", env.StoragePoolName)-> + GetSubgroup("storagePool", env->StoragePoolName)-> GetCounter("DSProxyDiskCostNs")->Val(); } - vdiskCost = vdisksTotal("cost", "SkeletonFrontUserCostNs"); + vdiskCost = AggregateVDiskCounters(env, baseConfig, env->StoragePoolName, groupSize, groupId, + pdiskLayout, "cost", "SkeletonFrontUserCostNs"); }; updateCounters(); UNIT_ASSERT_VALUES_EQUAL(dsproxyCost, vdiskCost); actor->SetGroupId(groupId); - env.Runtime->Register(actor, 1); - env.Sim(TDuration::Minutes(15)); + env->Runtime->Register(actor, 1); + env->Sim(TDuration::Minutes(15)); updateCounters(); @@ -138,19 +174,21 @@ void Test(const TBlobStorageGroupInfo::TTopology& topology, TInflightActor* acto if constexpr(VERBOSE) { Cerr << str.Str() << Endl; + // env->Runtime->GetAppData()->Counters->OutputPlainText(Cerr); } UNIT_ASSERT_VALUES_EQUAL_C(dsproxyCost, vdiskCost, str.Str()); } class TInflightActorPut : public TInflightActor<TInflightActorPut> { public: - TInflightActorPut(ui32 requests, ui32 inflight, ui32 dataSize = 1024) - : TInflightActor(requests, inflight) + TInflightActorPut(TSettings settings, ui32 dataSize = 1024) + : TInflightActor(settings) , DataSize(dataSize) {} STRICT_STFUNC(StateWork, - cFunc(TEvBlobStorage::TEvStatusResult::EventType, SendRequests); + cFunc(TEvBlobStorage::TEvStatusResult::EventType, ScheduleRequests); + cFunc(TEvents::TEvWakeup::EventType, ScheduleRequests); hFunc(TEvBlobStorage::TEvPutResult, Handle); ) @@ -164,7 +202,7 @@ public: protected: virtual void SendRequest() override { TString data = MakeData(DataSize); - auto ev = new TEvBlobStorage::TEvPut(TLogoBlobID(1, 1, 1, 10, DataSize, RequestCount + 1), + auto ev = new TEvBlobStorage::TEvPut(TLogoBlobID(1, 1, 1, 10, DataSize, RequestsToSend + 1), data, TInstant::Max(), NKikimrBlobStorage::UserData); SendToBSProxy(SelfId(), GroupId, ev, 0); } @@ -184,8 +222,8 @@ Y_UNIT_TEST(Test##requestType##erasure##Requests##requests##Inflight##inflight) ui32 realms = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 1; \ ui32 domains = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 8; \ TBlobStorageGroupInfo::TTopology topology(groupType, realms, domains, 1, true); \ - auto actor = new TInflightActor##requestType(requests, inflight); \ - Test(topology, actor); \ + auto actor = new TInflightActor##requestType({requests, inflight}); \ + TestDSProxyAndVDiskEqualCost(topology, actor); \ } #define MAKE_TEST_W_DATASIZE(erasure, requestType, requests, inflight, dataSize) \ @@ -194,19 +232,20 @@ Y_UNIT_TEST(Test##requestType##erasure##Requests##requests##Inflight##inflight## ui32 realms = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 1; \ ui32 domains = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 8; \ TBlobStorageGroupInfo::TTopology topology(groupType, realms, domains, 1, true); \ - auto actor = new TInflightActor##requestType(requests, inflight, dataSize); \ - Test(topology, actor); \ + auto actor = new TInflightActor##requestType({requests, inflight}, dataSize); \ + TestDSProxyAndVDiskEqualCost(topology, actor); \ } class TInflightActorGet : public TInflightActor<TInflightActorGet> { public: - TInflightActorGet(ui32 requests, ui32 inflight, ui32 dataSize = 1024) - : TInflightActor(requests, inflight) + TInflightActorGet(TSettings settings, ui32 dataSize = 1024) + : TInflightActor(settings) , DataSize(dataSize) {} STRICT_STFUNC(StateWork, - cFunc(TEvBlobStorage::TEvPutResult::EventType, SendRequests); + cFunc(TEvBlobStorage::TEvPutResult::EventType, ScheduleRequests); + cFunc(TEvents::TEvWakeup::EventType, ScheduleRequests); hFunc(TEvBlobStorage::TEvGetResult, Handle); ) @@ -236,8 +275,8 @@ private: class TInflightActorPatch : public TInflightActor<TInflightActorPatch> { public: - TInflightActorPatch(ui32 requests, ui32 inflight, ui32 dataSize = 1024) - : TInflightActor(requests, inflight) + TInflightActorPatch(TSettings settings, ui32 dataSize = 1024) + : TInflightActor(settings) , DataSize(dataSize) {} @@ -248,7 +287,7 @@ public: virtual void BootstrapImpl(const TActorContext&/* ctx*/) override { TString data = MakeData(DataSize); - for (ui32 i = 0; i < RequestInflight; ++i) { + for (ui32 i = 0; i < RequestInFlight; ++i) { TLogoBlobID blobId(1, 1, 1, 10, DataSize, 1 + i); auto ev = new TEvBlobStorage::TEvPut(blobId, data, TInstant::Max()); SendToBSProxy(SelfId(), GroupId, ev, 0); @@ -263,7 +302,7 @@ protected: TLogoBlobID newId(1, 1, oldId.Step() + 1, 10, DataSize, oldId.Cookie()); Y_ABORT_UNLESS(TEvBlobStorage::TEvPatch::GetBlobIdWithSamePlacement(oldId, &newId, BlobIdMask, GroupId, GroupId)); TArrayHolder<TEvBlobStorage::TEvPatch::TDiff> diffs(new TEvBlobStorage::TEvPatch::TDiff[1]); - char c = 'a' + RequestCount % 26; + char c = 'a' + RequestsToSend % 26; diffs[0].Set(TString(DataSize, c), 0); auto ev = new TEvBlobStorage::TEvPatch(GroupId, oldId, newId, BlobIdMask, std::move(diffs), 1, TInstant::Max()); SendToBSProxy(SelfId(), GroupId, ev, 0); @@ -277,8 +316,8 @@ protected: void Handle(TEvBlobStorage::TEvPutResult::TPtr res) { Blobs.push_back(res->Get()->Id); - if (++BlobsWritten == RequestInflight) { - SendRequests(); + if (++BlobsWritten == RequestInFlight) { + ScheduleRequests(); } } @@ -368,3 +407,42 @@ Y_UNIT_TEST_SUITE(CostMetricsPatchBlock4Plus2) { MAKE_TEST_W_DATASIZE(4Plus2Block, Patch, 100, 10, 1000); MAKE_TEST_W_DATASIZE(4Plus2Block, Patch, 10000, 100, 1000); } + +template <typename TInflightActor> +void TestBurst(const TBlobStorageGroupInfo::TTopology& topology, TInflightActor* actor, bool burstExpected) { + std::unique_ptr<TEnvironmentSetup> env; + NKikimrBlobStorage::TBaseConfig baseConfig; + ui32 groupSize; + TBlobStorageGroupType groupType; + ui32 groupId; + std::vector<ui32> pdiskLayout; + SetupEnv(topology, env, baseConfig, groupSize, groupType, groupId, pdiskLayout); + + actor->SetGroupId(groupId); + env->Runtime->Register(actor, 1); + env->Sim(TDuration::Seconds(15)); + + ui64 redMs = AggregateVDiskCounters(env, baseConfig, env->StoragePoolName, groupSize, groupId, + pdiskLayout, "advancedCost", "BurstDetector_redMs"); + + if (burstExpected) { + UNIT_ASSERT_GT(redMs, 0); + } else { + UNIT_ASSERT_VALUES_EQUAL(redMs, burstExpected); + } +} + +#define MAKE_BURST_TEST(testType, erasure, requestType, requests, inflight, delay, burstExpected) \ +Y_UNIT_TEST(Test##requestType##testType##erasure) { \ + auto groupType = TBlobStorageGroupType::Erasure##erasure; \ + ui32 realms = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 1; \ + ui32 domains = (groupType == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 8; \ + TBlobStorageGroupInfo::TTopology topology(groupType, realms, domains, 1, true); \ + auto* actor = new TInflightActor##requestType({requests, inflight, delay}, 1_KB); \ + TestBurst(topology, actor, burstExpected); \ +} + +Y_UNIT_TEST_SUITE(BurstDetection) { + MAKE_BURST_TEST(Evenly, 4Plus2Block, Put, 100000, 1, TDuration::MicroSeconds(1), false); + MAKE_BURST_TEST(Burst, 4Plus2Block, Put, 100000, 1000000, TDuration::Zero(), true); +} diff --git a/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.cpp b/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.cpp index 689c577bec..6ebab77acd 100644 --- a/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.cpp +++ b/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.cpp @@ -38,7 +38,9 @@ TBsCostTracker::TBsCostTracker(const TBlobStorageGroupType& groupType, NPDisk::E , ScrubDiskCost(CostCounters->GetCounter("ScrubDiskCost", true)) , DefragDiskCost(CostCounters->GetCounter("DefragDiskCost", true)) , InternalDiskCost(CostCounters->GetCounter("InternalDiskCost", true)) + , Bucket(1'000'000'000, BucketCapacity) { + BurstDetector.Initialize(CostCounters, "BurstDetector"); switch (GroupType.GetErasure()) { case TBlobStorageGroupType::ErasureMirror3dc: CostModel = std::make_unique<TBsCostModelMirror3dc>(diskType); diff --git a/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h b/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h index 27b9f9e9c7..6f84bd229d 100644 --- a/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h +++ b/ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h @@ -5,8 +5,10 @@ #include "vdisk_events.h" #include "vdisk_handle_class.h" +#include <library/cpp/bucket_quoter/bucket_quoter.h> #include <util/system/compiler.h> #include <ydb/core/blobstorage/base/blobstorage_events.h> +#include <ydb/core/util/light.h> namespace NKikimr { @@ -262,8 +264,7 @@ class TBsCostTracker { private: TBlobStorageGroupType GroupType; std::unique_ptr<TBsCostModelBase> CostModel; - - const TIntrusivePtr<::NMonitoring::TDynamicCounters> CostCounters; + TIntrusivePtr<::NMonitoring::TDynamicCounters> CostCounters; ::NMonitoring::TDynamicCounters::TCounterPtr UserDiskCost; ::NMonitoring::TDynamicCounters::TCounterPtr CompactionDiskCost; @@ -271,6 +272,11 @@ private: ::NMonitoring::TDynamicCounters::TCounterPtr DefragDiskCost; ::NMonitoring::TDynamicCounters::TCounterPtr InternalDiskCost; + TBucketQuoter<i64, TSpinLock, THPTimerUs> Bucket; + static constexpr ui64 BucketCapacity = 1'000'000'000; + TLight BurstDetector; + std::atomic<ui64> SeqnoBurstDetector = 0; + public: TBsCostTracker(const TBlobStorageGroupType& groupType, NPDisk::EDeviceType diskType, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters); @@ -296,38 +302,59 @@ public: } } + void CountRequest(ui64 cost) { + Bucket.Use(cost); + BurstDetector.Set(!Bucket.IsAvail(), SeqnoBurstDetector.fetch_add(1)); + } + public: template<class TEvent> void CountUserRequest(const TEvent& ev) { - *UserDiskCost += GetCost(ev); + ui64 cost = GetCost(ev); + *UserDiskCost += cost; + CountRequest(cost); } void CountUserCost(ui64 cost) { *UserDiskCost += cost; + CountRequest(cost); } template<class TEvent> void CountCompactionRequest(const TEvent& ev) { - *CompactionDiskCost += GetCost(ev); + ui64 cost = GetCost(ev); + *CompactionDiskCost += cost; + CountRequest(cost); } template<class TEvent> void CountScrubRequest(const TEvent& ev) { - *UserDiskCost += GetCost(ev); + ui64 cost = GetCost(ev); + *UserDiskCost += cost; + CountRequest(cost); } template<class TEvent> void CountDefragRequest(const TEvent& ev) { - *DefragDiskCost += GetCost(ev); + ui64 cost = GetCost(ev); + *DefragDiskCost += cost; + CountRequest(cost); } template<class TEvent> void CountInternalRequest(const TEvent& ev) { - *InternalDiskCost += GetCost(ev); + ui64 cost = GetCost(ev); + *InternalDiskCost += cost; + CountRequest(cost); } void CountInternalCost(ui64 cost) { *InternalDiskCost += cost; + CountRequest(cost); + } + + void CountPDiskResponse() { + BurstDetector.Set(!Bucket.IsAvail(), SeqnoBurstDetector.fetch_add(1)); } }; diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h index e208983b08..c4ab516044 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcompact.h @@ -170,6 +170,7 @@ namespace NKikimr { // the same logic for every yard response: apply response and restart main cycle void HandleYardResponse(NPDisk::TEvChunkReadResult::TPtr& ev, const TActorContext &ctx) { --PendingResponses; + HullCtx->VCtx->CostTracker->CountPDiskResponse(); if (ev->Get()->Status != NKikimrProto::CORRUPTED) { CHECK_PDISK_RESPONSE(HullCtx->VCtx, ev, ctx); } @@ -202,6 +203,7 @@ namespace NKikimr { void HandleYardResponse(NPDisk::TEvChunkWriteResult::TPtr& ev, const TActorContext &ctx) { --PendingResponses; + HullCtx->VCtx->CostTracker->CountPDiskResponse(); CHECK_PDISK_RESPONSE(HullCtx->VCtx, ev, ctx); if (FinalizeIfAborting(ctx)) { return; diff --git a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_pdisk.cpp b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_pdisk.cpp index ef68ca83e0..e477ce788f 100644 --- a/ydb/core/blobstorage/vdisk/scrub/scrub_actor_pdisk.cpp +++ b/ydb/core/blobstorage/vdisk/scrub/scrub_actor_pdisk.cpp @@ -11,6 +11,7 @@ namespace NKikimr { Send(ScrubCtx->PDiskCtx->PDiskId, msg.release()); CurrentState = TStringBuilder() << "reading data from " << part.ToString(); auto res = WaitForPDiskEvent<NPDisk::TEvChunkReadResult>(); + ScrubCtx->VCtx->CostTracker->CountPDiskResponse(); auto *m = res->Get(); Y_VERIFY_S(m->Status == NKikimrProto::OK || m->Status == NKikimrProto::CORRUPTED, "Status# " << NKikimrProto::EReplyStatus_Name(m->Status)); @@ -41,6 +42,7 @@ namespace NKikimr { Send(ScrubCtx->PDiskCtx->PDiskId, msg.release()); CurrentState = TStringBuilder() << "writing index to " << part.ToString(); auto res = WaitForPDiskEvent<NPDisk::TEvChunkWriteResult>(); + ScrubCtx->VCtx->CostTracker->CountPDiskResponse(); Y_ABORT_UNLESS(res->Get()->Status == NKikimrProto::OK); // FIXME: good logic } diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp index 238a69e391..2447d28501 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp @@ -1632,6 +1632,7 @@ namespace NKikimr { extQueue.Completed(ctx, msgCtx, event); TIntQueueClass &intQueue = GetIntQueue(msgCtx.IntQueueId); intQueue.Completed(ctx, msgCtx, *this, id); + VCtx->CostTracker->CountPDiskResponse(); TActivationContext::Send(event.release()); } diff --git a/ydb/core/util/hp_timer_helpers.h b/ydb/core/util/hp_timer_helpers.h new file mode 100644 index 0000000000..e168b33ce8 --- /dev/null +++ b/ydb/core/util/hp_timer_helpers.h @@ -0,0 +1,64 @@ +#include <util/system/hp_timer.h> +#include <ydb/library/actors/util/datetime.h> + +namespace NKikimr { + +inline NHPTimer::STime HPNow() { + NHPTimer::STime ret; + GetTimeFast(&ret); + return ret; +} + +inline double HPSecondsFloat(i64 cycles) { + if (cycles > 0) { + return double(cycles) / NHPTimer::GetClockRate(); + } else { + return 0.0; + } +} + +inline double HPMilliSecondsFloat(i64 cycles) { + if (cycles > 0) { + return double(cycles) * 1000.0 / NHPTimer::GetClockRate(); + } else { + return 0; + } +} + +inline ui64 HPMilliSeconds(i64 cycles) { + return (ui64)HPMilliSecondsFloat(cycles); +} + +inline ui64 HPMicroSecondsFloat(i64 cycles) { + if (cycles > 0) { + return double(cycles) * 1000000.0 / NHPTimer::GetClockRate(); + } else { + return 0; + } +} + +inline ui64 HPMicroSeconds(i64 cycles) { + return (ui64)HPMicroSecondsFloat(cycles); +} + +inline ui64 HPNanoSeconds(i64 cycles) { + if (cycles > 0) { + return ui64(double(cycles) * 1000000000.0 / NHPTimer::GetClockRate()); + } else { + return 0; + } +} + +inline ui64 HPCyclesNs(ui64 ns) { + return ui64(NHPTimer::GetClockRate() * double(ns) / 1000000000.0); +} + +inline ui64 HPCyclesUs(ui64 us) { + return ui64(NHPTimer::GetClockRate() * double(us) / 1000000.0); +} + +inline ui64 HPCyclesMs(ui64 ms) { + return ui64(NHPTimer::GetClockRate() * double(ms) / 1000.0); +} + +} // namespace NKikimr diff --git a/ydb/core/util/light.h b/ydb/core/util/light.h new file mode 100644 index 0000000000..10a698a45e --- /dev/null +++ b/ydb/core/util/light.h @@ -0,0 +1,213 @@ +#include <ydb/core/mon/mon.h> + +#include "hp_timer_helpers.h" + +namespace NKikimr { + +class TLightBase { +protected: + TString Name; + ::NMonitoring::TDynamicCounters::TCounterPtr State; // Current state (0=OFF=green, 1=ON=red) + ::NMonitoring::TDynamicCounters::TCounterPtr Count; // Number of switches to ON state + ::NMonitoring::TDynamicCounters::TCounterPtr RedMs; // Time elapsed in ON state + ::NMonitoring::TDynamicCounters::TCounterPtr GreenMs; // Time elapsed in OFF state +private: + ui64 RedCycles = 0; + ui64 GreenCycles = 0; + NHPTimer::STime AdvancedTill = 0; + NHPTimer::STime LastNow = 0; + ui64 UpdateThreshold = 0; +public: + void Initialize(TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& name) { + Name = name; + State = counters->GetCounter(name + "_state"); + Count = counters->GetCounter(name + "_count", true); + RedMs = counters->GetCounter(name + "_redMs", true); + GreenMs = counters->GetCounter(name + "_greenMs", true); + UpdateThreshold = HPCyclesMs(100); + AdvancedTill = Now(); + } + + void Initialize(TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, const TString& countName, + const TString& redMsName,const TString& greenMsName) { + Count = counters->GetCounter(countName, true); + RedMs = counters->GetCounter(redMsName, true); + GreenMs = counters->GetCounter(greenMsName, true); + UpdateThreshold = HPCyclesMs(100); + AdvancedTill = Now(); + } + + ui64 GetCount() const { + return *Count; + } + + ui64 GetRedMs() const { + return *RedMs; + } + + ui64 GetGreenMs() const { + return *GreenMs; + } +protected: + void Modify(bool state, bool prevState) { + if (state && !prevState) { // Switched to ON state + if (State) { + *State = true; + } + (*Count)++; + return; + } + if (!state && prevState) { // Switched to OFF state + if (State) { + *State = false; + } + return; + } + } + + void Advance(bool state, NHPTimer::STime now) { + if (now == AdvancedTill) { + return; + } + Elapsed(state, now - AdvancedTill); + if (RedCycles > UpdateThreshold) { + *RedMs += CutMs(RedCycles); + } + if (GreenCycles > UpdateThreshold) { + *GreenMs += CutMs(GreenCycles); + } + AdvancedTill = now; + } + + NHPTimer::STime Now() { + // Avoid time going backwards + NHPTimer::STime now = HPNow(); + if (now < LastNow) { + now = LastNow; + } + LastNow = now; + return now; + } +private: + void Elapsed(bool state, ui64 cycles) { + if (state) { + RedCycles += cycles; + } else { + GreenCycles += cycles; + } + } + + ui64 CutMs(ui64& src) { + ui64 ms = HPMilliSeconds(src); + ui64 cycles = HPCyclesMs(ms); + src -= cycles; + return ms; + } +}; + +// Thread-safe light +class TLight : public TLightBase { +private: + struct TItem { + bool State; + bool Filled; + TItem(bool state = false, bool filled = false) + : State(state) + , Filled(filled) + {} + }; + + // Cyclic buffer to enforce event ordering by seqno + TSpinLock Lock; + size_t HeadIdx = 0; // Index of current state + size_t FilledCount = 0; + ui16 Seqno = 0; // Current seqno + TStackVec<TItem, 32> Data; // In theory should have not more than thread count items +public: + TLight() { + InitData(); + } + + void Set(bool state, ui16 seqno) { + TGuard<TSpinLock> g(Lock); + Push(state, seqno); + bool prevState; + // Note that 'state' variable is being reused + NHPTimer::STime now = Now(); + while (Pop(state, prevState)) { + Modify(state, prevState); + Advance(prevState, now); + } + } + + void Update() { + TGuard<TSpinLock> g(Lock); + Advance(Data[HeadIdx].State, Now()); + } + +private: + void InitData(bool state = false, bool filled = false) { + Data.clear(); + Data.emplace_back(state, filled); + Data.resize(32); + HeadIdx = 0; + } + + void Push(bool state, ui16 seqno) { + FilledCount++; + if (FilledCount == 1) { // First event must initialize seqno + Seqno = seqno; + InitData(state, true); + if (state) { + Modify(true, false); + } + return; + } + Y_ABORT_UNLESS(seqno != Seqno, "ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d", + (int)Seqno, (int)seqno, (int)state, (int)CountFilled()); + ui16 diff = seqno; + diff -= Seqno; // Underflow is fine + size_t size = Data.size(); + if (size <= diff) { // Buffer is full -- extend and move wrapped part + Data.resize(size * 2); + for (size_t i = 0; i < HeadIdx; i++) { + Data[size + i] = Data[i]; + Data[i].Filled = false; + } + } + TItem& item = Data[(HeadIdx + diff) % Data.size()]; + Y_ABORT_UNLESS(!item.Filled, "ordering overflow or duplicate event headSeqno# %d seqno# %d state# %d filled# %d", + (int)Seqno, (int)seqno, (int)state, (int)CountFilled()); + item.Filled = true; + item.State = state; + } + + bool Pop(bool& state, bool& prevState) { + size_t nextIdx = (HeadIdx + 1) % Data.size(); + TItem& head = Data[HeadIdx]; + TItem& next = Data[nextIdx]; + if (!head.Filled || !next.Filled) { + return false; + } + state = next.State; + prevState = head.State; + head.Filled = false; + HeadIdx = nextIdx; + Seqno++; // Overflow is fine + FilledCount--; + if (FilledCount == 1 && Data.size() > 32) { + InitData(state, true); + } + return true; + } + + size_t CountFilled() const { + size_t ret = 0; + for (const TItem& item : Data) { + ret += item.Filled; + } + return ret; + } +}; + +} // namespace NKikimr |