diff options
author | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-06-10 14:05:34 +0300 |
---|---|---|
committer | arkady-e1ppa <arkady-e1ppa@yandex-team.com> | 2024-06-10 14:20:44 +0300 |
commit | 8a0ab0da5db5ef2d986d56bce04a9b08e7ae4523 (patch) | |
tree | 3c751cdd5e6174bd177fce05077638ac3c4b0564 | |
parent | ccb9ebb0b777d8e648101a0b97cdb940a989c9b9 (diff) | |
download | ydb-8a0ab0da5db5ef2d986d56bce04a9b08e7ae4523.tar.gz |
Fix some babenko issues of trunk-only changes
9b59c80f4005d526b821ab53d0d8f8616cd46537
-rw-r--r-- | yt/yt/core/concurrency/delayed_executor.cpp | 2 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fair_share_action_queue.cpp | 2 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fair_share_thread_pool.cpp | 17 | ||||
-rw-r--r-- | yt/yt/core/concurrency/fiber.cpp | 44 | ||||
-rw-r--r-- | yt/yt/core/concurrency/invoker_queue.h | 2 | ||||
-rw-r--r-- | yt/yt/core/concurrency/periodic_executor.h | 2 | ||||
-rw-r--r-- | yt/yt/core/concurrency/retrying_periodic_executor.h | 8 | ||||
-rw-r--r-- | yt/yt/core/concurrency/scheduler_thread.h | 2 | ||||
-rw-r--r-- | yt/yt/core/misc/process_exit_profiler.cpp | 8 | ||||
-rw-r--r-- | yt/yt/core/misc/process_exit_profiler.h | 6 |
10 files changed, 36 insertions, 57 deletions
diff --git a/yt/yt/core/concurrency/delayed_executor.cpp b/yt/yt/core/concurrency/delayed_executor.cpp index 581163dcfa..d538896bc0 100644 --- a/yt/yt/core/concurrency/delayed_executor.cpp +++ b/yt/yt/core/concurrency/delayed_executor.cpp @@ -433,7 +433,7 @@ private: void RunCallback(const TDelayedExecutorEntryPtr& entry, bool abort) { if (auto callback = TakeCallback(entry)) { - auto invoker = entry->Invoker + const auto& invoker = entry->Invoker ? entry->Invoker : DelayedInvoker_; invoker diff --git a/yt/yt/core/concurrency/fair_share_action_queue.cpp b/yt/yt/core/concurrency/fair_share_action_queue.cpp index ecef8faa85..21a8add98a 100644 --- a/yt/yt/core/concurrency/fair_share_action_queue.cpp +++ b/yt/yt/core/concurrency/fair_share_action_queue.cpp @@ -105,7 +105,7 @@ public: void Shutdown(bool graceful) { - // Syncrhonization done via Queue_->Shutdown(). + // Synchronization done via Queue_->Shutdown(). if (Stopped_.exchange(true, std::memory_order::relaxed)) { return; } diff --git a/yt/yt/core/concurrency/fair_share_thread_pool.cpp b/yt/yt/core/concurrency/fair_share_thread_pool.cpp index 91444d3f5d..a72c2eb876 100644 --- a/yt/yt/core/concurrency/fair_share_thread_pool.cpp +++ b/yt/yt/core/concurrency/fair_share_thread_pool.cpp @@ -233,11 +233,18 @@ public: void Shutdown() { auto guard = Guard(SpinLock_); - // Setting under spinlock because this way - // we have atomicity of two actions: - // 1) Write/read flag and 2) Drain/Enqueue callback. - // See two_level_fair_share_thread_pool Queue - // for more detailed explanation. + // We want to make sure that calls to + // Shutdown and Invoke are "atomic" with respect + // to each other. We need this so that + // there are no tasks left in the queue after + // the first call to Shutdown has finished. + // Here we achieve that by accessing + // both buckets and Stopping flag under + // SpinLock in either method. + // See two_level_fair_share_thread_pool.cpp + // for lock-free version with a more detailed + // explanation why Stopping flag logic provides + // the desired guarantee. Stopping_ = true; for (const auto& item : Heap_) { item.Bucket->Drain(); diff --git a/yt/yt/core/concurrency/fiber.cpp b/yt/yt/core/concurrency/fiber.cpp index 1b0717fa26..032bc53fa7 100644 --- a/yt/yt/core/concurrency/fiber.cpp +++ b/yt/yt/core/concurrency/fiber.cpp @@ -31,22 +31,6 @@ static constexpr auto& Logger = ConcurrencyLogger; //////////////////////////////////////////////////////////////////////////////// -#ifdef NDEBUG - #define ATOMIC_EXCHANGE_WITH_VERIFY(Atomic, NewValue, ExpectedValue, MemoryOrder) \ - YT_VERIFY(Atomic.load(std::memory_order::relaxed) == ExpectedValue); \ - Atomic.store(NewValue, MemoryOrder); \ - static_assert(true) -#else - #define ATOMIC_EXCHANGE_WITH_VERIFY(Atomic, NewValue, ExpectedValue, MemoryOrder) \ - { \ - auto observed = Atomic.exchange(NewValue, MemoryOrder); \ - YT_VERIFY(observed == ExpectedValue); \ - } \ - static_assert(true) -#endif - -//////////////////////////////////////////////////////////////////////////////// - class TFiberProfiler : public ISensorProducer { @@ -278,11 +262,8 @@ bool TFiberIntrospectionBase::TryLockForIntrospection(EFiberState* state, TFunct auto guard = Finally([&] { // Release lock held by introspector. - ATOMIC_EXCHANGE_WITH_VERIFY( - State_, - EFiberState::Waiting, - EFiberState::Introspecting, - std::memory_order::release); + YT_VERIFY(State_.load(std::memory_order::relaxed) == EFiberState::Introspecting); + State_.store(EFiberState::Waiting, std::memory_order::release); }); successHandler(); @@ -332,11 +313,8 @@ void TFiberIntrospectionBase::SetWaiting() WaitingSince_ = CpuInstantToInstant(GetApproximateCpuInstant()); // Release lock that should be acquired by running fiber. - ATOMIC_EXCHANGE_WITH_VERIFY( - State_, - EFiberState::Waiting, - EFiberState::Running, - std::memory_order::release); + YT_VERIFY(State_.load(std::memory_order::relaxed) == EFiberState::Running); + State_.store(EFiberState::Waiting, std::memory_order::release); } void TFiberIntrospectionBase::SetIdle() @@ -344,11 +322,8 @@ void TFiberIntrospectionBase::SetIdle() // 1) Locked by running fiber. // 2) Reading this doesn't cause anything to happen. // This state is never checked for, so just relaxed. - ATOMIC_EXCHANGE_WITH_VERIFY( - State_, - EFiberState::Idle, - EFiberState::Running, - std::memory_order::relaxed); + YT_VERIFY(State_.load(std::memory_order::relaxed) == EFiberState::Running); + State_.store(EFiberState::Idle, std::memory_order::relaxed); } std::optional<TDuration> TFiberIntrospectionBase::SetRunning() @@ -400,11 +375,8 @@ void TFiberIntrospectionBase::SetFinished() // the last modification and if this function // is called twice regardless of situation // one of the calls will crash as it should. - ATOMIC_EXCHANGE_WITH_VERIFY( - State_, - EFiberState::Finished, - EFiberState::Running, - std::memory_order::relaxed); + YT_VERIFY(State_.load(std::memory_order::relaxed) == EFiberState::Running); + State_.store(EFiberState::Finished, std::memory_order::relaxed); } EFiberState TFiberIntrospectionBase::GetState() const diff --git a/yt/yt/core/concurrency/invoker_queue.h b/yt/yt/core/concurrency/invoker_queue.h index f4464807da..0dea9c30b8 100644 --- a/yt/yt/core/concurrency/invoker_queue.h +++ b/yt/yt/core/concurrency/invoker_queue.h @@ -179,7 +179,7 @@ public: // NB(arkady-e1ppa): Calling shutdown is not // enough to prevent leaks of callbacks // as there might be some callbacks left in - // local queue of mpsc queue if shutdown + // local queue of MPSC queue if shutdown // was not graceful. void OnConsumerFinished(); diff --git a/yt/yt/core/concurrency/periodic_executor.h b/yt/yt/core/concurrency/periodic_executor.h index 60951f7250..9c3ad9b94b 100644 --- a/yt/yt/core/concurrency/periodic_executor.h +++ b/yt/yt/core/concurrency/periodic_executor.h @@ -54,7 +54,7 @@ class TPeriodicExecutor : public NDetail::TPeriodicExecutorBase<NDetail::TDefaultInvocationTimePolicy> { public: - //! Initializes an instance. + //! Initializes the instance. /*! * \note * We must call #Start to activate the instance. diff --git a/yt/yt/core/concurrency/retrying_periodic_executor.h b/yt/yt/core/concurrency/retrying_periodic_executor.h index a67d65feb2..1024bec7b3 100644 --- a/yt/yt/core/concurrency/retrying_periodic_executor.h +++ b/yt/yt/core/concurrency/retrying_periodic_executor.h @@ -66,15 +66,15 @@ private: //////////////////////////////////////////////////////////////////////////////// -// Periodically executes callback which can fail using retries. Speciffics: +// Periodically executes callback which can fail using retries. Specifics: // Fallible callback is modelled as TCallback<TError()> -// Any non-Ok error is considered a failure. -// Retries are made with exponential backoff see yt/yt/core/misc/backoff_strategy.h . +// Any non-OK error is considered a failure. +// Retries are made with exponential backoff; see yt/yt/core/misc/backoff_strategy.h . class TRetryingPeriodicExecutor : public NDetail::TPeriodicExecutorBase<NDetail::TRetryingInvocationTimePolicy> { public: - //! Initializes an instance. + //! Initializes the instance. /*! * \note * We must call #Start to activate the instance. diff --git a/yt/yt/core/concurrency/scheduler_thread.h b/yt/yt/core/concurrency/scheduler_thread.h index 5eb34ae18c..b4f9632d86 100644 --- a/yt/yt/core/concurrency/scheduler_thread.h +++ b/yt/yt/core/concurrency/scheduler_thread.h @@ -28,7 +28,7 @@ protected: // NB(arkady-e1ppa): We don't need a customisation point OnStop // because the only sensible case when we need to do something - // After stop is a graceful shutdown for which we might want + // after stop is a graceful shutdown for which we might want // to clear the queue. Now, every shutdownable queue is // either drained automatically (graceful = false) or // the Shutdown is graceful (TSchedulerThread::Stop(true)) will diff --git a/yt/yt/core/misc/process_exit_profiler.cpp b/yt/yt/core/misc/process_exit_profiler.cpp index 5c2133ba43..ec15c33ae6 100644 --- a/yt/yt/core/misc/process_exit_profiler.cpp +++ b/yt/yt/core/misc/process_exit_profiler.cpp @@ -9,7 +9,7 @@ TProcessExitProfiler::TProcessExitProfiler( const TString& prefix) : Profiler_(parent.WithPrefix(prefix)) , ExitDelayTimer_(Profiler_.Timer("/exit_delay")) - , ExitOkCounter_(Profiler_.Counter("/zero_exit_code")) + , ExitOKCounter_(Profiler_.Counter("/zero_exit_code")) , ExitUnknownCounter_(Profiler_.Counter("/unknown")) { } @@ -22,7 +22,7 @@ void TProcessExitProfiler::OnProcessExit( } if (error.IsOK()) { - ExitOkCounter_.Increment(); + ExitOKCounter_.Increment(); return; } @@ -73,11 +73,11 @@ NProfiling::TCounter& TProcessExitProfiler::GetOrCreateSignalExitCounter(int sig NProfiling::TCounter TProcessExitProfiler::MakeSignalExitCounter(int signal) { return Profiler_ - .WithTag("terminated_by_signal", SignalName(signal)) + .WithTag("terminated_by_signal", GetSignalName(signal)) .Counter("/count"); } -TString TProcessExitProfiler::SignalName(int signal) +TString TProcessExitProfiler::GetSignalName(int signal) { #ifdef _unix_ auto result = TString(strsignal(signal)); diff --git a/yt/yt/core/misc/process_exit_profiler.h b/yt/yt/core/misc/process_exit_profiler.h index d7b5e769e6..0306e44ed2 100644 --- a/yt/yt/core/misc/process_exit_profiler.h +++ b/yt/yt/core/misc/process_exit_profiler.h @@ -25,10 +25,10 @@ public: std::optional<TDuration> delay = {}); private: - NProfiling::TProfiler Profiler_; + const NProfiling::TProfiler Profiler_; NProfiling::TEventTimer ExitDelayTimer_; - NProfiling::TCounter ExitOkCounter_; + NProfiling::TCounter ExitOKCounter_; NProfiling::TCounter ExitUnknownCounter_; THashMap<int, NProfiling::TCounter> NonZeroExitCounters_; THashMap<int, NProfiling::TCounter> SignalExitCounters_; @@ -41,7 +41,7 @@ private: NProfiling::TCounter MakeSignalExitCounter(int signal); - static TString SignalName(int signal); + static TString GetSignalName(int signal); }; //////////////////////////////////////////////////////////////////////////////// |