aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbabenko <babenko@yandex-team.com>2025-03-04 08:56:40 +0300
committerbabenko <babenko@yandex-team.com>2025-03-04 09:43:19 +0300
commit875cc9c0de3e65ec0e587e991c825c3f2c173f21 (patch)
treebbe22ee79dcdd22821b3219f734c5537ab383de4
parent3c6c39e9f8f3e2666d34217cc27fce7f0302fcda (diff)
downloadydb-875cc9c0de3e65ec0e587e991c825c3f2c173f21.tar.gz
YT-18571: Cosmetics in fair throttler
commit_hash:942f79488214afdd3203f96192b9efc78afecc30
-rw-r--r--yt/yt/core/concurrency/fair_throttler.cpp114
-rw-r--r--yt/yt/core/concurrency/fair_throttler.h30
-rw-r--r--yt/yt/core/concurrency/public.h4
-rw-r--r--yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp16
4 files changed, 82 insertions, 82 deletions
diff --git a/yt/yt/core/concurrency/fair_throttler.cpp b/yt/yt/core/concurrency/fair_throttler.cpp
index 3899c949e7f..9a5cf3c6972 100644
--- a/yt/yt/core/concurrency/fair_throttler.cpp
+++ b/yt/yt/core/concurrency/fair_throttler.cpp
@@ -35,7 +35,7 @@ void TFairThrottlerConfig::Register(TRegistrar registrar)
registrar.Parameter("global_accumulation_ticks", &TThis::GlobalAccumulationTicks)
.Default(5);
- registrar.Parameter("ipc_path", &TThis::IPCPath)
+ registrar.Parameter("ipc_path", &TThis::IpcPath)
.Default();
}
@@ -95,27 +95,27 @@ static constexpr TStringBuf BucketsFileName = "buckets.v1";
////////////////////////////////////////////////////////////////////////////////
-class TFileIPCBucket
- : public IIPCBucket
+class TFileIpcBucket
+ : public IIpcBucket
{
public:
- TFileIPCBucket(const TString& path, bool create)
+ TFileIpcBucket(const TString& path, bool create)
: File_(path, OpenAlways | RdWr)
{
if (create) {
File_.Flock(LOCK_EX | LOCK_NB);
}
- File_.Resize(sizeof(TBucket));
+ File_.Resize(sizeof(TState));
Map_ = std::make_unique<TFileMap>(File_, TMemoryMapCommon::oRdWr);
Map_->Map(0, Map_->Length());
LockMemory(Map_->Ptr(), Map_->Length());
}
- TBucket* State() override
+ TState* GetState() override
{
- return reinterpret_cast<TBucket*>(Map_->Ptr());
+ return reinterpret_cast<TState*>(Map_->Ptr());
}
private:
@@ -123,15 +123,15 @@ private:
std::unique_ptr<TFileMap> Map_;
};
-DEFINE_REFCOUNTED_TYPE(TFileIPCBucket)
+DEFINE_REFCOUNTED_TYPE(TFileIpcBucket)
////////////////////////////////////////////////////////////////////////////////
-class TFileThrottlerIPC
- : public IThrottlerIPC
+class TFileThrottlerIpc
+ : public IThrottlerIpc
{
public:
- explicit TFileThrottlerIPC(const TString& path)
+ explicit TFileThrottlerIpc(const TString& path)
: Path_(path)
{
NFS::MakeDirRecursive(Path_);
@@ -166,7 +166,7 @@ public:
return reinterpret_cast<TSharedBucket*>(SharedBucketMap_->Ptr());
}
- std::vector<IIPCBucketPtr> ListBuckets() override
+ std::vector<IIpcBucketPtr> ListBuckets() override
{
Lock_->Acquire();
auto release = Finally([this] {
@@ -175,14 +175,14 @@ public:
Reload();
- std::vector<IIPCBucketPtr> buckets;
+ std::vector<IIpcBucketPtr> buckets;
for (const auto& bucket : OpenBuckets_) {
buckets.push_back(bucket.second);
}
return buckets;
}
- IIPCBucketPtr AddBucket() override
+ IIpcBucketPtr AddBucket() override
{
Lock_->Acquire();
auto release = Finally([this] {
@@ -191,7 +191,7 @@ public:
auto id = TGuid::Create();
OwnedBuckets_.insert(ToString(id));
- return New<TFileIPCBucket>(Path_ + "/" + BucketsFileName + "/" + ToString(id), true);
+ return New<TFileIpcBucket>(Path_ + "/" + BucketsFileName + "/" + ToString(id), true);
}
private:
@@ -202,8 +202,8 @@ private:
TFile SharedBucketFile_;
std::unique_ptr<TFileMap> SharedBucketMap_;
- THashMap<TString, IIPCBucketPtr> OpenBuckets_;
- THashSet<TString> OwnedBuckets_;
+ THashMap<std::string, IIpcBucketPtr> OpenBuckets_;
+ THashSet<std::string> OwnedBuckets_;
void Reload()
{
@@ -230,7 +230,7 @@ private:
continue;
}
- OpenBuckets_[fileName] = New<TFileIPCBucket>(bucketPath, false);
+ OpenBuckets_[fileName] = New<TFileIpcBucket>(bucketPath, false);
} catch (const TSystemError& ex) {
continue;
}
@@ -238,12 +238,12 @@ private:
}
};
-IThrottlerIPCPtr CreateFileThrottlerIPC(const TString& path)
+IThrottlerIpcPtr CreateFileThrottlerIpc(const TString& path)
{
- return New<TFileThrottlerIPC>(path);
+ return New<TFileThrottlerIpc>(path);
}
-DEFINE_REFCOUNTED_TYPE(TFileThrottlerIPC)
+DEFINE_REFCOUNTED_TYPE(TFileThrottlerIpc)
////////////////////////////////////////////////////////////////////////////////
@@ -623,12 +623,12 @@ TFairThrottler::TFairThrottler(
, SharedBucket_(New<TSharedBucket>(config->GlobalAccumulationTicks, Profiler_))
, Config_(std::move(config))
{
- if (Config_->IPCPath) {
- IPC_ = New<TFileThrottlerIPC>(*Config_->IPCPath);
+ if (Config_->IpcPath) {
+ Ipc_ = New<TFileThrottlerIpc>(*Config_->IpcPath);
SharedBucket_->Limit.Value = std::shared_ptr<std::atomic<i64>>(
- &IPC_->State()->Value,
- [ipc = IPC_] (auto /*ptr*/) { });
+ &Ipc_->State()->Value,
+ [state = Ipc_] (auto /*ptr*/) { });
Profiler_.AddFuncGauge("/leader", MakeStrong(this), [this] {
return IsLeader_.load();
@@ -646,7 +646,7 @@ TFairThrottler::TFairThrottler(
}
IThroughputThrottlerPtr TFairThrottler::CreateBucketThrottler(
- const TString& name,
+ const std::string& name,
TFairThrottlerBucketConfigPtr config)
{
if (!config) {
@@ -670,15 +670,15 @@ IThroughputThrottlerPtr TFairThrottler::CreateBucketThrottler(
throttler->SetLimited(config->Limit || config->RelativeLimit);
- IIPCBucketPtr ipc;
- if (IPC_) {
- ipc = IPC_->AddBucket();
+ IIpcBucketPtr state;
+ if (Ipc_) {
+ state = Ipc_->AddBucket();
}
Buckets_[name] = TBucket{
.Config = std::move(config),
.Throttler = throttler,
- .IPC = ipc,
+ .Ipc = state,
};
return throttler;
@@ -686,7 +686,7 @@ IThroughputThrottlerPtr TFairThrottler::CreateBucketThrottler(
void TFairThrottler::Reconfigure(
TFairThrottlerConfigPtr config,
- const THashMap<TString, TFairThrottlerBucketConfigPtr>& buckets)
+ const THashMap<std::string, TFairThrottlerBucketConfigPtr>& buckets)
{
for (const auto& [name, config] : buckets) {
CreateBucketThrottler(name, config);
@@ -712,7 +712,7 @@ void TFairThrottler::DoUpdateLeader()
std::vector<TBucketThrottler::TBucketState> states;
states.reserve(Buckets_.size());
- THashMap<TString, i64> bucketDemands;
+ THashMap<std::string, i64> bucketDemands;
for (const auto& [name, bucket] : Buckets_) {
auto state = bucket.Throttler->Peek();
@@ -736,12 +736,12 @@ void TFairThrottler::DoUpdateLeader()
states.push_back(state);
}
- std::vector<IIPCBucketPtr> remoteBuckets;
- if (IPC_) {
- remoteBuckets = IPC_->ListBuckets();
+ std::vector<IIpcBucketPtr> remoteBuckets;
+ if (Ipc_) {
+ remoteBuckets = Ipc_->ListBuckets();
for (const auto& remote : remoteBuckets) {
- auto state = remote->State();
+ auto state = remote->GetState();
weights.push_back(state->Weight.load());
@@ -771,9 +771,9 @@ void TFairThrottler::DoUpdateLeader()
}
auto freeIncome = ComputeFairDistribution(freeQuota, weights, demands, limits);
- THashMap<TString, i64> bucketUsage;
- THashMap<TString, i64> bucketIncome;
- THashMap<TString, i64> bucketQuota;
+ THashMap<std::string, i64> bucketUsage;
+ THashMap<std::string, i64> bucketIncome;
+ THashMap<std::string, i64> bucketQuota;
i64 leakedQuota = 0;
int i = 0;
@@ -792,7 +792,7 @@ void TFairThrottler::DoUpdateLeader()
}
for (const auto& remote : remoteBuckets) {
- auto state = remote->State();
+ auto state = remote->GetState();
state->InFlow += tickIncome[i] + freeIncome[i];
@@ -828,41 +828,41 @@ void TFairThrottler::DoUpdateFollower()
{
auto guard = Guard(Lock_);
- THashMap<TString, i64> bucketIncome;
- THashMap<TString, i64> bucketUsage;
- THashMap<TString, i64> bucketDemands;
- THashMap<TString, i64> bucketQuota;
+ THashMap<std::string, i64> bucketIncome;
+ THashMap<std::string, i64> bucketUsage;
+ THashMap<std::string, i64> bucketDemands;
+ THashMap<std::string, i64> bucketQuota;
i64 inFlow = 0;
i64 outFlow = 0;
for (const auto& [name, bucket] : Buckets_) {
- auto ipc = bucket.IPC->State();
+ auto* ipcState = bucket.Ipc->GetState();
- ipc->Weight = bucket.Config->Weight;
+ ipcState->Weight = bucket.Config->Weight;
if (auto limit = bucket.Config->GetLimit(Config_->TotalLimit)) {
- ipc->Limit = *limit;
+ ipcState->Limit = *limit;
} else {
- ipc->Limit = -1;
+ ipcState->Limit = -1;
}
- auto state = bucket.Throttler->Peek();
+ auto bucketState = bucket.Throttler->Peek();
auto guarantee = bucket.Config->GetGuarantee(Config_->TotalLimit);
- auto demand = state.Usage + state.Overdraft + state.QueueSize;
+ auto demand = bucketState.Usage + bucketState.Overdraft + bucketState.QueueSize;
if (guarantee && *guarantee > demand) {
demand = *guarantee;
}
- ipc->Demand = demand;
+ ipcState->Demand = demand;
bucketDemands[name] = demand;
- auto in = ipc->InFlow.exchange(0);
- auto out = bucket.Throttler->Refill(in, ipc->GuaranteedQuota);
- ipc->OutFlow += out;
+ auto in = ipcState->InFlow.exchange(0);
+ auto out = bucket.Throttler->Refill(in, ipcState->GuaranteedQuota);
+ ipcState->OutFlow += out;
bucketIncome[name] = in;
- bucketUsage[name] = state.Usage;
- bucketQuota[name] = state.Quota;
+ bucketUsage[name] = bucketState.Usage;
+ bucketQuota[name] = bucketState.Quota;
inFlow += in;
outFlow += out;
@@ -906,7 +906,7 @@ void TFairThrottler::RefillFromSharedBucket()
void TFairThrottler::UpdateLimits(TInstant at)
{
- if (!IsLeader_ && IPC_->TryLock()) {
+ if (!IsLeader_ && Ipc_->TryLock()) {
IsLeader_ = true;
YT_LOG_DEBUG("Throttler is leader");
diff --git a/yt/yt/core/concurrency/fair_throttler.h b/yt/yt/core/concurrency/fair_throttler.h
index ecf32ae14bc..801ffaef6f2 100644
--- a/yt/yt/core/concurrency/fair_throttler.h
+++ b/yt/yt/core/concurrency/fair_throttler.h
@@ -19,7 +19,7 @@ struct TFairThrottlerConfig
int GlobalAccumulationTicks;
- std::optional<TString> IPCPath;
+ std::optional<TString> IpcPath;
REGISTER_YSON_STRUCT(TFairThrottlerConfig);
@@ -52,11 +52,11 @@ DEFINE_REFCOUNTED_TYPE(TFairThrottlerBucketConfig)
////////////////////////////////////////////////////////////////////////////////
-struct IIPCBucket
+struct IIpcBucket
: public TRefCounted
{
// NB: This struct is shared between processes. All changes must be backward compatible.
- struct TBucket
+ struct TState
{
std::atomic<double> Weight;
std::atomic<i64> Limit;
@@ -66,14 +66,14 @@ struct IIPCBucket
std::atomic<i64> GuaranteedQuota;
};
- virtual TBucket* State() = 0;
+ virtual TState* GetState() = 0;
};
-DEFINE_REFCOUNTED_TYPE(IIPCBucket)
+DEFINE_REFCOUNTED_TYPE(IIpcBucket)
////////////////////////////////////////////////////////////////////////////////
-struct IThrottlerIPC
+struct IThrottlerIpc
: public TRefCounted
{
// NB: This struct is shared between processes. All changes must be backward compatible.
@@ -84,13 +84,13 @@ struct IThrottlerIPC
virtual bool TryLock() = 0;
virtual TSharedBucket* State() = 0;
- virtual std::vector<IIPCBucketPtr> ListBuckets() = 0;
- virtual IIPCBucketPtr AddBucket() = 0;
+ virtual std::vector<IIpcBucketPtr> ListBuckets() = 0;
+ virtual IIpcBucketPtr AddBucket() = 0;
};
-IThrottlerIPCPtr CreateFileThrottlerIPC(const TString& path);
+IThrottlerIpcPtr CreateFileThrottlerIpc(const TString& path);
-DEFINE_REFCOUNTED_TYPE(IThrottlerIPC)
+DEFINE_REFCOUNTED_TYPE(IThrottlerIpc)
////////////////////////////////////////////////////////////////////////////////
@@ -116,12 +116,12 @@ public:
NProfiling::TProfiler profiler);
IThroughputThrottlerPtr CreateBucketThrottler(
- const TString& name,
+ const std::string& name,
TFairThrottlerBucketConfigPtr config);
void Reconfigure(
TFairThrottlerConfigPtr config,
- const THashMap<TString, TFairThrottlerBucketConfigPtr>& bucketConfigs);
+ const THashMap<std::string, TFairThrottlerBucketConfigPtr>& bucketConfigs);
static std::vector<i64> ComputeFairDistribution(
i64 totalLimit,
@@ -140,15 +140,15 @@ private:
{
TFairThrottlerBucketConfigPtr Config;
TBucketThrottlerPtr Throttler;
- IIPCBucketPtr IPC;
+ IIpcBucketPtr Ipc;
};
// Protects all Config_ and Buckets_.
YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_);
TFairThrottlerConfigPtr Config_;
- THashMap<TString, TBucket> Buckets_;
+ THashMap<std::string, TBucket> Buckets_;
- IThrottlerIPCPtr IPC_;
+ IThrottlerIpcPtr Ipc_;
void DoUpdateLeader();
void DoUpdateFollower();
diff --git a/yt/yt/core/concurrency/public.h b/yt/yt/core/concurrency/public.h
index 10ffdda9169..4b547a74fce 100644
--- a/yt/yt/core/concurrency/public.h
+++ b/yt/yt/core/concurrency/public.h
@@ -120,8 +120,8 @@ DECLARE_REFCOUNTED_STRUCT(TFiberManagerDynamicConfig)
DECLARE_REFCOUNTED_STRUCT(TFairThrottlerConfig)
DECLARE_REFCOUNTED_STRUCT(TFairThrottlerBucketConfig)
-DECLARE_REFCOUNTED_STRUCT(IThrottlerIPC)
-DECLARE_REFCOUNTED_STRUCT(IIPCBucket)
+DECLARE_REFCOUNTED_STRUCT(IThrottlerIpc)
+DECLARE_REFCOUNTED_STRUCT(IIpcBucket)
DECLARE_REFCOUNTED_CLASS(TFairThrottler)
DECLARE_REFCOUNTED_CLASS(TBucketThrottler)
diff --git a/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp b/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp
index 41ee845af7f..2a6528bd314 100644
--- a/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp
+++ b/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp
@@ -274,7 +274,7 @@ TEST_F(TFairThrottlerTest, Release)
////////////////////////////////////////////////////////////////////////////////
-struct TFairThrottlerIPCTest
+struct TFairThrottlerIpcTest
: public ::testing::Test
{
TFairThrottlerConfigPtr Config = New<TFairThrottlerConfig>();
@@ -282,11 +282,11 @@ struct TFairThrottlerIPCTest
TFairThrottlerPtr DatNode, ExeNode;
- TFairThrottlerIPCTest()
+ TFairThrottlerIpcTest()
{
- TString testName = ::testing::UnitTest::GetInstance()->current_test_info()->name();
+ std::string testName = ::testing::UnitTest::GetInstance()->current_test_info()->name();
- Config->IPCPath = GetOutputPath() / (testName + ".throttler");
+ Config->IpcPath = GetOutputPath() / (testName + ".throttler");
Config->TotalLimit = 100;
auto logger = Logger().WithTag("Test: %v", testName);
@@ -296,7 +296,7 @@ struct TFairThrottlerIPCTest
}
};
-TEST_F(TFairThrottlerIPCTest, TwoBucket)
+TEST_F(TFairThrottlerIpcTest, TwoBucket)
{
auto first = DatNode->CreateBucketThrottler("first", BucketConfig);
auto second = ExeNode->CreateBucketThrottler("second", BucketConfig);
@@ -322,12 +322,12 @@ TEST_F(TFairThrottlerIPCTest, TwoBucket)
////////////////////////////////////////////////////////////////////////////////
#ifndef _win_
-TEST(TFileIPC, Test)
+TEST(TFileIpcTest, Test)
{
auto path = GetOutputPath() / "test_ipc";
- auto a = CreateFileThrottlerIPC(path);
- auto b = CreateFileThrottlerIPC(path);
+ auto a = CreateFileThrottlerIpc(path);
+ auto b = CreateFileThrottlerIpc(path);
ASSERT_TRUE(a->TryLock());
ASSERT_FALSE(b->TryLock());