diff options
author | eeight <eeight@yandex-team.ru> | 2022-05-20 00:26:58 +0300 |
---|---|---|
committer | eeight <eeight@yandex-team.ru> | 2022-05-20 00:26:58 +0300 |
commit | 13950b7049e70b9e0adf3cbe48a527f9e8d25e3c (patch) | |
tree | 65359e6418fb7833b826e97044c58c55485941cb | |
parent | d98cd6128e8838e19ae1a95be34385af299dd62a (diff) | |
download | ydb-13950b7049e70b9e0adf3cbe48a527f9e8d25e3c.tar.gz |
IGNIETFERRO-1105 Get rid of TAtomic in system/{atexit, event, shellcommand}
ref:c74bffae7e90dee515f5089979fc050d20a34e4e
-rw-r--r-- | util/system/atexit.cpp | 10 | ||||
-rw-r--r-- | util/system/event.cpp | 19 | ||||
-rw-r--r-- | util/system/shellcommand.cpp | 39 | ||||
-rw-r--r-- | util/system/shellcommand.h | 20 | ||||
-rw-r--r-- | util/system/tls.cpp | 8 |
5 files changed, 57 insertions, 39 deletions
diff --git a/util/system/atexit.cpp b/util/system/atexit.cpp index aeb29466b3..b06880818c 100644 --- a/util/system/atexit.cpp +++ b/util/system/atexit.cpp @@ -1,5 +1,4 @@ #include "atexit.h" -#include "atomic.h" #include "yassert.h" #include "spinlock.h" #include "thread.h" @@ -9,6 +8,7 @@ #include <util/generic/deque.h> #include <util/generic/queue.h> +#include <atomic> #include <tuple> #include <cstdlib> @@ -30,12 +30,12 @@ namespace { public: inline TAtExit() noexcept - : FinishStarted_(0) + : FinishStarted_(false) { } inline void Finish() noexcept { - AtomicSet(FinishStarted_, 1); + FinishStarted_.store(true); auto guard = Guard(Lock_); @@ -66,12 +66,12 @@ namespace { } inline bool FinishStarted() const { - return AtomicGet(FinishStarted_); + return FinishStarted_.load(); } private: TAdaptiveLock Lock_; - TAtomic FinishStarted_; + std::atomic<bool> FinishStarted_; TDeque<TFunc> Store_; TPriorityQueue<TFunc*, TVector<TFunc*>, TCmp> Items_; }; diff --git a/util/system/event.cpp b/util/system/event.cpp index ceb9134a3d..42311d4835 100644 --- a/util/system/event.cpp +++ b/util/system/event.cpp @@ -1,6 +1,5 @@ #include "defaults.h" -#include "atomic.h" #include "event.h" #include "mutex.h" #include "condvar.h" @@ -9,6 +8,8 @@ #include "winint.h" #endif +#include <atomic> + class TSystemEvent::TEvImpl: public TAtomicRefCount<TSystemEvent::TEvImpl> { public: #ifdef _win_ @@ -49,12 +50,12 @@ public: } inline void Signal() noexcept { - if (Manual && AtomicGet(Signaled)) { + if (Manual && Signaled.load(std::memory_order_acquire)) { return; // shortcut } with_lock (Mutex) { - AtomicSet(Signaled, 1); + Signaled.store(true, std::memory_order_release); } if (Manual) { @@ -65,27 +66,27 @@ public: } inline void Reset() noexcept { - AtomicSet(Signaled, 0); + Signaled.store(false, std::memory_order_release); } inline bool WaitD(TInstant deadLine) noexcept { - if (Manual && AtomicGet(Signaled)) { + if (Manual && Signaled.load(std::memory_order_acquire)) { return true; // shortcut } bool resSignaled = true; with_lock (Mutex) { - while (!AtomicGet(Signaled)) { + while (!Signaled.load(std::memory_order_acquire)) { if (!Cond.WaitD(Mutex, deadLine)) { - resSignaled = AtomicGet(Signaled); // timed out, but Signaled could have been set + resSignaled = Signaled.load(std::memory_order_acquire); // timed out, but Signaled could have been set break; } } if (!Manual) { - AtomicSet(Signaled, 0); + Signaled.store(false, std::memory_order_release); } } @@ -99,7 +100,7 @@ private: #else TCondVar Cond; TMutex Mutex; - TAtomic Signaled = 0; + std::atomic<bool> Signaled = false; bool Manual; #endif }; diff --git a/util/system/shellcommand.cpp b/util/system/shellcommand.cpp index 247bf6f340..025a3ffd2f 100644 --- a/util/system/shellcommand.cpp +++ b/util/system/shellcommand.cpp @@ -2,7 +2,6 @@ #include "user.h" #include "nice.h" #include "sigset.h" -#include "atomic.h" #include <util/folder/dirut.h> #include <util/generic/algorithm.h> @@ -198,7 +197,7 @@ private: TShellCommandOptions::EHandleMode InputMode = TShellCommandOptions::HANDLE_STREAM; TPid Pid; - TAtomic ExecutionStatus; // TShellCommand::ECommandStatus + std::atomic<size_t> ExecutionStatus; // TShellCommand::ECommandStatus TThread* WatchThread; bool TerminateFlag = false; @@ -253,7 +252,7 @@ private: TRealPipeHandle* Pipe; IOutputStream* OutputStream; IInputStream* InputStream; - TAtomic* ShouldClosePipe; + std::atomic<bool>* ShouldClosePipe; TString InternalError; }; @@ -298,35 +297,35 @@ public: } inline void AppendArgument(const TStringBuf argument) { - if (AtomicGet(ExecutionStatus) == SHELL_RUNNING) { + if (ExecutionStatus.load(std::memory_order_acquire) == SHELL_RUNNING) { ythrow yexception() << "You cannot change command parameters while process is running"; } Arguments.push_back(ToString(argument)); } inline const TString& GetOutput() const { - if (AtomicGet(ExecutionStatus) == SHELL_RUNNING) { + if (ExecutionStatus.load(std::memory_order_acquire) == SHELL_RUNNING) { ythrow yexception() << "You cannot retrieve output while process is running."; } return CollectedOutput; } inline const TString& GetError() const { - if (AtomicGet(ExecutionStatus) == SHELL_RUNNING) { + if (ExecutionStatus.load(std::memory_order_acquire) == SHELL_RUNNING) { ythrow yexception() << "You cannot retrieve output while process is running."; } return CollectedError; } inline const TString& GetInternalError() const { - if (AtomicGet(ExecutionStatus) != SHELL_INTERNAL_ERROR) { + if (ExecutionStatus.load(std::memory_order_acquire) != SHELL_INTERNAL_ERROR) { ythrow yexception() << "Internal error hasn't occured so can't be retrieved."; } return InternalError; } inline ECommandStatus GetStatus() const { - return static_cast<ECommandStatus>(AtomicGet(ExecutionStatus)); + return static_cast<ECommandStatus>(ExecutionStatus.load(std::memory_order_acquire)); } inline TMaybe<int> GetExitCode() const { @@ -357,7 +356,7 @@ public: void Run(); inline void Terminate() { - if (!!Pid && (AtomicGet(ExecutionStatus) == SHELL_RUNNING)) { + if (!!Pid && (ExecutionStatus.load(std::memory_order_acquire) == SHELL_RUNNING)) { bool ok = #if defined(_unix_) kill(Options_.DetachSession ? -1 * Pid : Pid, SIGTERM) == 0; @@ -382,7 +381,7 @@ public: } inline void CloseInput() { - AtomicSet(Options_.ShouldCloseInput, true); + Options_.ShouldCloseInput.store(true); } inline static bool TerminateIsRequired(void* processInfo) { @@ -451,7 +450,7 @@ public: if (!bytesToWrite) { bytesToWrite = pump->InputStream->Read(buffer.Data(), buffer.Capacity()); if (bytesToWrite == 0) { - if (AtomicGet(pump->ShouldClosePipe)) { + if (pump->ShouldClosePipe->load(std::memory_order_acquire)) { break; } continue; @@ -583,7 +582,7 @@ void TShellCommand::TImpl::StartProcess(TShellCommand::TImpl::TPipes& pipes) { } if (!res) { - AtomicSet(ExecutionStatus, SHELL_ERROR); + ExecutionStatus.store(SHELL_ERROR, std::memory_order_release); /// @todo: write to error stream if set TStringOutput out(CollectedError); out << "Process was not created: " << LastSystemErrorText() << " command text was: '" << GetAString(cmdcopy.Data()) << "'"; @@ -723,7 +722,7 @@ void TShellCommand::TImpl::OnFork(TPipes& pipes, sigset_t oldmask, char* const* #endif void TShellCommand::TImpl::Run() { - Y_ENSURE(AtomicGet(ExecutionStatus) != SHELL_RUNNING, TStringBuf("Process is already running")); + Y_ENSURE(ExecutionStatus.load(std::memory_order_acquire) != SHELL_RUNNING, TStringBuf("Process is already running")); // Prepare I/O streams CollectedOutput.clear(); CollectedError.clear(); @@ -739,7 +738,7 @@ void TShellCommand::TImpl::Run() { TRealPipeHandle::Pipe(pipes.InputPipeFd[0], pipes.InputPipeFd[1], CloseOnExec); } - AtomicSet(ExecutionStatus, SHELL_RUNNING); + ExecutionStatus.store(SHELL_RUNNING, std::memory_order_release); #if defined(_unix_) // block all signals to avoid signal handler race after fork() @@ -786,7 +785,7 @@ void TShellCommand::TImpl::Run() { pid_t pid = fork(); if (pid == -1) { - AtomicSet(ExecutionStatus, SHELL_ERROR); + ExecutionStatus.store(SHELL_ERROR, std::memory_order_release); /// @todo check if pipes are still open ythrow TSystemError() << "Cannot fork"; } else if (pid == 0) { // child @@ -807,7 +806,7 @@ void TShellCommand::TImpl::Run() { #endif pipes.PrepareParents(); - if (AtomicGet(ExecutionStatus) != SHELL_RUNNING) { + if (ExecutionStatus.load(std::memory_order_acquire) != SHELL_RUNNING) { return; } @@ -1002,7 +1001,7 @@ void TShellCommand::TImpl::Communicate(TProcessInfo* pi) { if (!bytesToWrite) { bytesToWrite = input->Read(inputBuffer.Data(), inputBuffer.Capacity()); if (bytesToWrite == 0) { - if (AtomicGet(pi->Parent->Options_.ShouldCloseInput)) { + if (pi->Parent->Options_.ShouldCloseInput.load(std::memory_order_acquire)) { input = nullptr; } continue; @@ -1051,9 +1050,9 @@ void TShellCommand::TImpl::Communicate(TProcessInfo* pi) { #endif pi->Parent->ExitCode = processExitCode; if (cleanExit) { - AtomicSet(pi->Parent->ExecutionStatus, SHELL_FINISHED); + pi->Parent->ExecutionStatus.store(SHELL_FINISHED, std::memory_order_release); } else { - AtomicSet(pi->Parent->ExecutionStatus, SHELL_ERROR); + pi->Parent->ExecutionStatus.store(SHELL_ERROR, std::memory_order_release); } #if defined(_win_) @@ -1076,7 +1075,7 @@ void TShellCommand::TImpl::Communicate(TProcessInfo* pi) { #endif } catch (const yexception& e) { // Some error in watch occured, set result to error - AtomicSet(pi->Parent->ExecutionStatus, SHELL_INTERNAL_ERROR); + pi->Parent->ExecutionStatus.store(SHELL_INTERNAL_ERROR, std::memory_order_release); pi->Parent->InternalError = e.what(); if (input) { pi->InputFd.Close(); diff --git a/util/system/shellcommand.h b/util/system/shellcommand.h index 51b08e04b6..c7637821e4 100644 --- a/util/system/shellcommand.h +++ b/util/system/shellcommand.h @@ -14,7 +14,23 @@ #include "mutex.h" #include <sys/types.h> +#include <atomic> + class TShellCommandOptions { + class TCopyableAtomicBool: public std::atomic<bool> { + public: + using std::atomic<bool>::atomic; + TCopyableAtomicBool(const TCopyableAtomicBool& other) + : std::atomic<bool>(other.load(std::memory_order_acquire)) + { + } + + TCopyableAtomicBool& operator=(const TCopyableAtomicBool& other) { + this->store(other.load(std::memory_order_acquire), std::memory_order_release); + return *this; + } + }; + public: struct TUserOptions { TString Name; @@ -185,7 +201,7 @@ public: * @return self */ inline TShellCommandOptions& SetCloseInput(bool val) { - ShouldCloseInput = val; + ShouldCloseInput.store(val); return *this; } @@ -307,7 +323,7 @@ public: bool QuoteArguments = false; bool DetachSession = false; bool CloseStreams = false; - TAtomic ShouldCloseInput = false; + TCopyableAtomicBool ShouldCloseInput = false; EHandleMode InputMode = HANDLE_STREAM; EHandleMode OutputMode = HANDLE_STREAM; EHandleMode ErrorMode = HANDLE_STREAM; diff --git a/util/system/tls.cpp b/util/system/tls.cpp index 8e892a6d32..9443aec3a3 100644 --- a/util/system/tls.cpp +++ b/util/system/tls.cpp @@ -5,6 +5,8 @@ #include <util/generic/singleton.h> #include <util/generic/vector.h> +#include <atomic> + #if defined(_unix_) #include <pthread.h> #endif @@ -12,10 +14,10 @@ using namespace NTls; namespace { - static inline TAtomicBase AcquireKey() { - static TAtomic cur; + static inline size_t AcquireKey() { + static std::atomic<size_t> cur; - return AtomicIncrement(cur) - (TAtomicBase)1; + return cur++; } class TGenericTlsBase { |