aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreeight <eeight@yandex-team.ru>2022-05-20 00:26:58 +0300
committereeight <eeight@yandex-team.ru>2022-05-20 00:26:58 +0300
commit13950b7049e70b9e0adf3cbe48a527f9e8d25e3c (patch)
tree65359e6418fb7833b826e97044c58c55485941cb
parentd98cd6128e8838e19ae1a95be34385af299dd62a (diff)
downloadydb-13950b7049e70b9e0adf3cbe48a527f9e8d25e3c.tar.gz
IGNIETFERRO-1105 Get rid of TAtomic in system/{atexit, event, shellcommand}
ref:c74bffae7e90dee515f5089979fc050d20a34e4e
-rw-r--r--util/system/atexit.cpp10
-rw-r--r--util/system/event.cpp19
-rw-r--r--util/system/shellcommand.cpp39
-rw-r--r--util/system/shellcommand.h20
-rw-r--r--util/system/tls.cpp8
5 files changed, 57 insertions, 39 deletions
diff --git a/util/system/atexit.cpp b/util/system/atexit.cpp
index aeb29466b3..b06880818c 100644
--- a/util/system/atexit.cpp
+++ b/util/system/atexit.cpp
@@ -1,5 +1,4 @@
#include "atexit.h"
-#include "atomic.h"
#include "yassert.h"
#include "spinlock.h"
#include "thread.h"
@@ -9,6 +8,7 @@
#include <util/generic/deque.h>
#include <util/generic/queue.h>
+#include <atomic>
#include <tuple>
#include <cstdlib>
@@ -30,12 +30,12 @@ namespace {
public:
inline TAtExit() noexcept
- : FinishStarted_(0)
+ : FinishStarted_(false)
{
}
inline void Finish() noexcept {
- AtomicSet(FinishStarted_, 1);
+ FinishStarted_.store(true);
auto guard = Guard(Lock_);
@@ -66,12 +66,12 @@ namespace {
}
inline bool FinishStarted() const {
- return AtomicGet(FinishStarted_);
+ return FinishStarted_.load();
}
private:
TAdaptiveLock Lock_;
- TAtomic FinishStarted_;
+ std::atomic<bool> FinishStarted_;
TDeque<TFunc> Store_;
TPriorityQueue<TFunc*, TVector<TFunc*>, TCmp> Items_;
};
diff --git a/util/system/event.cpp b/util/system/event.cpp
index ceb9134a3d..42311d4835 100644
--- a/util/system/event.cpp
+++ b/util/system/event.cpp
@@ -1,6 +1,5 @@
#include "defaults.h"
-#include "atomic.h"
#include "event.h"
#include "mutex.h"
#include "condvar.h"
@@ -9,6 +8,8 @@
#include "winint.h"
#endif
+#include <atomic>
+
class TSystemEvent::TEvImpl: public TAtomicRefCount<TSystemEvent::TEvImpl> {
public:
#ifdef _win_
@@ -49,12 +50,12 @@ public:
}
inline void Signal() noexcept {
- if (Manual && AtomicGet(Signaled)) {
+ if (Manual && Signaled.load(std::memory_order_acquire)) {
return; // shortcut
}
with_lock (Mutex) {
- AtomicSet(Signaled, 1);
+ Signaled.store(true, std::memory_order_release);
}
if (Manual) {
@@ -65,27 +66,27 @@ public:
}
inline void Reset() noexcept {
- AtomicSet(Signaled, 0);
+ Signaled.store(false, std::memory_order_release);
}
inline bool WaitD(TInstant deadLine) noexcept {
- if (Manual && AtomicGet(Signaled)) {
+ if (Manual && Signaled.load(std::memory_order_acquire)) {
return true; // shortcut
}
bool resSignaled = true;
with_lock (Mutex) {
- while (!AtomicGet(Signaled)) {
+ while (!Signaled.load(std::memory_order_acquire)) {
if (!Cond.WaitD(Mutex, deadLine)) {
- resSignaled = AtomicGet(Signaled); // timed out, but Signaled could have been set
+ resSignaled = Signaled.load(std::memory_order_acquire); // timed out, but Signaled could have been set
break;
}
}
if (!Manual) {
- AtomicSet(Signaled, 0);
+ Signaled.store(false, std::memory_order_release);
}
}
@@ -99,7 +100,7 @@ private:
#else
TCondVar Cond;
TMutex Mutex;
- TAtomic Signaled = 0;
+ std::atomic<bool> Signaled = false;
bool Manual;
#endif
};
diff --git a/util/system/shellcommand.cpp b/util/system/shellcommand.cpp
index 247bf6f340..025a3ffd2f 100644
--- a/util/system/shellcommand.cpp
+++ b/util/system/shellcommand.cpp
@@ -2,7 +2,6 @@
#include "user.h"
#include "nice.h"
#include "sigset.h"
-#include "atomic.h"
#include <util/folder/dirut.h>
#include <util/generic/algorithm.h>
@@ -198,7 +197,7 @@ private:
TShellCommandOptions::EHandleMode InputMode = TShellCommandOptions::HANDLE_STREAM;
TPid Pid;
- TAtomic ExecutionStatus; // TShellCommand::ECommandStatus
+ std::atomic<size_t> ExecutionStatus; // TShellCommand::ECommandStatus
TThread* WatchThread;
bool TerminateFlag = false;
@@ -253,7 +252,7 @@ private:
TRealPipeHandle* Pipe;
IOutputStream* OutputStream;
IInputStream* InputStream;
- TAtomic* ShouldClosePipe;
+ std::atomic<bool>* ShouldClosePipe;
TString InternalError;
};
@@ -298,35 +297,35 @@ public:
}
inline void AppendArgument(const TStringBuf argument) {
- if (AtomicGet(ExecutionStatus) == SHELL_RUNNING) {
+ if (ExecutionStatus.load(std::memory_order_acquire) == SHELL_RUNNING) {
ythrow yexception() << "You cannot change command parameters while process is running";
}
Arguments.push_back(ToString(argument));
}
inline const TString& GetOutput() const {
- if (AtomicGet(ExecutionStatus) == SHELL_RUNNING) {
+ if (ExecutionStatus.load(std::memory_order_acquire) == SHELL_RUNNING) {
ythrow yexception() << "You cannot retrieve output while process is running.";
}
return CollectedOutput;
}
inline const TString& GetError() const {
- if (AtomicGet(ExecutionStatus) == SHELL_RUNNING) {
+ if (ExecutionStatus.load(std::memory_order_acquire) == SHELL_RUNNING) {
ythrow yexception() << "You cannot retrieve output while process is running.";
}
return CollectedError;
}
inline const TString& GetInternalError() const {
- if (AtomicGet(ExecutionStatus) != SHELL_INTERNAL_ERROR) {
+ if (ExecutionStatus.load(std::memory_order_acquire) != SHELL_INTERNAL_ERROR) {
ythrow yexception() << "Internal error hasn't occured so can't be retrieved.";
}
return InternalError;
}
inline ECommandStatus GetStatus() const {
- return static_cast<ECommandStatus>(AtomicGet(ExecutionStatus));
+ return static_cast<ECommandStatus>(ExecutionStatus.load(std::memory_order_acquire));
}
inline TMaybe<int> GetExitCode() const {
@@ -357,7 +356,7 @@ public:
void Run();
inline void Terminate() {
- if (!!Pid && (AtomicGet(ExecutionStatus) == SHELL_RUNNING)) {
+ if (!!Pid && (ExecutionStatus.load(std::memory_order_acquire) == SHELL_RUNNING)) {
bool ok =
#if defined(_unix_)
kill(Options_.DetachSession ? -1 * Pid : Pid, SIGTERM) == 0;
@@ -382,7 +381,7 @@ public:
}
inline void CloseInput() {
- AtomicSet(Options_.ShouldCloseInput, true);
+ Options_.ShouldCloseInput.store(true);
}
inline static bool TerminateIsRequired(void* processInfo) {
@@ -451,7 +450,7 @@ public:
if (!bytesToWrite) {
bytesToWrite = pump->InputStream->Read(buffer.Data(), buffer.Capacity());
if (bytesToWrite == 0) {
- if (AtomicGet(pump->ShouldClosePipe)) {
+ if (pump->ShouldClosePipe->load(std::memory_order_acquire)) {
break;
}
continue;
@@ -583,7 +582,7 @@ void TShellCommand::TImpl::StartProcess(TShellCommand::TImpl::TPipes& pipes) {
}
if (!res) {
- AtomicSet(ExecutionStatus, SHELL_ERROR);
+ ExecutionStatus.store(SHELL_ERROR, std::memory_order_release);
/// @todo: write to error stream if set
TStringOutput out(CollectedError);
out << "Process was not created: " << LastSystemErrorText() << " command text was: '" << GetAString(cmdcopy.Data()) << "'";
@@ -723,7 +722,7 @@ void TShellCommand::TImpl::OnFork(TPipes& pipes, sigset_t oldmask, char* const*
#endif
void TShellCommand::TImpl::Run() {
- Y_ENSURE(AtomicGet(ExecutionStatus) != SHELL_RUNNING, TStringBuf("Process is already running"));
+ Y_ENSURE(ExecutionStatus.load(std::memory_order_acquire) != SHELL_RUNNING, TStringBuf("Process is already running"));
// Prepare I/O streams
CollectedOutput.clear();
CollectedError.clear();
@@ -739,7 +738,7 @@ void TShellCommand::TImpl::Run() {
TRealPipeHandle::Pipe(pipes.InputPipeFd[0], pipes.InputPipeFd[1], CloseOnExec);
}
- AtomicSet(ExecutionStatus, SHELL_RUNNING);
+ ExecutionStatus.store(SHELL_RUNNING, std::memory_order_release);
#if defined(_unix_)
// block all signals to avoid signal handler race after fork()
@@ -786,7 +785,7 @@ void TShellCommand::TImpl::Run() {
pid_t pid = fork();
if (pid == -1) {
- AtomicSet(ExecutionStatus, SHELL_ERROR);
+ ExecutionStatus.store(SHELL_ERROR, std::memory_order_release);
/// @todo check if pipes are still open
ythrow TSystemError() << "Cannot fork";
} else if (pid == 0) { // child
@@ -807,7 +806,7 @@ void TShellCommand::TImpl::Run() {
#endif
pipes.PrepareParents();
- if (AtomicGet(ExecutionStatus) != SHELL_RUNNING) {
+ if (ExecutionStatus.load(std::memory_order_acquire) != SHELL_RUNNING) {
return;
}
@@ -1002,7 +1001,7 @@ void TShellCommand::TImpl::Communicate(TProcessInfo* pi) {
if (!bytesToWrite) {
bytesToWrite = input->Read(inputBuffer.Data(), inputBuffer.Capacity());
if (bytesToWrite == 0) {
- if (AtomicGet(pi->Parent->Options_.ShouldCloseInput)) {
+ if (pi->Parent->Options_.ShouldCloseInput.load(std::memory_order_acquire)) {
input = nullptr;
}
continue;
@@ -1051,9 +1050,9 @@ void TShellCommand::TImpl::Communicate(TProcessInfo* pi) {
#endif
pi->Parent->ExitCode = processExitCode;
if (cleanExit) {
- AtomicSet(pi->Parent->ExecutionStatus, SHELL_FINISHED);
+ pi->Parent->ExecutionStatus.store(SHELL_FINISHED, std::memory_order_release);
} else {
- AtomicSet(pi->Parent->ExecutionStatus, SHELL_ERROR);
+ pi->Parent->ExecutionStatus.store(SHELL_ERROR, std::memory_order_release);
}
#if defined(_win_)
@@ -1076,7 +1075,7 @@ void TShellCommand::TImpl::Communicate(TProcessInfo* pi) {
#endif
} catch (const yexception& e) {
// Some error in watch occured, set result to error
- AtomicSet(pi->Parent->ExecutionStatus, SHELL_INTERNAL_ERROR);
+ pi->Parent->ExecutionStatus.store(SHELL_INTERNAL_ERROR, std::memory_order_release);
pi->Parent->InternalError = e.what();
if (input) {
pi->InputFd.Close();
diff --git a/util/system/shellcommand.h b/util/system/shellcommand.h
index 51b08e04b6..c7637821e4 100644
--- a/util/system/shellcommand.h
+++ b/util/system/shellcommand.h
@@ -14,7 +14,23 @@
#include "mutex.h"
#include <sys/types.h>
+#include <atomic>
+
class TShellCommandOptions {
+ class TCopyableAtomicBool: public std::atomic<bool> {
+ public:
+ using std::atomic<bool>::atomic;
+ TCopyableAtomicBool(const TCopyableAtomicBool& other)
+ : std::atomic<bool>(other.load(std::memory_order_acquire))
+ {
+ }
+
+ TCopyableAtomicBool& operator=(const TCopyableAtomicBool& other) {
+ this->store(other.load(std::memory_order_acquire), std::memory_order_release);
+ return *this;
+ }
+ };
+
public:
struct TUserOptions {
TString Name;
@@ -185,7 +201,7 @@ public:
* @return self
*/
inline TShellCommandOptions& SetCloseInput(bool val) {
- ShouldCloseInput = val;
+ ShouldCloseInput.store(val);
return *this;
}
@@ -307,7 +323,7 @@ public:
bool QuoteArguments = false;
bool DetachSession = false;
bool CloseStreams = false;
- TAtomic ShouldCloseInput = false;
+ TCopyableAtomicBool ShouldCloseInput = false;
EHandleMode InputMode = HANDLE_STREAM;
EHandleMode OutputMode = HANDLE_STREAM;
EHandleMode ErrorMode = HANDLE_STREAM;
diff --git a/util/system/tls.cpp b/util/system/tls.cpp
index 8e892a6d32..9443aec3a3 100644
--- a/util/system/tls.cpp
+++ b/util/system/tls.cpp
@@ -5,6 +5,8 @@
#include <util/generic/singleton.h>
#include <util/generic/vector.h>
+#include <atomic>
+
#if defined(_unix_)
#include <pthread.h>
#endif
@@ -12,10 +14,10 @@
using namespace NTls;
namespace {
- static inline TAtomicBase AcquireKey() {
- static TAtomic cur;
+ static inline size_t AcquireKey() {
+ static std::atomic<size_t> cur;
- return AtomicIncrement(cur) - (TAtomicBase)1;
+ return cur++;
}
class TGenericTlsBase {