aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus
diff options
context:
space:
mode:
authorvskipin <vskipin@yandex-team.ru>2022-02-10 16:46:00 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:46:00 +0300
commit4e4b78bd7b67e2533da4dbb9696374a6d6068e32 (patch)
treea7a5543d815c451256ece74081d960b4e1d70ec2 /library/cpp/messagebus
parent5b00ed04a5137a452fa6d3423cb0c9b54ac27408 (diff)
downloadydb-4e4b78bd7b67e2533da4dbb9696374a6d6068e32.tar.gz
Restoring authorship annotation for <vskipin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus')
-rw-r--r--library/cpp/messagebus/actor/executor.cpp8
-rw-r--r--library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h20
-rw-r--r--library/cpp/messagebus/event_loop.cpp8
-rw-r--r--library/cpp/messagebus/futex_like.h2
-rw-r--r--library/cpp/messagebus/misc/tokenquota.h2
5 files changed, 20 insertions, 20 deletions
diff --git a/library/cpp/messagebus/actor/executor.cpp b/library/cpp/messagebus/actor/executor.cpp
index 7a2227a458..d0ed3647fa 100644
--- a/library/cpp/messagebus/actor/executor.cpp
+++ b/library/cpp/messagebus/actor/executor.cpp
@@ -176,7 +176,7 @@ TExecutor::TExecutor(const TExecutor::TConfig& config)
void TExecutor::Init() {
Impl.Reset(new TImpl(this));
- AtomicSet(ExitWorkers, 0);
+ AtomicSet(ExitWorkers, 0);
Y_VERIFY(Config.WorkerCount > 0);
@@ -192,7 +192,7 @@ TExecutor::~TExecutor() {
}
void TExecutor::Stop() {
- AtomicSet(ExitWorkers, 1);
+ AtomicSet(ExitWorkers, 1);
Impl->HelperStopSignal.Signal();
Impl->HelperThread.Join();
@@ -214,7 +214,7 @@ void TExecutor::EnqueueWork(TArrayRef<IWorkItem* const> wis) {
if (wis.empty())
return;
- if (Y_UNLIKELY(AtomicGet(ExitWorkers) != 0)) {
+ if (Y_UNLIKELY(AtomicGet(ExitWorkers) != 0)) {
Y_VERIFY(WorkItems.Empty(), "executor %s: cannot add tasks after queue shutdown", Config.Name);
}
@@ -289,7 +289,7 @@ TAutoPtr<IWorkItem> TExecutor::DequeueWork() {
if (!WorkItems.TryPop(&wi, &queueSize)) {
TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for DequeueWork");
while (!WorkItems.TryPop(&wi, &queueSize)) {
- if (AtomicGet(ExitWorkers) != 0)
+ if (AtomicGet(ExitWorkers) != 0)
return nullptr;
TWhatThreadDoesPushPop pp("waiting for work on condvar");
diff --git a/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h b/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h
index f0b7cd90e4..b49bfd6cfb 100644
--- a/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h
+++ b/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h
@@ -9,11 +9,11 @@ class TRingBufferWithSpinLock {
private:
TRingBuffer<T> RingBuffer;
TSpinLock SpinLock;
- TAtomic CachedSize;
-
+ TAtomic CachedSize;
+
public:
TRingBufferWithSpinLock()
- : CachedSize(0)
+ : CachedSize(0)
{
}
@@ -28,11 +28,11 @@ public:
TGuard<TSpinLock> Guard(SpinLock);
RingBuffer.PushAll(collection);
- AtomicSet(CachedSize, RingBuffer.Size());
+ AtomicSet(CachedSize, RingBuffer.Size());
}
bool TryPop(T* r, size_t* sizePtr = nullptr) {
- if (AtomicGet(CachedSize) == 0) {
+ if (AtomicGet(CachedSize) == 0) {
return false;
}
@@ -42,7 +42,7 @@ public:
TGuard<TSpinLock> Guard(SpinLock);
ok = RingBuffer.TryPop(r);
size = RingBuffer.Size();
- AtomicSet(CachedSize, size);
+ AtomicSet(CachedSize, size);
}
if (!!sizePtr) {
*sizePtr = size;
@@ -63,25 +63,25 @@ public:
if (collection.size() == 0) {
return TryPop(r);
} else {
- if (AtomicGet(CachedSize) == 0) {
+ if (AtomicGet(CachedSize) == 0) {
*r = collection[0];
if (collection.size() > 1) {
TGuard<TSpinLock> guard(SpinLock);
RingBuffer.PushAll(MakeArrayRef(collection.data() + 1, collection.size() - 1));
- AtomicSet(CachedSize, RingBuffer.Size());
+ AtomicSet(CachedSize, RingBuffer.Size());
}
} else {
TGuard<TSpinLock> guard(SpinLock);
RingBuffer.PushAll(collection);
*r = RingBuffer.Pop();
- AtomicSet(CachedSize, RingBuffer.Size());
+ AtomicSet(CachedSize, RingBuffer.Size());
}
return true;
}
}
bool Empty() const {
- return AtomicGet(CachedSize) == 0;
+ return AtomicGet(CachedSize) == 0;
}
size_t Size() const {
diff --git a/library/cpp/messagebus/event_loop.cpp b/library/cpp/messagebus/event_loop.cpp
index f685135bed..6946ccdea4 100644
--- a/library/cpp/messagebus/event_loop.cpp
+++ b/library/cpp/messagebus/event_loop.cpp
@@ -79,7 +79,7 @@ public:
const char* Name;
TAtomic RunningState;
- TAtomic StopSignal;
+ TAtomic StopSignal;
TSystemEvent StoppedEvent;
TData Data;
@@ -255,7 +255,7 @@ void TChannel::TImpl::CallHandler() {
TEventLoop::TImpl::TImpl(const char* name)
: Name(name)
, RunningState(EVENT_LOOP_CREATED)
- , StopSignal(0)
+ , StopSignal(0)
{
SOCKET wakeupSockets[2];
@@ -284,7 +284,7 @@ void TEventLoop::TImpl::Run() {
SetCurrentThreadName(Name);
}
- while (AtomicGet(StopSignal) == 0) {
+ while (AtomicGet(StopSignal) == 0) {
void* cookies[1024];
const size_t count = Poller.WaitI(cookies, Y_ARRAY_SIZE(cookies));
@@ -328,7 +328,7 @@ void TEventLoop::TImpl::Run() {
}
void TEventLoop::TImpl::Stop() {
- AtomicSet(StopSignal, 1);
+ AtomicSet(StopSignal, 1);
if (AtomicGet(RunningState) == EVENT_LOOP_RUNNING) {
Wakeup();
diff --git a/library/cpp/messagebus/futex_like.h b/library/cpp/messagebus/futex_like.h
index 31d60c60f1..f40594918f 100644
--- a/library/cpp/messagebus/futex_like.h
+++ b/library/cpp/messagebus/futex_like.h
@@ -39,7 +39,7 @@ public:
#ifdef _linux_
return __atomic_exchange_n(&Value, newValue, __ATOMIC_SEQ_CST);
#else
- return AtomicSwap(&Value, newValue);
+ return AtomicSwap(&Value, newValue);
#endif
}
#endif
diff --git a/library/cpp/messagebus/misc/tokenquota.h b/library/cpp/messagebus/misc/tokenquota.h
index 190547fa54..656da42b10 100644
--- a/library/cpp/messagebus/misc/tokenquota.h
+++ b/library/cpp/messagebus/misc/tokenquota.h
@@ -29,7 +29,7 @@ namespace NBus {
level = Max(TAtomicBase(level), TAtomicBase(1));
if (Enabled && (Acquired < level || force)) {
- Acquired += AtomicSwap(&Tokens_, 0);
+ Acquired += AtomicSwap(&Tokens_, 0);
}
return !Enabled || Acquired >= level;