aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-06-10 14:05:34 +0300
committerarkady-e1ppa <arkady-e1ppa@yandex-team.com>2024-06-10 14:20:44 +0300
commit8a0ab0da5db5ef2d986d56bce04a9b08e7ae4523 (patch)
tree3c751cdd5e6174bd177fce05077638ac3c4b0564
parentccb9ebb0b777d8e648101a0b97cdb940a989c9b9 (diff)
downloadydb-8a0ab0da5db5ef2d986d56bce04a9b08e7ae4523.tar.gz
Fix some babenko issues of trunk-only changes
9b59c80f4005d526b821ab53d0d8f8616cd46537
-rw-r--r--yt/yt/core/concurrency/delayed_executor.cpp2
-rw-r--r--yt/yt/core/concurrency/fair_share_action_queue.cpp2
-rw-r--r--yt/yt/core/concurrency/fair_share_thread_pool.cpp17
-rw-r--r--yt/yt/core/concurrency/fiber.cpp44
-rw-r--r--yt/yt/core/concurrency/invoker_queue.h2
-rw-r--r--yt/yt/core/concurrency/periodic_executor.h2
-rw-r--r--yt/yt/core/concurrency/retrying_periodic_executor.h8
-rw-r--r--yt/yt/core/concurrency/scheduler_thread.h2
-rw-r--r--yt/yt/core/misc/process_exit_profiler.cpp8
-rw-r--r--yt/yt/core/misc/process_exit_profiler.h6
10 files changed, 36 insertions, 57 deletions
diff --git a/yt/yt/core/concurrency/delayed_executor.cpp b/yt/yt/core/concurrency/delayed_executor.cpp
index 581163dcfa..d538896bc0 100644
--- a/yt/yt/core/concurrency/delayed_executor.cpp
+++ b/yt/yt/core/concurrency/delayed_executor.cpp
@@ -433,7 +433,7 @@ private:
void RunCallback(const TDelayedExecutorEntryPtr& entry, bool abort)
{
if (auto callback = TakeCallback(entry)) {
- auto invoker = entry->Invoker
+ const auto& invoker = entry->Invoker
? entry->Invoker
: DelayedInvoker_;
invoker
diff --git a/yt/yt/core/concurrency/fair_share_action_queue.cpp b/yt/yt/core/concurrency/fair_share_action_queue.cpp
index ecef8faa85..21a8add98a 100644
--- a/yt/yt/core/concurrency/fair_share_action_queue.cpp
+++ b/yt/yt/core/concurrency/fair_share_action_queue.cpp
@@ -105,7 +105,7 @@ public:
void Shutdown(bool graceful)
{
- // Syncrhonization done via Queue_->Shutdown().
+ // Synchronization done via Queue_->Shutdown().
if (Stopped_.exchange(true, std::memory_order::relaxed)) {
return;
}
diff --git a/yt/yt/core/concurrency/fair_share_thread_pool.cpp b/yt/yt/core/concurrency/fair_share_thread_pool.cpp
index 91444d3f5d..a72c2eb876 100644
--- a/yt/yt/core/concurrency/fair_share_thread_pool.cpp
+++ b/yt/yt/core/concurrency/fair_share_thread_pool.cpp
@@ -233,11 +233,18 @@ public:
void Shutdown()
{
auto guard = Guard(SpinLock_);
- // Setting under spinlock because this way
- // we have atomicity of two actions:
- // 1) Write/read flag and 2) Drain/Enqueue callback.
- // See two_level_fair_share_thread_pool Queue
- // for more detailed explanation.
+ // We want to make sure that calls to
+ // Shutdown and Invoke are "atomic" with respect
+ // to each other. We need this so that
+ // there are no tasks left in the queue after
+ // the first call to Shutdown has finished.
+ // Here we achieve that by accessing
+ // both buckets and Stopping flag under
+ // SpinLock in either method.
+ // See two_level_fair_share_thread_pool.cpp
+ // for lock-free version with a more detailed
+ // explanation why Stopping flag logic provides
+ // the desired guarantee.
Stopping_ = true;
for (const auto& item : Heap_) {
item.Bucket->Drain();
diff --git a/yt/yt/core/concurrency/fiber.cpp b/yt/yt/core/concurrency/fiber.cpp
index 1b0717fa26..032bc53fa7 100644
--- a/yt/yt/core/concurrency/fiber.cpp
+++ b/yt/yt/core/concurrency/fiber.cpp
@@ -31,22 +31,6 @@ static constexpr auto& Logger = ConcurrencyLogger;
////////////////////////////////////////////////////////////////////////////////
-#ifdef NDEBUG
- #define ATOMIC_EXCHANGE_WITH_VERIFY(Atomic, NewValue, ExpectedValue, MemoryOrder) \
- YT_VERIFY(Atomic.load(std::memory_order::relaxed) == ExpectedValue); \
- Atomic.store(NewValue, MemoryOrder); \
- static_assert(true)
-#else
- #define ATOMIC_EXCHANGE_WITH_VERIFY(Atomic, NewValue, ExpectedValue, MemoryOrder) \
- { \
- auto observed = Atomic.exchange(NewValue, MemoryOrder); \
- YT_VERIFY(observed == ExpectedValue); \
- } \
- static_assert(true)
-#endif
-
-////////////////////////////////////////////////////////////////////////////////
-
class TFiberProfiler
: public ISensorProducer
{
@@ -278,11 +262,8 @@ bool TFiberIntrospectionBase::TryLockForIntrospection(EFiberState* state, TFunct
auto guard = Finally([&] {
// Release lock held by introspector.
- ATOMIC_EXCHANGE_WITH_VERIFY(
- State_,
- EFiberState::Waiting,
- EFiberState::Introspecting,
- std::memory_order::release);
+ YT_VERIFY(State_.load(std::memory_order::relaxed) == EFiberState::Introspecting);
+ State_.store(EFiberState::Waiting, std::memory_order::release);
});
successHandler();
@@ -332,11 +313,8 @@ void TFiberIntrospectionBase::SetWaiting()
WaitingSince_ = CpuInstantToInstant(GetApproximateCpuInstant());
// Release lock that should be acquired by running fiber.
- ATOMIC_EXCHANGE_WITH_VERIFY(
- State_,
- EFiberState::Waiting,
- EFiberState::Running,
- std::memory_order::release);
+ YT_VERIFY(State_.load(std::memory_order::relaxed) == EFiberState::Running);
+ State_.store(EFiberState::Waiting, std::memory_order::release);
}
void TFiberIntrospectionBase::SetIdle()
@@ -344,11 +322,8 @@ void TFiberIntrospectionBase::SetIdle()
// 1) Locked by running fiber.
// 2) Reading this doesn't cause anything to happen.
// This state is never checked for, so just relaxed.
- ATOMIC_EXCHANGE_WITH_VERIFY(
- State_,
- EFiberState::Idle,
- EFiberState::Running,
- std::memory_order::relaxed);
+ YT_VERIFY(State_.load(std::memory_order::relaxed) == EFiberState::Running);
+ State_.store(EFiberState::Idle, std::memory_order::relaxed);
}
std::optional<TDuration> TFiberIntrospectionBase::SetRunning()
@@ -400,11 +375,8 @@ void TFiberIntrospectionBase::SetFinished()
// the last modification and if this function
// is called twice regardless of situation
// one of the calls will crash as it should.
- ATOMIC_EXCHANGE_WITH_VERIFY(
- State_,
- EFiberState::Finished,
- EFiberState::Running,
- std::memory_order::relaxed);
+ YT_VERIFY(State_.load(std::memory_order::relaxed) == EFiberState::Running);
+ State_.store(EFiberState::Finished, std::memory_order::relaxed);
}
EFiberState TFiberIntrospectionBase::GetState() const
diff --git a/yt/yt/core/concurrency/invoker_queue.h b/yt/yt/core/concurrency/invoker_queue.h
index f4464807da..0dea9c30b8 100644
--- a/yt/yt/core/concurrency/invoker_queue.h
+++ b/yt/yt/core/concurrency/invoker_queue.h
@@ -179,7 +179,7 @@ public:
// NB(arkady-e1ppa): Calling shutdown is not
// enough to prevent leaks of callbacks
// as there might be some callbacks left in
- // local queue of mpsc queue if shutdown
+ // local queue of MPSC queue if shutdown
// was not graceful.
void OnConsumerFinished();
diff --git a/yt/yt/core/concurrency/periodic_executor.h b/yt/yt/core/concurrency/periodic_executor.h
index 60951f7250..9c3ad9b94b 100644
--- a/yt/yt/core/concurrency/periodic_executor.h
+++ b/yt/yt/core/concurrency/periodic_executor.h
@@ -54,7 +54,7 @@ class TPeriodicExecutor
: public NDetail::TPeriodicExecutorBase<NDetail::TDefaultInvocationTimePolicy>
{
public:
- //! Initializes an instance.
+ //! Initializes the instance.
/*!
* \note
* We must call #Start to activate the instance.
diff --git a/yt/yt/core/concurrency/retrying_periodic_executor.h b/yt/yt/core/concurrency/retrying_periodic_executor.h
index a67d65feb2..1024bec7b3 100644
--- a/yt/yt/core/concurrency/retrying_periodic_executor.h
+++ b/yt/yt/core/concurrency/retrying_periodic_executor.h
@@ -66,15 +66,15 @@ private:
////////////////////////////////////////////////////////////////////////////////
-// Periodically executes callback which can fail using retries. Speciffics:
+// Periodically executes callback which can fail using retries. Specifics:
// Fallible callback is modelled as TCallback<TError()>
-// Any non-Ok error is considered a failure.
-// Retries are made with exponential backoff see yt/yt/core/misc/backoff_strategy.h .
+// Any non-OK error is considered a failure.
+// Retries are made with exponential backoff; see yt/yt/core/misc/backoff_strategy.h .
class TRetryingPeriodicExecutor
: public NDetail::TPeriodicExecutorBase<NDetail::TRetryingInvocationTimePolicy>
{
public:
- //! Initializes an instance.
+ //! Initializes the instance.
/*!
* \note
* We must call #Start to activate the instance.
diff --git a/yt/yt/core/concurrency/scheduler_thread.h b/yt/yt/core/concurrency/scheduler_thread.h
index 5eb34ae18c..b4f9632d86 100644
--- a/yt/yt/core/concurrency/scheduler_thread.h
+++ b/yt/yt/core/concurrency/scheduler_thread.h
@@ -28,7 +28,7 @@ protected:
// NB(arkady-e1ppa): We don't need a customisation point OnStop
// because the only sensible case when we need to do something
- // After stop is a graceful shutdown for which we might want
+ // after stop is a graceful shutdown for which we might want
// to clear the queue. Now, every shutdownable queue is
// either drained automatically (graceful = false) or
// the Shutdown is graceful (TSchedulerThread::Stop(true)) will
diff --git a/yt/yt/core/misc/process_exit_profiler.cpp b/yt/yt/core/misc/process_exit_profiler.cpp
index 5c2133ba43..ec15c33ae6 100644
--- a/yt/yt/core/misc/process_exit_profiler.cpp
+++ b/yt/yt/core/misc/process_exit_profiler.cpp
@@ -9,7 +9,7 @@ TProcessExitProfiler::TProcessExitProfiler(
const TString& prefix)
: Profiler_(parent.WithPrefix(prefix))
, ExitDelayTimer_(Profiler_.Timer("/exit_delay"))
- , ExitOkCounter_(Profiler_.Counter("/zero_exit_code"))
+ , ExitOKCounter_(Profiler_.Counter("/zero_exit_code"))
, ExitUnknownCounter_(Profiler_.Counter("/unknown"))
{ }
@@ -22,7 +22,7 @@ void TProcessExitProfiler::OnProcessExit(
}
if (error.IsOK()) {
- ExitOkCounter_.Increment();
+ ExitOKCounter_.Increment();
return;
}
@@ -73,11 +73,11 @@ NProfiling::TCounter& TProcessExitProfiler::GetOrCreateSignalExitCounter(int sig
NProfiling::TCounter TProcessExitProfiler::MakeSignalExitCounter(int signal)
{
return Profiler_
- .WithTag("terminated_by_signal", SignalName(signal))
+ .WithTag("terminated_by_signal", GetSignalName(signal))
.Counter("/count");
}
-TString TProcessExitProfiler::SignalName(int signal)
+TString TProcessExitProfiler::GetSignalName(int signal)
{
#ifdef _unix_
auto result = TString(strsignal(signal));
diff --git a/yt/yt/core/misc/process_exit_profiler.h b/yt/yt/core/misc/process_exit_profiler.h
index d7b5e769e6..0306e44ed2 100644
--- a/yt/yt/core/misc/process_exit_profiler.h
+++ b/yt/yt/core/misc/process_exit_profiler.h
@@ -25,10 +25,10 @@ public:
std::optional<TDuration> delay = {});
private:
- NProfiling::TProfiler Profiler_;
+ const NProfiling::TProfiler Profiler_;
NProfiling::TEventTimer ExitDelayTimer_;
- NProfiling::TCounter ExitOkCounter_;
+ NProfiling::TCounter ExitOKCounter_;
NProfiling::TCounter ExitUnknownCounter_;
THashMap<int, NProfiling::TCounter> NonZeroExitCounters_;
THashMap<int, NProfiling::TCounter> SignalExitCounters_;
@@ -41,7 +41,7 @@ private:
NProfiling::TCounter MakeSignalExitCounter(int signal);
- static TString SignalName(int signal);
+ static TString GetSignalName(int signal);
};
////////////////////////////////////////////////////////////////////////////////