aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2023-08-07 17:37:26 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2023-08-07 20:32:28 +0300
commit3d7db4ecff29d4855e220be26cd2d993ff7328e0 (patch)
tree78fe1d10fc333092c902ceaafb4b967c0b74e5d3
parentb76e3a18a545bb224d408a16c5e52289cdffbe02 (diff)
downloadydb-3d7db4ecff29d4855e220be26cd2d993ff7328e0.tar.gz
Use relaxed memory order in TPercentileTracker[Lg] KIKIMR-18953
-rw-r--r--library/cpp/monlib/dynamic_counters/percentile/percentile.h19
-rw-r--r--library/cpp/monlib/dynamic_counters/percentile/percentile_lg.h26
-rw-r--r--library/cpp/monlib/dynamic_counters/percentile/percentile_ut.cpp27
-rw-r--r--ydb/core/load_test/group_write.cpp30
4 files changed, 61 insertions, 41 deletions
diff --git a/library/cpp/monlib/dynamic_counters/percentile/percentile.h b/library/cpp/monlib/dynamic_counters/percentile/percentile.h
index 73c482bce9..db40793adf 100644
--- a/library/cpp/monlib/dynamic_counters/percentile/percentile.h
+++ b/library/cpp/monlib/dynamic_counters/percentile/percentile.h
@@ -10,15 +10,15 @@ namespace NMonitoring {
template <size_t BUCKET_SIZE, size_t BUCKET_COUNT, size_t FRAME_COUNT>
struct TPercentileTracker : public TPercentileBase {
- TAtomic Items[BUCKET_COUNT];
- TAtomicBase Frame[FRAME_COUNT][BUCKET_COUNT];
+ std::atomic<size_t> Items[BUCKET_COUNT];
+ size_t Frame[FRAME_COUNT][BUCKET_COUNT];
size_t CurrentFrame;
TPercentileTracker()
: CurrentFrame(0)
{
for (size_t i = 0; i < BUCKET_COUNT; ++i) {
- AtomicSet(Items[i], 0);
+ Items[i].store(0);
}
for (size_t frame = 0; frame < FRAME_COUNT; ++frame) {
for (size_t bucket = 0; bucket < BUCKET_COUNT; ++bucket) {
@@ -28,17 +28,18 @@ struct TPercentileTracker : public TPercentileBase {
}
void Increment(size_t value) {
- AtomicIncrement(Items[Min((value + BUCKET_SIZE - 1) / BUCKET_SIZE, BUCKET_COUNT - 1)]);
+ auto idx = Min((value + BUCKET_SIZE - 1) / BUCKET_SIZE, BUCKET_COUNT - 1);
+ Items[idx].fetch_add(1, std::memory_order_relaxed);
}
// shift frame (call periodically)
void Update() {
- TVector<TAtomicBase> totals(BUCKET_COUNT);
+ TVector<size_t> totals(BUCKET_COUNT);
totals.resize(BUCKET_COUNT);
- TAtomicBase total = 0;
+ size_t total = 0;
for (size_t i = 0; i < BUCKET_COUNT; ++i) {
- TAtomicBase item = AtomicGet(Items[i]);
- TAtomicBase prevItem = Frame[CurrentFrame][i];
+ size_t item = Items[i].load(std::memory_order_relaxed);
+ size_t prevItem = Frame[CurrentFrame][i];
Frame[CurrentFrame][i] = item;
total += item - prevItem;
totals[i] = total;
@@ -46,7 +47,7 @@ struct TPercentileTracker : public TPercentileBase {
for (size_t i = 0; i < Percentiles.size(); ++i) {
TPercentile &percentile = Percentiles[i];
- auto threshold = (TAtomicBase)(percentile.first * (float)total);
+ auto threshold = (size_t)(percentile.first * (float)total);
threshold = Min(threshold, total);
auto it = LowerBound(totals.begin(), totals.end(), threshold);
size_t index = it - totals.begin();
diff --git a/library/cpp/monlib/dynamic_counters/percentile/percentile_lg.h b/library/cpp/monlib/dynamic_counters/percentile/percentile_lg.h
index 0042cd9a6a..e27664ded9 100644
--- a/library/cpp/monlib/dynamic_counters/percentile/percentile_lg.h
+++ b/library/cpp/monlib/dynamic_counters/percentile/percentile_lg.h
@@ -24,8 +24,8 @@ struct TPercentileTrackerLg : public TPercentileBase {
static constexpr size_t MAX_GRANULARITY = size_t(1) << (BUCKET_COUNT - 1);
size_t Borders[BUCKET_COUNT];
- TAtomic Items[ITEMS_COUNT];
- TAtomicBase Frame[FRAME_COUNT][ITEMS_COUNT];
+ std::atomic<size_t> Items[ITEMS_COUNT];
+ size_t Frame[FRAME_COUNT][ITEMS_COUNT];
size_t CurrentFrame;
TPercentileTrackerLg()
@@ -36,7 +36,7 @@ struct TPercentileTrackerLg : public TPercentileBase {
Borders[i] = Borders[i-1] + (BUCKET_SIZE << (i - 1));
}
for (size_t i = 0; i < ITEMS_COUNT; ++i) {
- AtomicSet(Items[i], 0);
+ Items[i].store(0);
}
for (size_t frame = 0; frame < FRAME_COUNT; ++frame) {
for (size_t bucket = 0; bucket < ITEMS_COUNT; ++bucket) {
@@ -135,18 +135,18 @@ struct TPercentileTrackerLg : public TPercentileBase {
size_t bucket_idx = BucketIdxMostSignificantBit(value);
size_t inside_bucket_idx = (value - Borders[bucket_idx] + (1 << bucket_idx) - 1) >> bucket_idx;
size_t idx = bucket_idx * BUCKET_SIZE + inside_bucket_idx;
- AtomicIncrement(Items[Min(idx, ITEMS_COUNT - 1)]);
+ Items[Min(idx, ITEMS_COUNT - 1)].fetch_add(1, std::memory_order_relaxed);
}
// Needed only for tests
size_t GetPercentile(float threshold) {
- TStackVec<TAtomicBase, ITEMS_COUNT> totals(ITEMS_COUNT);
- TAtomicBase total = 0;
+ TStackVec<size_t, ITEMS_COUNT> totals(ITEMS_COUNT);
+ size_t total = 0;
for (size_t i = 0; i < ITEMS_COUNT; ++i) {
- total += AtomicGet(Items[i]);
+ total += Items[i].load();
totals[i] = total;
}
- TAtomicBase item_threshold = std::llround(threshold * (float)total);
+ size_t item_threshold = std::llround(threshold * (float)total);
item_threshold = Min(item_threshold, total);
auto it = LowerBound(totals.begin(), totals.end(), item_threshold);
size_t index = it - totals.begin();
@@ -156,11 +156,11 @@ struct TPercentileTrackerLg : public TPercentileBase {
// shift frame (call periodically)
void Update() {
- TStackVec<TAtomicBase, ITEMS_COUNT> totals(ITEMS_COUNT);
- TAtomicBase total = 0;
+ TStackVec<size_t, ITEMS_COUNT> totals(ITEMS_COUNT);
+ size_t total = 0;
for (size_t i = 0; i < ITEMS_COUNT; ++i) {
- TAtomicBase item = AtomicGet(Items[i]);
- TAtomicBase prevItem = Frame[CurrentFrame][i];
+ size_t item = Items[i].load(std::memory_order_relaxed);
+ size_t prevItem = Frame[CurrentFrame][i];
Frame[CurrentFrame][i] = item;
total += item - prevItem;
totals[i] = total;
@@ -168,7 +168,7 @@ struct TPercentileTrackerLg : public TPercentileBase {
for (size_t i = 0; i < Percentiles.size(); ++i) {
TPercentile &percentile = Percentiles[i];
- TAtomicBase threshold = std::llround(percentile.first * (float)total);
+ size_t threshold = std::llround(percentile.first * (float)total);
threshold = Min(threshold, total);
auto it = LowerBound(totals.begin(), totals.end(), threshold);
size_t index = it - totals.begin();
diff --git a/library/cpp/monlib/dynamic_counters/percentile/percentile_ut.cpp b/library/cpp/monlib/dynamic_counters/percentile/percentile_ut.cpp
index 6c8bb54ec9..b8bdd41028 100644
--- a/library/cpp/monlib/dynamic_counters/percentile/percentile_ut.cpp
+++ b/library/cpp/monlib/dynamic_counters/percentile/percentile_ut.cpp
@@ -1,27 +1,46 @@
+#include "percentile.h"
#include "percentile_lg.h"
#include <library/cpp/testing/unittest/registar.h>
+#include <util/datetime/cputimer.h>
using namespace NMonitoring;
Y_UNIT_TEST_SUITE(PercentileTest) {
template<size_t A, size_t B, size_t B_BEGIN>
-void printSizeAndLimit() {
+void PrintSizeAndLimit() {
using TPerc = TPercentileTrackerLg<A, B, 15>;
Cout << "TPercentileTrackerLg<" << A << ", " << B << ", 15>"
<< "; sizeof# " << LeftPad(HumanReadableSize(sizeof(TPerc), SF_BYTES), 7)
<< "; max_granularity# " << LeftPad(HumanReadableSize(TPerc::MAX_GRANULARITY, SF_QUANTITY), 5)
<< "; limit# " << LeftPad(HumanReadableSize(TPerc::TRACKER_LIMIT , SF_QUANTITY), 5) << Endl;
if constexpr (B > 1) {
- printSizeAndLimit<A, B - 1, B_BEGIN>();
+ PrintSizeAndLimit<A, B - 1, B_BEGIN>();
} else if constexpr (A > 1) {
Cout << Endl;
- printSizeAndLimit<A - 1, B_BEGIN, B_BEGIN>();
+ PrintSizeAndLimit<A - 1, B_BEGIN, B_BEGIN>();
}
}
Y_UNIT_TEST(PrintTrackerLgSizeAndLimits) {
- printSizeAndLimit<10, 5, 5>();
+ PrintSizeAndLimit<10, 5, 5>();
+ }
+
+template<class T>
+void RunPerf() {
+ TTimer t(TypeName<T>() + "\n");
+ T tracker;
+ for (size_t i = 0; i < 1000000; ++i) {
+ for (size_t i = 0; i < 10; ++i) {
+ tracker.Increment(i * 6451);
+ }
+ tracker.Update();
+ }
+}
+
+ Y_UNIT_TEST(TrackerPerf) {
+ RunPerf<TPercentileTracker<4, 512, 15>>();
+ RunPerf<TPercentileTrackerLg<4, 3, 15>>();
}
Y_UNIT_TEST(TrackerLimitTest) {
diff --git a/ydb/core/load_test/group_write.cpp b/ydb/core/load_test/group_write.cpp
index f448c0afde..da73831e6c 100644
--- a/ydb/core/load_test/group_write.cpp
+++ b/ydb/core/load_test/group_write.cpp
@@ -121,7 +121,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
ui64 TotalBytesRead = 0;
TSpeedTracker<ui64> MegabytesPerSecondST;
TQuantileTracker<ui64> MegabytesPerSecondQT;
- TLatencyTrackerUs ResponseQT;
+ std::unique_ptr<TLatencyTrackerUs> ResponseQT;
THashMap<ui64, ui64> SentTimestamp;
TDeque<std::pair<ui64, ui64>> WritesInFlightTimestamps;
TIntrusivePtr<NMonitoring::TCounterForPtr> MaxInFlightLatency;
@@ -139,7 +139,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
THashMap<ui64, ui64> ReadSentTimestamp;
TSpeedTracker<ui64> ReadMegabytesPerSecondST;
TQuantileTracker<ui64> ReadMegabytesPerSecondQT;
- TLatencyTrackerUs ReadResponseQT;
+ std::unique_ptr<TLatencyTrackerUs> ReadResponseQT;
bool NextWriteInQueue = false;
bool NextReadInQueue = false;
bool IsWorkingNow = true;
@@ -219,8 +219,8 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
*Counters->GetCounter("tabletId") = tabletId;
const auto& percCounters = Counters->GetSubgroup("sensor", "microseconds");
MaxInFlightLatency = percCounters->GetCounter("MaxInFlightLatency");
- ResponseQT.Initialize(percCounters->GetSubgroup("metric", "writeResponse"), Percentiles);
- ReadResponseQT.Initialize(percCounters->GetSubgroup("metric", "readResponse"), Percentiles);
+ ResponseQT->Initialize(percCounters->GetSubgroup("metric", "writeResponse"), Percentiles);
+ ReadResponseQT->Initialize(percCounters->GetSubgroup("metric", "readResponse"), Percentiles);
}
TString PrintMe() {
@@ -382,8 +382,8 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
ReadBytesInFlightQT.Add(now, ReadBytesInFlight);
if (now > LastLatencyTrackerUpdate + TDuration::Seconds(1)) {
LastLatencyTrackerUpdate = now;
- ResponseQT.Update();
- ReadResponseQT.Update();
+ ResponseQT->Update();
+ ReadResponseQT->Update();
if (WritesInFlightTimestamps) {
const auto& maxLatency = CyclesToDuration(GetCycleCountFast() - WritesInFlightTimestamps.front().second);
*MaxInFlightLatency = maxLatency.MicroSeconds();
@@ -576,7 +576,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
Y_VERIFY(itInFlight != WritesInFlightTimestamps.end());
WritesInFlightTimestamps.erase(itInFlight);
- ResponseQT.Increment(response.MicroSeconds());
+ ResponseQT->Increment(response.MicroSeconds());
IssueWriteIfPossible(ctx);
if (ConfirmedBlobIds.size() == 1) {
@@ -719,7 +719,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
const TDuration response = CyclesToDuration(GetCycleCountFast() - it->second);
ReadSentTimestamp.erase(it);
- ReadResponseQT.Increment(response.MicroSeconds());
+ ReadResponseQT->Increment(response.MicroSeconds());
IssueReadIfPossible(ctx);
};
@@ -753,7 +753,7 @@ class TLogWriterLoadTestActor : public TActorBootstrapped<TLogWriterLoadTestActo
TInstant TestStartTime;
bool EarlyStop = false;
- TVector<TTabletWriter> TabletWriters;
+ TVector<std::unique_ptr<TTabletWriter>> TabletWriters;
TWakeupQueue WakeupQueue;
TDeque<TInstant> WakeupScheduledAt;
@@ -831,12 +831,12 @@ public:
if (!tablet.HasTabletId() || !tablet.HasChannel() || !tablet.HasGroupId()) {
ythrow TLoadActorException() << "TTabletInfo.{TabletId,Channel,GroupId} fields are mandatory";
}
- TabletWriters.emplace_back(Tag, counters, WakeupQueue, QueryDispatcher, tablet.GetTabletId(),
+ TabletWriters.emplace_back(std::make_unique<TTabletWriter>(Tag, counters, WakeupQueue, QueryDispatcher, tablet.GetTabletId(),
tablet.GetChannel(), tablet.HasGeneration() ? TMaybe<ui32>(tablet.GetGeneration()) : TMaybe<ui32>(),
tablet.GetGroupId(), putHandleClass, writeSizeGen, writeIntervalGen, garbageCollectIntervalGen,
maxWritesInFlight, maxWriteBytesInFlight, maxTotalBytesWritten,
getHandleClass, readSizeGen, readIntervalGen,
- maxReadsInFlight, maxReadBytesInFlight, scriptedRoundDuration, std::move(scriptedRequests));
+ maxReadsInFlight, maxReadBytesInFlight, scriptedRoundDuration, std::move(scriptedRequests)));
}
}
}
@@ -849,7 +849,7 @@ public:
ctx.Schedule(*TestDuration, new TEvents::TEvPoisonPill());
}
for (auto& writer : TabletWriters) {
- writer.Bootstrap(ctx);
+ writer->Bootstrap(ctx);
}
HandleWakeup(ctx);
HandleUpdateQuantile(ctx);
@@ -861,7 +861,7 @@ public:
}
LOG_DEBUG_S(ctx, NKikimrServices::BS_LOAD_TEST, "Load tablet recieved PoisonPill, going to die");
for (auto& writer : TabletWriters) {
- writer.StopWorking(ctx); // Sends TEvStopTest then all garbage is collected
+ writer->StopWorking(ctx); // Sends TEvStopTest then all garbage is collected
}
}
@@ -894,7 +894,7 @@ public:
void HandleUpdateQuantile(const TActorContext& ctx) {
TInstant now = TAppData::TimeProvider->Now();
for (auto& writer : TabletWriters) {
- writer.UpdateQuantile(now);
+ writer->UpdateQuantile(now);
}
ctx.Schedule(TDuration::MilliSeconds(5), new TEvUpdateQuantile);
}
@@ -958,7 +958,7 @@ public:
TABLER() {
str << "<td colspan=\"2\">" << "<b>Tablet</b>" << "</td>";
}
- writer.DumpState(str, finalResult);
+ writer->DumpState(str, finalResult);
}
}
}