diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2023-08-07 17:37:26 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2023-08-07 20:32:28 +0300 |
commit | 3d7db4ecff29d4855e220be26cd2d993ff7328e0 (patch) | |
tree | 78fe1d10fc333092c902ceaafb4b967c0b74e5d3 | |
parent | b76e3a18a545bb224d408a16c5e52289cdffbe02 (diff) | |
download | ydb-3d7db4ecff29d4855e220be26cd2d993ff7328e0.tar.gz |
Use relaxed memory order in TPercentileTracker[Lg] KIKIMR-18953
4 files changed, 61 insertions, 41 deletions
diff --git a/library/cpp/monlib/dynamic_counters/percentile/percentile.h b/library/cpp/monlib/dynamic_counters/percentile/percentile.h index 73c482bce9..db40793adf 100644 --- a/library/cpp/monlib/dynamic_counters/percentile/percentile.h +++ b/library/cpp/monlib/dynamic_counters/percentile/percentile.h @@ -10,15 +10,15 @@ namespace NMonitoring { template <size_t BUCKET_SIZE, size_t BUCKET_COUNT, size_t FRAME_COUNT> struct TPercentileTracker : public TPercentileBase { - TAtomic Items[BUCKET_COUNT]; - TAtomicBase Frame[FRAME_COUNT][BUCKET_COUNT]; + std::atomic<size_t> Items[BUCKET_COUNT]; + size_t Frame[FRAME_COUNT][BUCKET_COUNT]; size_t CurrentFrame; TPercentileTracker() : CurrentFrame(0) { for (size_t i = 0; i < BUCKET_COUNT; ++i) { - AtomicSet(Items[i], 0); + Items[i].store(0); } for (size_t frame = 0; frame < FRAME_COUNT; ++frame) { for (size_t bucket = 0; bucket < BUCKET_COUNT; ++bucket) { @@ -28,17 +28,18 @@ struct TPercentileTracker : public TPercentileBase { } void Increment(size_t value) { - AtomicIncrement(Items[Min((value + BUCKET_SIZE - 1) / BUCKET_SIZE, BUCKET_COUNT - 1)]); + auto idx = Min((value + BUCKET_SIZE - 1) / BUCKET_SIZE, BUCKET_COUNT - 1); + Items[idx].fetch_add(1, std::memory_order_relaxed); } // shift frame (call periodically) void Update() { - TVector<TAtomicBase> totals(BUCKET_COUNT); + TVector<size_t> totals(BUCKET_COUNT); totals.resize(BUCKET_COUNT); - TAtomicBase total = 0; + size_t total = 0; for (size_t i = 0; i < BUCKET_COUNT; ++i) { - TAtomicBase item = AtomicGet(Items[i]); - TAtomicBase prevItem = Frame[CurrentFrame][i]; + size_t item = Items[i].load(std::memory_order_relaxed); + size_t prevItem = Frame[CurrentFrame][i]; Frame[CurrentFrame][i] = item; total += item - prevItem; totals[i] = total; @@ -46,7 +47,7 @@ struct TPercentileTracker : public TPercentileBase { for (size_t i = 0; i < Percentiles.size(); ++i) { TPercentile &percentile = Percentiles[i]; - auto threshold = (TAtomicBase)(percentile.first * (float)total); + auto threshold = (size_t)(percentile.first * (float)total); threshold = Min(threshold, total); auto it = LowerBound(totals.begin(), totals.end(), threshold); size_t index = it - totals.begin(); diff --git a/library/cpp/monlib/dynamic_counters/percentile/percentile_lg.h b/library/cpp/monlib/dynamic_counters/percentile/percentile_lg.h index 0042cd9a6a..e27664ded9 100644 --- a/library/cpp/monlib/dynamic_counters/percentile/percentile_lg.h +++ b/library/cpp/monlib/dynamic_counters/percentile/percentile_lg.h @@ -24,8 +24,8 @@ struct TPercentileTrackerLg : public TPercentileBase { static constexpr size_t MAX_GRANULARITY = size_t(1) << (BUCKET_COUNT - 1); size_t Borders[BUCKET_COUNT]; - TAtomic Items[ITEMS_COUNT]; - TAtomicBase Frame[FRAME_COUNT][ITEMS_COUNT]; + std::atomic<size_t> Items[ITEMS_COUNT]; + size_t Frame[FRAME_COUNT][ITEMS_COUNT]; size_t CurrentFrame; TPercentileTrackerLg() @@ -36,7 +36,7 @@ struct TPercentileTrackerLg : public TPercentileBase { Borders[i] = Borders[i-1] + (BUCKET_SIZE << (i - 1)); } for (size_t i = 0; i < ITEMS_COUNT; ++i) { - AtomicSet(Items[i], 0); + Items[i].store(0); } for (size_t frame = 0; frame < FRAME_COUNT; ++frame) { for (size_t bucket = 0; bucket < ITEMS_COUNT; ++bucket) { @@ -135,18 +135,18 @@ struct TPercentileTrackerLg : public TPercentileBase { size_t bucket_idx = BucketIdxMostSignificantBit(value); size_t inside_bucket_idx = (value - Borders[bucket_idx] + (1 << bucket_idx) - 1) >> bucket_idx; size_t idx = bucket_idx * BUCKET_SIZE + inside_bucket_idx; - AtomicIncrement(Items[Min(idx, ITEMS_COUNT - 1)]); + Items[Min(idx, ITEMS_COUNT - 1)].fetch_add(1, std::memory_order_relaxed); } // Needed only for tests size_t GetPercentile(float threshold) { - TStackVec<TAtomicBase, ITEMS_COUNT> totals(ITEMS_COUNT); - TAtomicBase total = 0; + TStackVec<size_t, ITEMS_COUNT> totals(ITEMS_COUNT); + size_t total = 0; for (size_t i = 0; i < ITEMS_COUNT; ++i) { - total += AtomicGet(Items[i]); + total += Items[i].load(); totals[i] = total; } - TAtomicBase item_threshold = std::llround(threshold * (float)total); + size_t item_threshold = std::llround(threshold * (float)total); item_threshold = Min(item_threshold, total); auto it = LowerBound(totals.begin(), totals.end(), item_threshold); size_t index = it - totals.begin(); @@ -156,11 +156,11 @@ struct TPercentileTrackerLg : public TPercentileBase { // shift frame (call periodically) void Update() { - TStackVec<TAtomicBase, ITEMS_COUNT> totals(ITEMS_COUNT); - TAtomicBase total = 0; + TStackVec<size_t, ITEMS_COUNT> totals(ITEMS_COUNT); + size_t total = 0; for (size_t i = 0; i < ITEMS_COUNT; ++i) { - TAtomicBase item = AtomicGet(Items[i]); - TAtomicBase prevItem = Frame[CurrentFrame][i]; + size_t item = Items[i].load(std::memory_order_relaxed); + size_t prevItem = Frame[CurrentFrame][i]; Frame[CurrentFrame][i] = item; total += item - prevItem; totals[i] = total; @@ -168,7 +168,7 @@ struct TPercentileTrackerLg : public TPercentileBase { for (size_t i = 0; i < Percentiles.size(); ++i) { TPercentile &percentile = Percentiles[i]; - TAtomicBase threshold = std::llround(percentile.first * (float)total); + size_t threshold = std::llround(percentile.first * (float)total); threshold = Min(threshold, total); auto it = LowerBound(totals.begin(), totals.end(), threshold); size_t index = it - totals.begin(); diff --git a/library/cpp/monlib/dynamic_counters/percentile/percentile_ut.cpp b/library/cpp/monlib/dynamic_counters/percentile/percentile_ut.cpp index 6c8bb54ec9..b8bdd41028 100644 --- a/library/cpp/monlib/dynamic_counters/percentile/percentile_ut.cpp +++ b/library/cpp/monlib/dynamic_counters/percentile/percentile_ut.cpp @@ -1,27 +1,46 @@ +#include "percentile.h" #include "percentile_lg.h" #include <library/cpp/testing/unittest/registar.h> +#include <util/datetime/cputimer.h> using namespace NMonitoring; Y_UNIT_TEST_SUITE(PercentileTest) { template<size_t A, size_t B, size_t B_BEGIN> -void printSizeAndLimit() { +void PrintSizeAndLimit() { using TPerc = TPercentileTrackerLg<A, B, 15>; Cout << "TPercentileTrackerLg<" << A << ", " << B << ", 15>" << "; sizeof# " << LeftPad(HumanReadableSize(sizeof(TPerc), SF_BYTES), 7) << "; max_granularity# " << LeftPad(HumanReadableSize(TPerc::MAX_GRANULARITY, SF_QUANTITY), 5) << "; limit# " << LeftPad(HumanReadableSize(TPerc::TRACKER_LIMIT , SF_QUANTITY), 5) << Endl; if constexpr (B > 1) { - printSizeAndLimit<A, B - 1, B_BEGIN>(); + PrintSizeAndLimit<A, B - 1, B_BEGIN>(); } else if constexpr (A > 1) { Cout << Endl; - printSizeAndLimit<A - 1, B_BEGIN, B_BEGIN>(); + PrintSizeAndLimit<A - 1, B_BEGIN, B_BEGIN>(); } } Y_UNIT_TEST(PrintTrackerLgSizeAndLimits) { - printSizeAndLimit<10, 5, 5>(); + PrintSizeAndLimit<10, 5, 5>(); + } + +template<class T> +void RunPerf() { + TTimer t(TypeName<T>() + "\n"); + T tracker; + for (size_t i = 0; i < 1000000; ++i) { + for (size_t i = 0; i < 10; ++i) { + tracker.Increment(i * 6451); + } + tracker.Update(); + } +} + + Y_UNIT_TEST(TrackerPerf) { + RunPerf<TPercentileTracker<4, 512, 15>>(); + RunPerf<TPercentileTrackerLg<4, 3, 15>>(); } Y_UNIT_TEST(TrackerLimitTest) { diff --git a/ydb/core/load_test/group_write.cpp b/ydb/core/load_test/group_write.cpp index f448c0afde..da73831e6c 100644 --- a/ydb/core/load_test/group_write.cpp +++ b/ydb/core/load_test/group_write.cpp @@ -121,7 +121,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo ui64 TotalBytesRead = 0; TSpeedTracker<ui64> MegabytesPerSecondST; TQuantileTracker<ui64> MegabytesPerSecondQT; - TLatencyTrackerUs ResponseQT; + std::unique_ptr<TLatencyTrackerUs> ResponseQT; THashMap<ui64, ui64> SentTimestamp; TDeque<std::pair<ui64, ui64>> WritesInFlightTimestamps; TIntrusivePtr<NMonitoring::TCounterForPtr> MaxInFlightLatency; @@ -139,7 +139,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo THashMap<ui64, ui64> ReadSentTimestamp; TSpeedTracker<ui64> ReadMegabytesPerSecondST; TQuantileTracker<ui64> ReadMegabytesPerSecondQT; - TLatencyTrackerUs ReadResponseQT; + std::unique_ptr<TLatencyTrackerUs> ReadResponseQT; bool NextWriteInQueue = false; bool NextReadInQueue = false; bool IsWorkingNow = true; @@ -219,8 +219,8 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo *Counters->GetCounter("tabletId") = tabletId; const auto& percCounters = Counters->GetSubgroup("sensor", "microseconds"); MaxInFlightLatency = percCounters->GetCounter("MaxInFlightLatency"); - ResponseQT.Initialize(percCounters->GetSubgroup("metric", "writeResponse"), Percentiles); - ReadResponseQT.Initialize(percCounters->GetSubgroup("metric", "readResponse"), Percentiles); + ResponseQT->Initialize(percCounters->GetSubgroup("metric", "writeResponse"), Percentiles); + ReadResponseQT->Initialize(percCounters->GetSubgroup("metric", "readResponse"), Percentiles); } TString PrintMe() { @@ -382,8 +382,8 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo ReadBytesInFlightQT.Add(now, ReadBytesInFlight); if (now > LastLatencyTrackerUpdate + TDuration::Seconds(1)) { LastLatencyTrackerUpdate = now; - ResponseQT.Update(); - ReadResponseQT.Update(); + ResponseQT->Update(); + ReadResponseQT->Update(); if (WritesInFlightTimestamps) { const auto& maxLatency = CyclesToDuration(GetCycleCountFast() - WritesInFlightTimestamps.front().second); *MaxInFlightLatency = maxLatency.MicroSeconds(); @@ -576,7 +576,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo Y_VERIFY(itInFlight != WritesInFlightTimestamps.end()); WritesInFlightTimestamps.erase(itInFlight); - ResponseQT.Increment(response.MicroSeconds()); + ResponseQT->Increment(response.MicroSeconds()); IssueWriteIfPossible(ctx); if (ConfirmedBlobIds.size() == 1) { @@ -719,7 +719,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo const TDuration response = CyclesToDuration(GetCycleCountFast() - it->second); ReadSentTimestamp.erase(it); - ReadResponseQT.Increment(response.MicroSeconds()); + ReadResponseQT->Increment(response.MicroSeconds()); IssueReadIfPossible(ctx); }; @@ -753,7 +753,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo TInstant TestStartTime; bool EarlyStop = false; - TVector<TTabletWriter> TabletWriters; + TVector<std::unique_ptr<TTabletWriter>> TabletWriters; TWakeupQueue WakeupQueue; TDeque<TInstant> WakeupScheduledAt; @@ -831,12 +831,12 @@ public: if (!tablet.HasTabletId() || !tablet.HasChannel() || !tablet.HasGroupId()) { ythrow TLoadActorException() << "TTabletInfo.{TabletId,Channel,GroupId} fields are mandatory"; } - TabletWriters.emplace_back(Tag, counters, WakeupQueue, QueryDispatcher, tablet.GetTabletId(), + TabletWriters.emplace_back(std::make_unique<TTabletWriter>(Tag, counters, WakeupQueue, QueryDispatcher, tablet.GetTabletId(), tablet.GetChannel(), tablet.HasGeneration() ? TMaybe<ui32>(tablet.GetGeneration()) : TMaybe<ui32>(), tablet.GetGroupId(), putHandleClass, writeSizeGen, writeIntervalGen, garbageCollectIntervalGen, maxWritesInFlight, maxWriteBytesInFlight, maxTotalBytesWritten, getHandleClass, readSizeGen, readIntervalGen, - maxReadsInFlight, maxReadBytesInFlight, scriptedRoundDuration, std::move(scriptedRequests)); + maxReadsInFlight, maxReadBytesInFlight, scriptedRoundDuration, std::move(scriptedRequests))); } } } @@ -849,7 +849,7 @@ public: ctx.Schedule(*TestDuration, new TEvents::TEvPoisonPill()); } for (auto& writer : TabletWriters) { - writer.Bootstrap(ctx); + writer->Bootstrap(ctx); } HandleWakeup(ctx); HandleUpdateQuantile(ctx); @@ -861,7 +861,7 @@ public: } LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Load tablet recieved PoisonPill, going to die"); for (auto& writer : TabletWriters) { - writer.StopWorking(ctx); // Sends TEvStopTest then all garbage is collected + writer->StopWorking(ctx); // Sends TEvStopTest then all garbage is collected } } @@ -894,7 +894,7 @@ public: void HandleUpdateQuantile(const TActorContext& ctx) { TInstant now = TAppData::TimeProvider->Now(); for (auto& writer : TabletWriters) { - writer.UpdateQuantile(now); + writer->UpdateQuantile(now); } ctx.Schedule(TDuration::MilliSeconds(5), new TEvUpdateQuantile); } @@ -958,7 +958,7 @@ public: TABLER() { str << "<td colspan=\"2\">" << "<b>Tablet</b>" << "</td>"; } - writer.DumpState(str, finalResult); + writer->DumpState(str, finalResult); } } } |