aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/actor
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:15 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:15 +0300
commit72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch)
treeda2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/actor
parent778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff)
downloadydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/actor')
-rw-r--r--library/cpp/messagebus/actor/actor.h218
-rw-r--r--library/cpp/messagebus/actor/actor_ut.cpp26
-rw-r--r--library/cpp/messagebus/actor/executor.cpp140
-rw-r--r--library/cpp/messagebus/actor/executor.h130
-rw-r--r--library/cpp/messagebus/actor/queue_for_actor.h92
-rw-r--r--library/cpp/messagebus/actor/queue_in_actor.h144
-rw-r--r--library/cpp/messagebus/actor/ring_buffer.h4
-rw-r--r--library/cpp/messagebus/actor/ring_buffer_ut.cpp10
-rw-r--r--library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h4
-rw-r--r--library/cpp/messagebus/actor/tasks.h74
-rw-r--r--library/cpp/messagebus/actor/temp_tls_vector.h4
-rw-r--r--library/cpp/messagebus/actor/thread_extra.h40
-rw-r--r--library/cpp/messagebus/actor/what_thread_does.cpp10
-rw-r--r--library/cpp/messagebus/actor/what_thread_does_guard.h58
14 files changed, 477 insertions, 477 deletions
diff --git a/library/cpp/messagebus/actor/actor.h b/library/cpp/messagebus/actor/actor.h
index 9b8f20298a..1f5af55097 100644
--- a/library/cpp/messagebus/actor/actor.h
+++ b/library/cpp/messagebus/actor/actor.h
@@ -7,58 +7,58 @@
#include <util/system/yassert.h>
namespace NActor {
- class IActor: protected IWorkItem {
- public:
- // TODO: make private
- TTasks Tasks;
-
- public:
- virtual void ScheduleHereV() = 0;
- virtual void ScheduleV() = 0;
- virtual void ScheduleHereAtMostOnceV() = 0;
-
- // TODO: make private
- virtual void RefV() = 0;
- virtual void UnRefV() = 0;
-
- // mute warnings
- ~IActor() override {
- }
- };
-
- struct TDefaultTag {};
-
- template <typename TThis, typename TTag = TDefaultTag>
- class TActor: public IActor {
- private:
- TExecutor* const Executor;
-
- public:
- TActor(TExecutor* executor)
- : Executor(executor)
- {
- }
-
- void AddTaskFromActorLoop() {
- bool schedule = Tasks.AddTask();
- // TODO: check thread id
- Y_ASSERT(!schedule);
- }
-
- /**
+ class IActor: protected IWorkItem {
+ public:
+ // TODO: make private
+ TTasks Tasks;
+
+ public:
+ virtual void ScheduleHereV() = 0;
+ virtual void ScheduleV() = 0;
+ virtual void ScheduleHereAtMostOnceV() = 0;
+
+ // TODO: make private
+ virtual void RefV() = 0;
+ virtual void UnRefV() = 0;
+
+ // mute warnings
+ ~IActor() override {
+ }
+ };
+
+ struct TDefaultTag {};
+
+ template <typename TThis, typename TTag = TDefaultTag>
+ class TActor: public IActor {
+ private:
+ TExecutor* const Executor;
+
+ public:
+ TActor(TExecutor* executor)
+ : Executor(executor)
+ {
+ }
+
+ void AddTaskFromActorLoop() {
+ bool schedule = Tasks.AddTask();
+ // TODO: check thread id
+ Y_ASSERT(!schedule);
+ }
+
+ /**
* Schedule actor.
*
* If actor is sleeping, then actor will be executed right now.
* If actor is executing right now, it will be executed one more time.
* If this method is called multiple time, actor will be re-executed no more than one more time.
*/
- void Schedule() {
- if (Tasks.AddTask()) {
- EnqueueWork();
- }
+ void Schedule() {
+ if (Tasks.AddTask()) {
+ EnqueueWork();
+ }
}
- /**
+ /**
* Schedule actor, execute it in current thread.
*
* If actor is running, continue executing where it is executing.
@@ -66,79 +66,79 @@ namespace NActor {
*
* Operation is useful for tasks that are likely to complete quickly.
*/
- void ScheduleHere() {
- if (Tasks.AddTask()) {
- Loop();
- }
+ void ScheduleHere() {
+ if (Tasks.AddTask()) {
+ Loop();
+ }
}
- /**
+ /**
* Schedule actor, execute in current thread no more than once.
*
* If actor is running, continue executing where it is executing.
* If actor is sleeping, execute one iteration here, and if actor got new tasks,
* reschedule it in worker pool.
*/
- void ScheduleHereAtMostOnce() {
- if (Tasks.AddTask()) {
- bool fetched = Tasks.FetchTask();
- Y_VERIFY(fetched, "happens");
-
- DoAct();
-
- // if someone added more tasks, schedule them
- if (Tasks.FetchTask()) {
- bool added = Tasks.AddTask();
- Y_VERIFY(!added, "happens");
- EnqueueWork();
- }
- }
- }
-
- void ScheduleHereV() override {
- ScheduleHere();
- }
- void ScheduleV() override {
- Schedule();
- }
- void ScheduleHereAtMostOnceV() override {
- ScheduleHereAtMostOnce();
- }
- void RefV() override {
- GetThis()->Ref();
- }
- void UnRefV() override {
- GetThis()->UnRef();
- }
-
- private:
- TThis* GetThis() {
- return static_cast<TThis*>(this);
- }
-
- void EnqueueWork() {
- GetThis()->Ref();
- Executor->EnqueueWork({this});
- }
-
- void DoAct() {
- WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
-
- GetThis()->Act(TTag());
- }
-
- void Loop() {
- // TODO: limit number of iterations
- while (Tasks.FetchTask()) {
- DoAct();
+ void ScheduleHereAtMostOnce() {
+ if (Tasks.AddTask()) {
+ bool fetched = Tasks.FetchTask();
+ Y_VERIFY(fetched, "happens");
+
+ DoAct();
+
+ // if someone added more tasks, schedule them
+ if (Tasks.FetchTask()) {
+ bool added = Tasks.AddTask();
+ Y_VERIFY(!added, "happens");
+ EnqueueWork();
+ }
}
}
- void DoWork() override {
- Y_ASSERT(GetThis()->RefCount() >= 1);
- Loop();
- GetThis()->UnRef();
- }
- };
-
-}
+ void ScheduleHereV() override {
+ ScheduleHere();
+ }
+ void ScheduleV() override {
+ Schedule();
+ }
+ void ScheduleHereAtMostOnceV() override {
+ ScheduleHereAtMostOnce();
+ }
+ void RefV() override {
+ GetThis()->Ref();
+ }
+ void UnRefV() override {
+ GetThis()->UnRef();
+ }
+
+ private:
+ TThis* GetThis() {
+ return static_cast<TThis*>(this);
+ }
+
+ void EnqueueWork() {
+ GetThis()->Ref();
+ Executor->EnqueueWork({this});
+ }
+
+ void DoAct() {
+ WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
+
+ GetThis()->Act(TTag());
+ }
+
+ void Loop() {
+ // TODO: limit number of iterations
+ while (Tasks.FetchTask()) {
+ DoAct();
+ }
+ }
+
+ void DoWork() override {
+ Y_ASSERT(GetThis()->RefCount() >= 1);
+ Loop();
+ GetThis()->UnRef();
+ }
+ };
+
+}
diff --git a/library/cpp/messagebus/actor/actor_ut.cpp b/library/cpp/messagebus/actor/actor_ut.cpp
index b76ab55bfa..20dee53e2d 100644
--- a/library/cpp/messagebus/actor/actor_ut.cpp
+++ b/library/cpp/messagebus/actor/actor_ut.cpp
@@ -11,14 +11,14 @@
using namespace NActor;
template <typename TThis>
-struct TTestActorBase: public TAtomicRefCount<TThis>, public TActor<TThis> {
+struct TTestActorBase: public TAtomicRefCount<TThis>, public TActor<TThis> {
TTestSync Started;
TTestSync Acted;
TTestActorBase(TExecutor* executor)
: TActor<TThis>(executor)
- {
- }
+ {
+ }
void Act(TDefaultTag) {
Started.Inc();
@@ -27,23 +27,23 @@ struct TTestActorBase: public TAtomicRefCount<TThis>, public TActor<TThis> {
}
};
-struct TNopActor: public TTestActorBase<TNopActor> {
+struct TNopActor: public TTestActorBase<TNopActor> {
TObjectCounter<TNopActor> AllocCounter;
TNopActor(TExecutor* executor)
: TTestActorBase<TNopActor>(executor)
- {
- }
+ {
+ }
void Act2() {
}
};
-struct TWaitForSignalActor: public TTestActorBase<TWaitForSignalActor> {
+struct TWaitForSignalActor: public TTestActorBase<TWaitForSignalActor> {
TWaitForSignalActor(TExecutor* executor)
: TTestActorBase<TWaitForSignalActor>(executor)
- {
- }
+ {
+ }
TSystemEvent WaitFor;
@@ -52,7 +52,7 @@ struct TWaitForSignalActor: public TTestActorBase<TWaitForSignalActor> {
}
};
-struct TDecrementAndSendActor: public TTestActorBase<TDecrementAndSendActor>, public TQueueInActor<TDecrementAndSendActor, int> {
+struct TDecrementAndSendActor: public TTestActorBase<TDecrementAndSendActor>, public TQueueInActor<TDecrementAndSendActor, int> {
TSystemEvent Done;
TDecrementAndSendActor* Next;
@@ -60,8 +60,8 @@ struct TDecrementAndSendActor: public TTestActorBase<TDecrementAndSendActor>, pu
TDecrementAndSendActor(TExecutor* executor)
: TTestActorBase<TDecrementAndSendActor>(executor)
, Next(nullptr)
- {
- }
+ {
+ }
void ProcessItem(TDefaultTag, TDefaultTag, int n) {
if (n == 0) {
@@ -129,7 +129,7 @@ Y_UNIT_TEST_SUITE(TActor) {
TExecutor executor(queueSize);
- TVector<TIntrusivePtr<TDecrementAndSendActor>> actors;
+ TVector<TIntrusivePtr<TDecrementAndSendActor>> actors;
for (int i = 0; i < actorCount; ++i) {
actors.push_back(new TDecrementAndSendActor(&executor));
}
diff --git a/library/cpp/messagebus/actor/executor.cpp b/library/cpp/messagebus/actor/executor.cpp
index 7a2227a458..4c428754f3 100644
--- a/library/cpp/messagebus/actor/executor.cpp
+++ b/library/cpp/messagebus/actor/executor.cpp
@@ -20,10 +20,10 @@ namespace {
struct TRecord {
TAtomic MaxQueueSize;
- TRecord()
- : MaxQueueSize()
- {
- }
+ TRecord()
+ : MaxQueueSize()
+ {
+ }
TExecutorHistory::THistoryRecord Capture() {
TExecutorHistory::THistoryRecord r;
@@ -70,8 +70,8 @@ namespace {
}
-Y_POD_STATIC_THREAD(TExecutor*)
-ThreadCurrentExecutor;
+Y_POD_STATIC_THREAD(TExecutor*)
+ThreadCurrentExecutor;
static const char* NoLocation = "nowhere";
@@ -80,80 +80,80 @@ struct TExecutorWorkerThreadLocalData {
};
static TExecutorWorkerThreadLocalData WorkerNoThreadLocalData;
-Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData)
-WorkerThreadLocalData;
+Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData)
+WorkerThreadLocalData;
namespace NActor {
- struct TExecutorWorker {
- TExecutor* const Executor;
- TThread Thread;
- const char** WhatThreadDoesLocation;
- TExecutorWorkerThreadLocalData* ThreadLocalData;
-
- TExecutorWorker(TExecutor* executor)
- : Executor(executor)
- , Thread(RunThreadProc, this)
- , WhatThreadDoesLocation(&NoLocation)
- , ThreadLocalData(&::WorkerNoThreadLocalData)
- {
- Thread.Start();
- }
-
- void Run() {
- WhatThreadDoesLocation = ::WhatThreadDoesLocation();
- AtomicSet(ThreadLocalData, &::WorkerThreadLocalData);
- WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
- Executor->RunWorker();
- }
-
- static void* RunThreadProc(void* thiz0) {
- TExecutorWorker* thiz = (TExecutorWorker*)thiz0;
- thiz->Run();
- return nullptr;
- }
- };
-
- struct TExecutor::TImpl {
- TExecutor* const Executor;
- THistoryInternal History;
+ struct TExecutorWorker {
+ TExecutor* const Executor;
+ TThread Thread;
+ const char** WhatThreadDoesLocation;
+ TExecutorWorkerThreadLocalData* ThreadLocalData;
+
+ TExecutorWorker(TExecutor* executor)
+ : Executor(executor)
+ , Thread(RunThreadProc, this)
+ , WhatThreadDoesLocation(&NoLocation)
+ , ThreadLocalData(&::WorkerNoThreadLocalData)
+ {
+ Thread.Start();
+ }
+
+ void Run() {
+ WhatThreadDoesLocation = ::WhatThreadDoesLocation();
+ AtomicSet(ThreadLocalData, &::WorkerThreadLocalData);
+ WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
+ Executor->RunWorker();
+ }
+
+ static void* RunThreadProc(void* thiz0) {
+ TExecutorWorker* thiz = (TExecutorWorker*)thiz0;
+ thiz->Run();
+ return nullptr;
+ }
+ };
+
+ struct TExecutor::TImpl {
+ TExecutor* const Executor;
+ THistoryInternal History;
TSystemEvent HelperStopSignal;
- TThread HelperThread;
+ TThread HelperThread;
- TImpl(TExecutor* executor)
- : Executor(executor)
- , HelperThread(HelperThreadProc, this)
- {
- }
+ TImpl(TExecutor* executor)
+ : Executor(executor)
+ , HelperThread(HelperThreadProc, this)
+ {
+ }
- void RunHelper() {
- ui64 nowSeconds = TInstant::Now().Seconds();
- for (;;) {
- TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000));
+ void RunHelper() {
+ ui64 nowSeconds = TInstant::Now().Seconds();
+ for (;;) {
+ TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000));
- if (HelperStopSignal.WaitD(nextStop)) {
- return;
- }
+ if (HelperStopSignal.WaitD(nextStop)) {
+ return;
+ }
- nowSeconds = nextStop.Seconds();
+ nowSeconds = nextStop.Seconds();
- THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds);
+ THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds);
- ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear();
- if (maxQueueSize > record.MaxQueueSize) {
- AtomicSet(record.MaxQueueSize, maxQueueSize);
- }
+ ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear();
+ if (maxQueueSize > record.MaxQueueSize) {
+ AtomicSet(record.MaxQueueSize, maxQueueSize);
+ }
}
}
- static void* HelperThreadProc(void* impl0) {
- TImpl* impl = (TImpl*)impl0;
- impl->RunHelper();
- return nullptr;
- }
- };
+ static void* HelperThreadProc(void* impl0) {
+ TImpl* impl = (TImpl*)impl0;
+ impl->RunHelper();
+ return nullptr;
+ }
+ };
-}
+}
static TExecutor::TConfig MakeConfig(unsigned workerCount) {
TExecutor::TConfig config;
@@ -296,9 +296,9 @@ TAutoPtr<IWorkItem> TExecutor::DequeueWork() {
WorkAvailable.Wait(WorkMutex);
}
}
-
- auto& wtls = TlsRef(WorkerThreadLocalData);
-
+
+ auto& wtls = TlsRef(WorkerThreadLocalData);
+
if (queueSize > RelaxedLoad(&wtls.MaxQueueSize)) {
RelaxedStore<ui32>(&wtls.MaxQueueSize, queueSize);
}
@@ -334,5 +334,5 @@ void TExecutor::RunWorker() {
RunWorkItem(wi);
}
- ThreadCurrentExecutor = (TExecutor*)nullptr;
+ ThreadCurrentExecutor = (TExecutor*)nullptr;
}
diff --git a/library/cpp/messagebus/actor/executor.h b/library/cpp/messagebus/actor/executor.h
index 7292d8be53..4b9bcb1da0 100644
--- a/library/cpp/messagebus/actor/executor.h
+++ b/library/cpp/messagebus/actor/executor.h
@@ -11,95 +11,95 @@
#include <util/thread/lfqueue.h>
namespace NActor {
- namespace NPrivate {
- struct TExecutorHistory {
- struct THistoryRecord {
- ui32 MaxQueueSize;
- };
- TVector<THistoryRecord> HistoryRecords;
- ui64 LastHistoryRecordSecond;
-
- ui64 FirstHistoryRecordSecond() const {
- return LastHistoryRecordSecond - HistoryRecords.size() + 1;
- }
+ namespace NPrivate {
+ struct TExecutorHistory {
+ struct THistoryRecord {
+ ui32 MaxQueueSize;
+ };
+ TVector<THistoryRecord> HistoryRecords;
+ ui64 LastHistoryRecordSecond;
+
+ ui64 FirstHistoryRecordSecond() const {
+ return LastHistoryRecordSecond - HistoryRecords.size() + 1;
+ }
+ };
+
+ struct TExecutorStatus {
+ size_t WorkQueueSize = 0;
+ TExecutorHistory History;
+ TString Status;
};
+ }
- struct TExecutorStatus {
- size_t WorkQueueSize = 0;
- TExecutorHistory History;
- TString Status;
- };
- }
-
- class IWorkItem {
- public:
- virtual ~IWorkItem() {
+ class IWorkItem {
+ public:
+ virtual ~IWorkItem() {
}
- virtual void DoWork(/* must release this */) = 0;
+ virtual void DoWork(/* must release this */) = 0;
};
- struct TExecutorWorker;
+ struct TExecutorWorker;
- class TExecutor: public TAtomicRefCount<TExecutor> {
- friend struct TExecutorWorker;
+ class TExecutor: public TAtomicRefCount<TExecutor> {
+ friend struct TExecutorWorker;
- public:
- struct TConfig {
- size_t WorkerCount;
- const char* Name;
+ public:
+ struct TConfig {
+ size_t WorkerCount;
+ const char* Name;
- TConfig()
- : WorkerCount(1)
- , Name()
- {
- }
- };
+ TConfig()
+ : WorkerCount(1)
+ , Name()
+ {
+ }
+ };
- private:
- struct TImpl;
- THolder<TImpl> Impl;
+ private:
+ struct TImpl;
+ THolder<TImpl> Impl;
- const TConfig Config;
+ const TConfig Config;
- TAtomic ExitWorkers;
+ TAtomic ExitWorkers;
- TVector<TAutoPtr<TExecutorWorker>> WorkerThreads;
+ TVector<TAutoPtr<TExecutorWorker>> WorkerThreads;
- TRingBufferWithSpinLock<IWorkItem*> WorkItems;
+ TRingBufferWithSpinLock<IWorkItem*> WorkItems;
- TMutex WorkMutex;
- TCondVar WorkAvailable;
+ TMutex WorkMutex;
+ TCondVar WorkAvailable;
- public:
- explicit TExecutor(size_t workerCount);
- TExecutor(const TConfig& config);
- ~TExecutor();
+ public:
+ explicit TExecutor(size_t workerCount);
+ TExecutor(const TConfig& config);
+ ~TExecutor();
- void Stop();
+ void Stop();
- void EnqueueWork(TArrayRef<IWorkItem* const> w);
+ void EnqueueWork(TArrayRef<IWorkItem* const> w);
- size_t GetWorkQueueSize() const;
- TString GetStatus() const;
- TString GetStatusSingleLine() const;
- NPrivate::TExecutorStatus GetStatusRecordInternal() const;
+ size_t GetWorkQueueSize() const;
+ TString GetStatus() const;
+ TString GetStatusSingleLine() const;
+ NPrivate::TExecutorStatus GetStatusRecordInternal() const;
- bool IsInExecutorThread() const;
+ bool IsInExecutorThread() const;
- private:
- void Init();
+ private:
+ void Init();
- TAutoPtr<IWorkItem> DequeueWork();
+ TAutoPtr<IWorkItem> DequeueWork();
- void ProcessWorkQueueHere();
+ void ProcessWorkQueueHere();
- inline void RunWorkItem(TAutoPtr<IWorkItem>);
+ inline void RunWorkItem(TAutoPtr<IWorkItem>);
- void RunWorker();
+ void RunWorker();
- ui32 GetMaxQueueSizeAndClear() const;
- };
+ ui32 GetMaxQueueSizeAndClear() const;
+ };
- using TExecutorPtr = TIntrusivePtr<TExecutor>;
+ using TExecutorPtr = TIntrusivePtr<TExecutor>;
-}
+}
diff --git a/library/cpp/messagebus/actor/queue_for_actor.h b/library/cpp/messagebus/actor/queue_for_actor.h
index 40fa536b82..f59f229b79 100644
--- a/library/cpp/messagebus/actor/queue_for_actor.h
+++ b/library/cpp/messagebus/actor/queue_for_actor.h
@@ -9,66 +9,66 @@
#include "temp_tls_vector.h"
namespace NActor {
- namespace NPrivate {
- struct TTagForTl {};
+ namespace NPrivate {
+ struct TTagForTl {};
- }
+ }
- template <typename T>
- class TQueueForActor {
- private:
- TLockFreeStack<T> Queue;
+ template <typename T>
+ class TQueueForActor {
+ private:
+ TLockFreeStack<T> Queue;
- public:
- ~TQueueForActor() {
- Y_VERIFY(Queue.IsEmpty());
- }
+ public:
+ ~TQueueForActor() {
+ Y_VERIFY(Queue.IsEmpty());
+ }
- bool IsEmpty() {
- return Queue.IsEmpty();
- }
+ bool IsEmpty() {
+ return Queue.IsEmpty();
+ }
- void Enqueue(const T& value) {
- Queue.Enqueue(value);
- }
+ void Enqueue(const T& value) {
+ Queue.Enqueue(value);
+ }
- template <typename TCollection>
- void EnqueueAll(const TCollection& all) {
- Queue.EnqueueAll(all);
- }
+ template <typename TCollection>
+ void EnqueueAll(const TCollection& all) {
+ Queue.EnqueueAll(all);
+ }
- void Clear() {
- TVector<T> tmp;
- Queue.DequeueAll(&tmp);
- }
+ void Clear() {
+ TVector<T> tmp;
+ Queue.DequeueAll(&tmp);
+ }
- template <typename TFunc>
- void DequeueAll(const TFunc& func
- // TODO: , std::enable_if_t<TFunctionParamCount<TFunc>::Value == 1>* = 0
- ) {
- TTempTlsVector<T> temp;
+ template <typename TFunc>
+ void DequeueAll(const TFunc& func
+ // TODO: , std::enable_if_t<TFunctionParamCount<TFunc>::Value == 1>* = 0
+ ) {
+ TTempTlsVector<T> temp;
- Queue.DequeueAllSingleConsumer(temp.GetVector());
+ Queue.DequeueAllSingleConsumer(temp.GetVector());
- for (typename TVector<T>::reverse_iterator i = temp.GetVector()->rbegin(); i != temp.GetVector()->rend(); ++i) {
- func(*i);
- }
+ for (typename TVector<T>::reverse_iterator i = temp.GetVector()->rbegin(); i != temp.GetVector()->rend(); ++i) {
+ func(*i);
+ }
- temp.Clear();
+ temp.Clear();
- if (temp.Capacity() * sizeof(T) > 64 * 1024) {
- temp.Shrink();
- }
+ if (temp.Capacity() * sizeof(T) > 64 * 1024) {
+ temp.Shrink();
+ }
}
- template <typename TFunc>
- void DequeueAllLikelyEmpty(const TFunc& func) {
- if (Y_LIKELY(IsEmpty())) {
- return;
- }
+ template <typename TFunc>
+ void DequeueAllLikelyEmpty(const TFunc& func) {
+ if (Y_LIKELY(IsEmpty())) {
+ return;
+ }
- DequeueAll(func);
+ DequeueAll(func);
}
- };
+ };
-}
+}
diff --git a/library/cpp/messagebus/actor/queue_in_actor.h b/library/cpp/messagebus/actor/queue_in_actor.h
index 9865996532..65110967aa 100644
--- a/library/cpp/messagebus/actor/queue_in_actor.h
+++ b/library/cpp/messagebus/actor/queue_in_actor.h
@@ -6,75 +6,75 @@
#include <functional>
namespace NActor {
- template <typename TItem>
- class IQueueInActor {
- public:
- virtual void EnqueueAndScheduleV(const TItem& item) = 0;
- virtual void DequeueAllV() = 0;
- virtual void DequeueAllLikelyEmptyV() = 0;
-
- virtual ~IQueueInActor() {
- }
- };
-
- template <typename TThis, typename TItem, typename TActorTag = TDefaultTag, typename TQueueTag = TDefaultTag>
- class TQueueInActor: public IQueueInActor<TItem> {
- typedef TQueueInActor<TThis, TItem, TActorTag, TQueueTag> TSelf;
-
- public:
- // TODO: make protected
- TQueueForActor<TItem> QueueInActor;
-
- private:
- TActor<TThis, TActorTag>* GetActor() {
- return GetThis();
- }
-
- TThis* GetThis() {
- return static_cast<TThis*>(this);
- }
-
- void ProcessItem(const TItem& item) {
- GetThis()->ProcessItem(TActorTag(), TQueueTag(), item);
- }
-
- public:
- void EnqueueAndNoSchedule(const TItem& item) {
- QueueInActor.Enqueue(item);
- }
-
- void EnqueueAndSchedule(const TItem& item) {
- EnqueueAndNoSchedule(item);
- GetActor()->Schedule();
- }
-
- void EnqueueAndScheduleV(const TItem& item) override {
- EnqueueAndSchedule(item);
- }
-
- void Clear() {
- QueueInActor.Clear();
- }
-
- void DequeueAll() {
- QueueInActor.DequeueAll(std::bind(&TSelf::ProcessItem, this, std::placeholders::_1));
- }
-
- void DequeueAllV() override {
- return DequeueAll();
- }
-
- void DequeueAllLikelyEmpty() {
- QueueInActor.DequeueAllLikelyEmpty(std::bind(&TSelf::ProcessItem, this, std::placeholders::_1));
- }
-
- void DequeueAllLikelyEmptyV() override {
- return DequeueAllLikelyEmpty();
- }
-
- bool IsEmpty() {
- return QueueInActor.IsEmpty();
- }
- };
-
-}
+ template <typename TItem>
+ class IQueueInActor {
+ public:
+ virtual void EnqueueAndScheduleV(const TItem& item) = 0;
+ virtual void DequeueAllV() = 0;
+ virtual void DequeueAllLikelyEmptyV() = 0;
+
+ virtual ~IQueueInActor() {
+ }
+ };
+
+ template <typename TThis, typename TItem, typename TActorTag = TDefaultTag, typename TQueueTag = TDefaultTag>
+ class TQueueInActor: public IQueueInActor<TItem> {
+ typedef TQueueInActor<TThis, TItem, TActorTag, TQueueTag> TSelf;
+
+ public:
+ // TODO: make protected
+ TQueueForActor<TItem> QueueInActor;
+
+ private:
+ TActor<TThis, TActorTag>* GetActor() {
+ return GetThis();
+ }
+
+ TThis* GetThis() {
+ return static_cast<TThis*>(this);
+ }
+
+ void ProcessItem(const TItem& item) {
+ GetThis()->ProcessItem(TActorTag(), TQueueTag(), item);
+ }
+
+ public:
+ void EnqueueAndNoSchedule(const TItem& item) {
+ QueueInActor.Enqueue(item);
+ }
+
+ void EnqueueAndSchedule(const TItem& item) {
+ EnqueueAndNoSchedule(item);
+ GetActor()->Schedule();
+ }
+
+ void EnqueueAndScheduleV(const TItem& item) override {
+ EnqueueAndSchedule(item);
+ }
+
+ void Clear() {
+ QueueInActor.Clear();
+ }
+
+ void DequeueAll() {
+ QueueInActor.DequeueAll(std::bind(&TSelf::ProcessItem, this, std::placeholders::_1));
+ }
+
+ void DequeueAllV() override {
+ return DequeueAll();
+ }
+
+ void DequeueAllLikelyEmpty() {
+ QueueInActor.DequeueAllLikelyEmpty(std::bind(&TSelf::ProcessItem, this, std::placeholders::_1));
+ }
+
+ void DequeueAllLikelyEmptyV() override {
+ return DequeueAllLikelyEmpty();
+ }
+
+ bool IsEmpty() {
+ return QueueInActor.IsEmpty();
+ }
+ };
+
+}
diff --git a/library/cpp/messagebus/actor/ring_buffer.h b/library/cpp/messagebus/actor/ring_buffer.h
index ec5706f7c7..8ca76bdb91 100644
--- a/library/cpp/messagebus/actor/ring_buffer.h
+++ b/library/cpp/messagebus/actor/ring_buffer.h
@@ -95,11 +95,11 @@ public:
}
for (size_t i = 0; i < firstSize; ++i) {
- Data[WritePos + i] = value[i];
+ Data[WritePos + i] = value[i];
}
for (size_t i = 0; i < secondSize; ++i) {
- Data[i] = value[firstSize + i];
+ Data[i] = value[firstSize + i];
}
WritePos = (WritePos + value.size()) & CapacityMask;
diff --git a/library/cpp/messagebus/actor/ring_buffer_ut.cpp b/library/cpp/messagebus/actor/ring_buffer_ut.cpp
index bdb379b3a9..27926b5f2c 100644
--- a/library/cpp/messagebus/actor/ring_buffer_ut.cpp
+++ b/library/cpp/messagebus/actor/ring_buffer_ut.cpp
@@ -11,11 +11,11 @@ Y_UNIT_TEST_SUITE(RingBuffer) {
unsigned NextPush;
unsigned NextPop;
- TRingBufferTester()
- : NextPush()
- , NextPop()
- {
- }
+ TRingBufferTester()
+ : NextPush()
+ , NextPop()
+ {
+ }
void Push() {
//Cerr << "push " << NextPush << "\n";
diff --git a/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h b/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h
index f0b7cd90e4..39e1d05506 100644
--- a/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h
+++ b/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h
@@ -14,8 +14,8 @@ private:
public:
TRingBufferWithSpinLock()
: CachedSize(0)
- {
- }
+ {
+ }
void Push(const T& t) {
PushAll(t);
diff --git a/library/cpp/messagebus/actor/tasks.h b/library/cpp/messagebus/actor/tasks.h
index 31d35931d2..e3b3f0f504 100644
--- a/library/cpp/messagebus/actor/tasks.h
+++ b/library/cpp/messagebus/actor/tasks.h
@@ -4,45 +4,45 @@
#include <util/system/yassert.h>
namespace NActor {
- class TTasks {
- enum {
- // order of values is important
- E_WAITING,
- E_RUNNING_NO_TASKS,
- E_RUNNING_GOT_TASKS,
- };
-
- private:
- TAtomic State;
-
- public:
- TTasks()
- : State(E_WAITING)
- {
+ class TTasks {
+ enum {
+ // order of values is important
+ E_WAITING,
+ E_RUNNING_NO_TASKS,
+ E_RUNNING_GOT_TASKS,
+ };
+
+ private:
+ TAtomic State;
+
+ public:
+ TTasks()
+ : State(E_WAITING)
+ {
}
- // @return true iff caller have to either schedule task or execute it
- bool AddTask() {
- // High contention case optimization: AtomicGet is cheaper than AtomicSwap.
- if (E_RUNNING_GOT_TASKS == AtomicGet(State)) {
- return false;
- }
+ // @return true iff caller have to either schedule task or execute it
+ bool AddTask() {
+ // High contention case optimization: AtomicGet is cheaper than AtomicSwap.
+ if (E_RUNNING_GOT_TASKS == AtomicGet(State)) {
+ return false;
+ }
- TAtomicBase oldState = AtomicSwap(&State, E_RUNNING_GOT_TASKS);
- return oldState == E_WAITING;
+ TAtomicBase oldState = AtomicSwap(&State, E_RUNNING_GOT_TASKS);
+ return oldState == E_WAITING;
}
- // called by executor
- // @return true iff we have to recheck queues
- bool FetchTask() {
- TAtomicBase newState = AtomicDecrement(State);
- if (newState == E_RUNNING_NO_TASKS) {
- return true;
- } else if (newState == E_WAITING) {
- return false;
- }
- Y_FAIL("unknown");
- }
- };
-
-}
+ // called by executor
+ // @return true iff we have to recheck queues
+ bool FetchTask() {
+ TAtomicBase newState = AtomicDecrement(State);
+ if (newState == E_RUNNING_NO_TASKS) {
+ return true;
+ } else if (newState == E_WAITING) {
+ return false;
+ }
+ Y_FAIL("unknown");
+ }
+ };
+
+}
diff --git a/library/cpp/messagebus/actor/temp_tls_vector.h b/library/cpp/messagebus/actor/temp_tls_vector.h
index 675d92f5b0..5b94b2126f 100644
--- a/library/cpp/messagebus/actor/temp_tls_vector.h
+++ b/library/cpp/messagebus/actor/temp_tls_vector.h
@@ -5,13 +5,13 @@
#include <util/generic/vector.h>
#include <util/system/yassert.h>
-template <typename T, typename TTag = void, template <typename, class> class TVectorType = TVector>
+template <typename T, typename TTag = void, template <typename, class> class TVectorType = TVector>
class TTempTlsVector {
private:
struct TTagForTls {};
TVectorType<T, std::allocator<T>>* Vector;
-
+
public:
TVectorType<T, std::allocator<T>>* GetVector() {
return Vector;
diff --git a/library/cpp/messagebus/actor/thread_extra.h b/library/cpp/messagebus/actor/thread_extra.h
index b5aa151618..efd150a0b2 100644
--- a/library/cpp/messagebus/actor/thread_extra.h
+++ b/library/cpp/messagebus/actor/thread_extra.h
@@ -3,39 +3,39 @@
#include <util/thread/singleton.h>
namespace NTSAN {
- template <typename T>
- inline void RelaxedStore(volatile T* a, T x) {
- static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");
+ template <typename T>
+ inline void RelaxedStore(volatile T* a, T x) {
+ static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");
#ifdef _win_
- *a = x;
+ *a = x;
#else
- __atomic_store_n(a, x, __ATOMIC_RELAXED);
+ __atomic_store_n(a, x, __ATOMIC_RELAXED);
#endif
- }
+ }
- template <typename T>
- inline T RelaxedLoad(volatile T* a) {
+ template <typename T>
+ inline T RelaxedLoad(volatile T* a) {
#ifdef _win_
- return *a;
+ return *a;
#else
- return __atomic_load_n(a, __ATOMIC_RELAXED);
+ return __atomic_load_n(a, __ATOMIC_RELAXED);
#endif
- }
+ }
}
void SetCurrentThreadName(const char* name);
-namespace NThreadExtra {
- namespace NPrivate {
- template <typename TValue, typename TTag>
- struct TValueHolder {
- TValue Value;
- };
- }
-}
+namespace NThreadExtra {
+ namespace NPrivate {
+ template <typename TValue, typename TTag>
+ struct TValueHolder {
+ TValue Value;
+ };
+ }
+}
template <typename TValue, typename TTag>
static inline TValue* FastTlsSingletonWithTag() {
- return &FastTlsSingleton< ::NThreadExtra::NPrivate::TValueHolder<TValue, TTag>>()->Value;
+ return &FastTlsSingleton< ::NThreadExtra::NPrivate::TValueHolder<TValue, TTag>>()->Value;
}
diff --git a/library/cpp/messagebus/actor/what_thread_does.cpp b/library/cpp/messagebus/actor/what_thread_does.cpp
index bebb6a888c..5960093a6b 100644
--- a/library/cpp/messagebus/actor/what_thread_does.cpp
+++ b/library/cpp/messagebus/actor/what_thread_does.cpp
@@ -4,19 +4,19 @@
#include <util/system/tls.h>
-Y_POD_STATIC_THREAD(const char*)
-WhatThreadDoes;
+Y_POD_STATIC_THREAD(const char*)
+WhatThreadDoes;
-const char* PushWhatThreadDoes(const char* what) {
+const char* PushWhatThreadDoes(const char* what) {
const char* r = NTSAN::RelaxedLoad(&WhatThreadDoes);
NTSAN::RelaxedStore(&WhatThreadDoes, what);
return r;
}
-void PopWhatThreadDoes(const char* prev) {
+void PopWhatThreadDoes(const char* prev) {
NTSAN::RelaxedStore(&WhatThreadDoes, prev);
}
-const char** WhatThreadDoesLocation() {
+const char** WhatThreadDoesLocation() {
return &WhatThreadDoes;
}
diff --git a/library/cpp/messagebus/actor/what_thread_does_guard.h b/library/cpp/messagebus/actor/what_thread_does_guard.h
index f104e9e173..4fb0abef6d 100644
--- a/library/cpp/messagebus/actor/what_thread_does_guard.h
+++ b/library/cpp/messagebus/actor/what_thread_does_guard.h
@@ -4,37 +4,37 @@
template <class T>
class TWhatThreadDoesAcquireGuard: public TNonCopyable {
-public:
- inline TWhatThreadDoesAcquireGuard(const T& t, const char* acquire) noexcept {
- Init(&t, acquire);
- }
-
- inline TWhatThreadDoesAcquireGuard(const T* t, const char* acquire) noexcept {
- Init(t, acquire);
- }
-
- inline ~TWhatThreadDoesAcquireGuard() {
- Release();
- }
-
- inline void Release() noexcept {
- if (WasAcquired()) {
- const_cast<T*>(T_)->Release();
- T_ = nullptr;
+public:
+ inline TWhatThreadDoesAcquireGuard(const T& t, const char* acquire) noexcept {
+ Init(&t, acquire);
+ }
+
+ inline TWhatThreadDoesAcquireGuard(const T* t, const char* acquire) noexcept {
+ Init(t, acquire);
+ }
+
+ inline ~TWhatThreadDoesAcquireGuard() {
+ Release();
+ }
+
+ inline void Release() noexcept {
+ if (WasAcquired()) {
+ const_cast<T*>(T_)->Release();
+ T_ = nullptr;
}
- }
+ }
- inline bool WasAcquired() const noexcept {
- return T_ != nullptr;
- }
+ inline bool WasAcquired() const noexcept {
+ return T_ != nullptr;
+ }
-private:
- inline void Init(const T* t, const char* acquire) noexcept {
- T_ = const_cast<T*>(t);
- TWhatThreadDoesPushPop pp(acquire);
- T_->Acquire();
- }
+private:
+ inline void Init(const T* t, const char* acquire) noexcept {
+ T_ = const_cast<T*>(t);
+ TWhatThreadDoesPushPop pp(acquire);
+ T_->Acquire();
+ }
-private:
- T* T_;
+private:
+ T* T_;
};