diff options
author | babenko <babenko@yandex-team.com> | 2025-03-04 08:56:40 +0300 |
---|---|---|
committer | babenko <babenko@yandex-team.com> | 2025-03-04 09:43:19 +0300 |
commit | 875cc9c0de3e65ec0e587e991c825c3f2c173f21 (patch) | |
tree | bbe22ee79dcdd22821b3219f734c5537ab383de4 | |
parent | 3c6c39e9f8f3e2666d34217cc27fce7f0302fcda (diff) | |
download | ydb-875cc9c0de3e65ec0e587e991c825c3f2c173f21.tar.gz |
YT-18571: Cosmetics in fair throttler
commit_hash:942f79488214afdd3203f96192b9efc78afecc30
-rw-r--r-- | yt/yt/core/concurrency/fair_throttler.cpp | 114 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fair_throttler.h | 30 | ||||
-rw-r--r-- | yt/yt/core/concurrency/public.h | 4 | ||||
-rw-r--r-- | yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp | 16 |
4 files changed, 82 insertions, 82 deletions
diff --git a/yt/yt/core/concurrency/fair_throttler.cpp b/yt/yt/core/concurrency/fair_throttler.cpp index 3899c949e7f..9a5cf3c6972 100644 --- a/yt/yt/core/concurrency/fair_throttler.cpp +++ b/yt/yt/core/concurrency/fair_throttler.cpp @@ -35,7 +35,7 @@ void TFairThrottlerConfig::Register(TRegistrar registrar) registrar.Parameter("global_accumulation_ticks", &TThis::GlobalAccumulationTicks) .Default(5); - registrar.Parameter("ipc_path", &TThis::IPCPath) + registrar.Parameter("ipc_path", &TThis::IpcPath) .Default(); } @@ -95,27 +95,27 @@ static constexpr TStringBuf BucketsFileName = "buckets.v1"; //////////////////////////////////////////////////////////////////////////////// -class TFileIPCBucket - : public IIPCBucket +class TFileIpcBucket + : public IIpcBucket { public: - TFileIPCBucket(const TString& path, bool create) + TFileIpcBucket(const TString& path, bool create) : File_(path, OpenAlways | RdWr) { if (create) { File_.Flock(LOCK_EX | LOCK_NB); } - File_.Resize(sizeof(TBucket)); + File_.Resize(sizeof(TState)); Map_ = std::make_unique<TFileMap>(File_, TMemoryMapCommon::oRdWr); Map_->Map(0, Map_->Length()); LockMemory(Map_->Ptr(), Map_->Length()); } - TBucket* State() override + TState* GetState() override { - return reinterpret_cast<TBucket*>(Map_->Ptr()); + return reinterpret_cast<TState*>(Map_->Ptr()); } private: @@ -123,15 +123,15 @@ private: std::unique_ptr<TFileMap> Map_; }; -DEFINE_REFCOUNTED_TYPE(TFileIPCBucket) +DEFINE_REFCOUNTED_TYPE(TFileIpcBucket) //////////////////////////////////////////////////////////////////////////////// -class TFileThrottlerIPC - : public IThrottlerIPC +class TFileThrottlerIpc + : public IThrottlerIpc { public: - explicit TFileThrottlerIPC(const TString& path) + explicit TFileThrottlerIpc(const TString& path) : Path_(path) { NFS::MakeDirRecursive(Path_); @@ -166,7 +166,7 @@ public: return reinterpret_cast<TSharedBucket*>(SharedBucketMap_->Ptr()); } - std::vector<IIPCBucketPtr> ListBuckets() override + std::vector<IIpcBucketPtr> ListBuckets() override { Lock_->Acquire(); auto release = Finally([this] { @@ -175,14 +175,14 @@ public: Reload(); - std::vector<IIPCBucketPtr> buckets; + std::vector<IIpcBucketPtr> buckets; for (const auto& bucket : OpenBuckets_) { buckets.push_back(bucket.second); } return buckets; } - IIPCBucketPtr AddBucket() override + IIpcBucketPtr AddBucket() override { Lock_->Acquire(); auto release = Finally([this] { @@ -191,7 +191,7 @@ public: auto id = TGuid::Create(); OwnedBuckets_.insert(ToString(id)); - return New<TFileIPCBucket>(Path_ + "/" + BucketsFileName + "/" + ToString(id), true); + return New<TFileIpcBucket>(Path_ + "/" + BucketsFileName + "/" + ToString(id), true); } private: @@ -202,8 +202,8 @@ private: TFile SharedBucketFile_; std::unique_ptr<TFileMap> SharedBucketMap_; - THashMap<TString, IIPCBucketPtr> OpenBuckets_; - THashSet<TString> OwnedBuckets_; + THashMap<std::string, IIpcBucketPtr> OpenBuckets_; + THashSet<std::string> OwnedBuckets_; void Reload() { @@ -230,7 +230,7 @@ private: continue; } - OpenBuckets_[fileName] = New<TFileIPCBucket>(bucketPath, false); + OpenBuckets_[fileName] = New<TFileIpcBucket>(bucketPath, false); } catch (const TSystemError& ex) { continue; } @@ -238,12 +238,12 @@ private: } }; -IThrottlerIPCPtr CreateFileThrottlerIPC(const TString& path) +IThrottlerIpcPtr CreateFileThrottlerIpc(const TString& path) { - return New<TFileThrottlerIPC>(path); + return New<TFileThrottlerIpc>(path); } -DEFINE_REFCOUNTED_TYPE(TFileThrottlerIPC) +DEFINE_REFCOUNTED_TYPE(TFileThrottlerIpc) //////////////////////////////////////////////////////////////////////////////// @@ -623,12 +623,12 @@ TFairThrottler::TFairThrottler( , SharedBucket_(New<TSharedBucket>(config->GlobalAccumulationTicks, Profiler_)) , Config_(std::move(config)) { - if (Config_->IPCPath) { - IPC_ = New<TFileThrottlerIPC>(*Config_->IPCPath); + if (Config_->IpcPath) { + Ipc_ = New<TFileThrottlerIpc>(*Config_->IpcPath); SharedBucket_->Limit.Value = std::shared_ptr<std::atomic<i64>>( - &IPC_->State()->Value, - [ipc = IPC_] (auto /*ptr*/) { }); + &Ipc_->State()->Value, + [state = Ipc_] (auto /*ptr*/) { }); Profiler_.AddFuncGauge("/leader", MakeStrong(this), [this] { return IsLeader_.load(); @@ -646,7 +646,7 @@ TFairThrottler::TFairThrottler( } IThroughputThrottlerPtr TFairThrottler::CreateBucketThrottler( - const TString& name, + const std::string& name, TFairThrottlerBucketConfigPtr config) { if (!config) { @@ -670,15 +670,15 @@ IThroughputThrottlerPtr TFairThrottler::CreateBucketThrottler( throttler->SetLimited(config->Limit || config->RelativeLimit); - IIPCBucketPtr ipc; - if (IPC_) { - ipc = IPC_->AddBucket(); + IIpcBucketPtr state; + if (Ipc_) { + state = Ipc_->AddBucket(); } Buckets_[name] = TBucket{ .Config = std::move(config), .Throttler = throttler, - .IPC = ipc, + .Ipc = state, }; return throttler; @@ -686,7 +686,7 @@ IThroughputThrottlerPtr TFairThrottler::CreateBucketThrottler( void TFairThrottler::Reconfigure( TFairThrottlerConfigPtr config, - const THashMap<TString, TFairThrottlerBucketConfigPtr>& buckets) + const THashMap<std::string, TFairThrottlerBucketConfigPtr>& buckets) { for (const auto& [name, config] : buckets) { CreateBucketThrottler(name, config); @@ -712,7 +712,7 @@ void TFairThrottler::DoUpdateLeader() std::vector<TBucketThrottler::TBucketState> states; states.reserve(Buckets_.size()); - THashMap<TString, i64> bucketDemands; + THashMap<std::string, i64> bucketDemands; for (const auto& [name, bucket] : Buckets_) { auto state = bucket.Throttler->Peek(); @@ -736,12 +736,12 @@ void TFairThrottler::DoUpdateLeader() states.push_back(state); } - std::vector<IIPCBucketPtr> remoteBuckets; - if (IPC_) { - remoteBuckets = IPC_->ListBuckets(); + std::vector<IIpcBucketPtr> remoteBuckets; + if (Ipc_) { + remoteBuckets = Ipc_->ListBuckets(); for (const auto& remote : remoteBuckets) { - auto state = remote->State(); + auto state = remote->GetState(); weights.push_back(state->Weight.load()); @@ -771,9 +771,9 @@ void TFairThrottler::DoUpdateLeader() } auto freeIncome = ComputeFairDistribution(freeQuota, weights, demands, limits); - THashMap<TString, i64> bucketUsage; - THashMap<TString, i64> bucketIncome; - THashMap<TString, i64> bucketQuota; + THashMap<std::string, i64> bucketUsage; + THashMap<std::string, i64> bucketIncome; + THashMap<std::string, i64> bucketQuota; i64 leakedQuota = 0; int i = 0; @@ -792,7 +792,7 @@ void TFairThrottler::DoUpdateLeader() } for (const auto& remote : remoteBuckets) { - auto state = remote->State(); + auto state = remote->GetState(); state->InFlow += tickIncome[i] + freeIncome[i]; @@ -828,41 +828,41 @@ void TFairThrottler::DoUpdateFollower() { auto guard = Guard(Lock_); - THashMap<TString, i64> bucketIncome; - THashMap<TString, i64> bucketUsage; - THashMap<TString, i64> bucketDemands; - THashMap<TString, i64> bucketQuota; + THashMap<std::string, i64> bucketIncome; + THashMap<std::string, i64> bucketUsage; + THashMap<std::string, i64> bucketDemands; + THashMap<std::string, i64> bucketQuota; i64 inFlow = 0; i64 outFlow = 0; for (const auto& [name, bucket] : Buckets_) { - auto ipc = bucket.IPC->State(); + auto* ipcState = bucket.Ipc->GetState(); - ipc->Weight = bucket.Config->Weight; + ipcState->Weight = bucket.Config->Weight; if (auto limit = bucket.Config->GetLimit(Config_->TotalLimit)) { - ipc->Limit = *limit; + ipcState->Limit = *limit; } else { - ipc->Limit = -1; + ipcState->Limit = -1; } - auto state = bucket.Throttler->Peek(); + auto bucketState = bucket.Throttler->Peek(); auto guarantee = bucket.Config->GetGuarantee(Config_->TotalLimit); - auto demand = state.Usage + state.Overdraft + state.QueueSize; + auto demand = bucketState.Usage + bucketState.Overdraft + bucketState.QueueSize; if (guarantee && *guarantee > demand) { demand = *guarantee; } - ipc->Demand = demand; + ipcState->Demand = demand; bucketDemands[name] = demand; - auto in = ipc->InFlow.exchange(0); - auto out = bucket.Throttler->Refill(in, ipc->GuaranteedQuota); - ipc->OutFlow += out; + auto in = ipcState->InFlow.exchange(0); + auto out = bucket.Throttler->Refill(in, ipcState->GuaranteedQuota); + ipcState->OutFlow += out; bucketIncome[name] = in; - bucketUsage[name] = state.Usage; - bucketQuota[name] = state.Quota; + bucketUsage[name] = bucketState.Usage; + bucketQuota[name] = bucketState.Quota; inFlow += in; outFlow += out; @@ -906,7 +906,7 @@ void TFairThrottler::RefillFromSharedBucket() void TFairThrottler::UpdateLimits(TInstant at) { - if (!IsLeader_ && IPC_->TryLock()) { + if (!IsLeader_ && Ipc_->TryLock()) { IsLeader_ = true; YT_LOG_DEBUG("Throttler is leader"); diff --git a/yt/yt/core/concurrency/fair_throttler.h b/yt/yt/core/concurrency/fair_throttler.h index ecf32ae14bc..801ffaef6f2 100644 --- a/yt/yt/core/concurrency/fair_throttler.h +++ b/yt/yt/core/concurrency/fair_throttler.h @@ -19,7 +19,7 @@ struct TFairThrottlerConfig int GlobalAccumulationTicks; - std::optional<TString> IPCPath; + std::optional<TString> IpcPath; REGISTER_YSON_STRUCT(TFairThrottlerConfig); @@ -52,11 +52,11 @@ DEFINE_REFCOUNTED_TYPE(TFairThrottlerBucketConfig) //////////////////////////////////////////////////////////////////////////////// -struct IIPCBucket +struct IIpcBucket : public TRefCounted { // NB: This struct is shared between processes. All changes must be backward compatible. - struct TBucket + struct TState { std::atomic<double> Weight; std::atomic<i64> Limit; @@ -66,14 +66,14 @@ struct IIPCBucket std::atomic<i64> GuaranteedQuota; }; - virtual TBucket* State() = 0; + virtual TState* GetState() = 0; }; -DEFINE_REFCOUNTED_TYPE(IIPCBucket) +DEFINE_REFCOUNTED_TYPE(IIpcBucket) //////////////////////////////////////////////////////////////////////////////// -struct IThrottlerIPC +struct IThrottlerIpc : public TRefCounted { // NB: This struct is shared between processes. All changes must be backward compatible. @@ -84,13 +84,13 @@ struct IThrottlerIPC virtual bool TryLock() = 0; virtual TSharedBucket* State() = 0; - virtual std::vector<IIPCBucketPtr> ListBuckets() = 0; - virtual IIPCBucketPtr AddBucket() = 0; + virtual std::vector<IIpcBucketPtr> ListBuckets() = 0; + virtual IIpcBucketPtr AddBucket() = 0; }; -IThrottlerIPCPtr CreateFileThrottlerIPC(const TString& path); +IThrottlerIpcPtr CreateFileThrottlerIpc(const TString& path); -DEFINE_REFCOUNTED_TYPE(IThrottlerIPC) +DEFINE_REFCOUNTED_TYPE(IThrottlerIpc) //////////////////////////////////////////////////////////////////////////////// @@ -116,12 +116,12 @@ public: NProfiling::TProfiler profiler); IThroughputThrottlerPtr CreateBucketThrottler( - const TString& name, + const std::string& name, TFairThrottlerBucketConfigPtr config); void Reconfigure( TFairThrottlerConfigPtr config, - const THashMap<TString, TFairThrottlerBucketConfigPtr>& bucketConfigs); + const THashMap<std::string, TFairThrottlerBucketConfigPtr>& bucketConfigs); static std::vector<i64> ComputeFairDistribution( i64 totalLimit, @@ -140,15 +140,15 @@ private: { TFairThrottlerBucketConfigPtr Config; TBucketThrottlerPtr Throttler; - IIPCBucketPtr IPC; + IIpcBucketPtr Ipc; }; // Protects all Config_ and Buckets_. YT_DECLARE_SPIN_LOCK(NThreading::TSpinLock, Lock_); TFairThrottlerConfigPtr Config_; - THashMap<TString, TBucket> Buckets_; + THashMap<std::string, TBucket> Buckets_; - IThrottlerIPCPtr IPC_; + IThrottlerIpcPtr Ipc_; void DoUpdateLeader(); void DoUpdateFollower(); diff --git a/yt/yt/core/concurrency/public.h b/yt/yt/core/concurrency/public.h index 10ffdda9169..4b547a74fce 100644 --- a/yt/yt/core/concurrency/public.h +++ b/yt/yt/core/concurrency/public.h @@ -120,8 +120,8 @@ DECLARE_REFCOUNTED_STRUCT(TFiberManagerDynamicConfig) DECLARE_REFCOUNTED_STRUCT(TFairThrottlerConfig) DECLARE_REFCOUNTED_STRUCT(TFairThrottlerBucketConfig) -DECLARE_REFCOUNTED_STRUCT(IThrottlerIPC) -DECLARE_REFCOUNTED_STRUCT(IIPCBucket) +DECLARE_REFCOUNTED_STRUCT(IThrottlerIpc) +DECLARE_REFCOUNTED_STRUCT(IIpcBucket) DECLARE_REFCOUNTED_CLASS(TFairThrottler) DECLARE_REFCOUNTED_CLASS(TBucketThrottler) diff --git a/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp b/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp index 41ee845af7f..2a6528bd314 100644 --- a/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp +++ b/yt/yt/core/concurrency/unittests/fair_throttler_ut.cpp @@ -274,7 +274,7 @@ TEST_F(TFairThrottlerTest, Release) //////////////////////////////////////////////////////////////////////////////// -struct TFairThrottlerIPCTest +struct TFairThrottlerIpcTest : public ::testing::Test { TFairThrottlerConfigPtr Config = New<TFairThrottlerConfig>(); @@ -282,11 +282,11 @@ struct TFairThrottlerIPCTest TFairThrottlerPtr DatNode, ExeNode; - TFairThrottlerIPCTest() + TFairThrottlerIpcTest() { - TString testName = ::testing::UnitTest::GetInstance()->current_test_info()->name(); + std::string testName = ::testing::UnitTest::GetInstance()->current_test_info()->name(); - Config->IPCPath = GetOutputPath() / (testName + ".throttler"); + Config->IpcPath = GetOutputPath() / (testName + ".throttler"); Config->TotalLimit = 100; auto logger = Logger().WithTag("Test: %v", testName); @@ -296,7 +296,7 @@ struct TFairThrottlerIPCTest } }; -TEST_F(TFairThrottlerIPCTest, TwoBucket) +TEST_F(TFairThrottlerIpcTest, TwoBucket) { auto first = DatNode->CreateBucketThrottler("first", BucketConfig); auto second = ExeNode->CreateBucketThrottler("second", BucketConfig); @@ -322,12 +322,12 @@ TEST_F(TFairThrottlerIPCTest, TwoBucket) //////////////////////////////////////////////////////////////////////////////// #ifndef _win_ -TEST(TFileIPC, Test) +TEST(TFileIpcTest, Test) { auto path = GetOutputPath() / "test_ipc"; - auto a = CreateFileThrottlerIPC(path); - auto b = CreateFileThrottlerIPC(path); + auto a = CreateFileThrottlerIpc(path); + auto b = CreateFileThrottlerIpc(path); ASSERT_TRUE(a->TryLock()); ASSERT_FALSE(b->TryLock()); |