diff options
author | Anton Samokhvalov <pg83@yandex.ru> | 2022-02-10 16:45:15 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:15 +0300 |
commit | 72cb13b4aff9bc9cf22e49251bc8fd143f82538f (patch) | |
tree | da2c34829458c7d4e74bdfbdf85dff449e9e7fb8 /library/cpp/messagebus/actor | |
parent | 778e51ba091dc39e7b7fcab2b9cf4dbedfb6f2b5 (diff) | |
download | ydb-72cb13b4aff9bc9cf22e49251bc8fd143f82538f.tar.gz |
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/actor')
-rw-r--r-- | library/cpp/messagebus/actor/actor.h | 218 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/actor_ut.cpp | 26 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/executor.cpp | 140 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/executor.h | 130 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/queue_for_actor.h | 92 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/queue_in_actor.h | 144 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/ring_buffer.h | 4 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/ring_buffer_ut.cpp | 10 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h | 4 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/tasks.h | 74 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/temp_tls_vector.h | 4 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/thread_extra.h | 40 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/what_thread_does.cpp | 10 | ||||
-rw-r--r-- | library/cpp/messagebus/actor/what_thread_does_guard.h | 58 |
14 files changed, 477 insertions, 477 deletions
diff --git a/library/cpp/messagebus/actor/actor.h b/library/cpp/messagebus/actor/actor.h index 9b8f20298a..1f5af55097 100644 --- a/library/cpp/messagebus/actor/actor.h +++ b/library/cpp/messagebus/actor/actor.h @@ -7,58 +7,58 @@ #include <util/system/yassert.h> namespace NActor { - class IActor: protected IWorkItem { - public: - // TODO: make private - TTasks Tasks; - - public: - virtual void ScheduleHereV() = 0; - virtual void ScheduleV() = 0; - virtual void ScheduleHereAtMostOnceV() = 0; - - // TODO: make private - virtual void RefV() = 0; - virtual void UnRefV() = 0; - - // mute warnings - ~IActor() override { - } - }; - - struct TDefaultTag {}; - - template <typename TThis, typename TTag = TDefaultTag> - class TActor: public IActor { - private: - TExecutor* const Executor; - - public: - TActor(TExecutor* executor) - : Executor(executor) - { - } - - void AddTaskFromActorLoop() { - bool schedule = Tasks.AddTask(); - // TODO: check thread id - Y_ASSERT(!schedule); - } - - /** + class IActor: protected IWorkItem { + public: + // TODO: make private + TTasks Tasks; + + public: + virtual void ScheduleHereV() = 0; + virtual void ScheduleV() = 0; + virtual void ScheduleHereAtMostOnceV() = 0; + + // TODO: make private + virtual void RefV() = 0; + virtual void UnRefV() = 0; + + // mute warnings + ~IActor() override { + } + }; + + struct TDefaultTag {}; + + template <typename TThis, typename TTag = TDefaultTag> + class TActor: public IActor { + private: + TExecutor* const Executor; + + public: + TActor(TExecutor* executor) + : Executor(executor) + { + } + + void AddTaskFromActorLoop() { + bool schedule = Tasks.AddTask(); + // TODO: check thread id + Y_ASSERT(!schedule); + } + + /** * Schedule actor. * * If actor is sleeping, then actor will be executed right now. * If actor is executing right now, it will be executed one more time. * If this method is called multiple time, actor will be re-executed no more than one more time. */ - void Schedule() { - if (Tasks.AddTask()) { - EnqueueWork(); - } + void Schedule() { + if (Tasks.AddTask()) { + EnqueueWork(); + } } - /** + /** * Schedule actor, execute it in current thread. * * If actor is running, continue executing where it is executing. @@ -66,79 +66,79 @@ namespace NActor { * * Operation is useful for tasks that are likely to complete quickly. */ - void ScheduleHere() { - if (Tasks.AddTask()) { - Loop(); - } + void ScheduleHere() { + if (Tasks.AddTask()) { + Loop(); + } } - /** + /** * Schedule actor, execute in current thread no more than once. * * If actor is running, continue executing where it is executing. * If actor is sleeping, execute one iteration here, and if actor got new tasks, * reschedule it in worker pool. */ - void ScheduleHereAtMostOnce() { - if (Tasks.AddTask()) { - bool fetched = Tasks.FetchTask(); - Y_VERIFY(fetched, "happens"); - - DoAct(); - - // if someone added more tasks, schedule them - if (Tasks.FetchTask()) { - bool added = Tasks.AddTask(); - Y_VERIFY(!added, "happens"); - EnqueueWork(); - } - } - } - - void ScheduleHereV() override { - ScheduleHere(); - } - void ScheduleV() override { - Schedule(); - } - void ScheduleHereAtMostOnceV() override { - ScheduleHereAtMostOnce(); - } - void RefV() override { - GetThis()->Ref(); - } - void UnRefV() override { - GetThis()->UnRef(); - } - - private: - TThis* GetThis() { - return static_cast<TThis*>(this); - } - - void EnqueueWork() { - GetThis()->Ref(); - Executor->EnqueueWork({this}); - } - - void DoAct() { - WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC(); - - GetThis()->Act(TTag()); - } - - void Loop() { - // TODO: limit number of iterations - while (Tasks.FetchTask()) { - DoAct(); + void ScheduleHereAtMostOnce() { + if (Tasks.AddTask()) { + bool fetched = Tasks.FetchTask(); + Y_VERIFY(fetched, "happens"); + + DoAct(); + + // if someone added more tasks, schedule them + if (Tasks.FetchTask()) { + bool added = Tasks.AddTask(); + Y_VERIFY(!added, "happens"); + EnqueueWork(); + } } } - void DoWork() override { - Y_ASSERT(GetThis()->RefCount() >= 1); - Loop(); - GetThis()->UnRef(); - } - }; - -} + void ScheduleHereV() override { + ScheduleHere(); + } + void ScheduleV() override { + Schedule(); + } + void ScheduleHereAtMostOnceV() override { + ScheduleHereAtMostOnce(); + } + void RefV() override { + GetThis()->Ref(); + } + void UnRefV() override { + GetThis()->UnRef(); + } + + private: + TThis* GetThis() { + return static_cast<TThis*>(this); + } + + void EnqueueWork() { + GetThis()->Ref(); + Executor->EnqueueWork({this}); + } + + void DoAct() { + WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC(); + + GetThis()->Act(TTag()); + } + + void Loop() { + // TODO: limit number of iterations + while (Tasks.FetchTask()) { + DoAct(); + } + } + + void DoWork() override { + Y_ASSERT(GetThis()->RefCount() >= 1); + Loop(); + GetThis()->UnRef(); + } + }; + +} diff --git a/library/cpp/messagebus/actor/actor_ut.cpp b/library/cpp/messagebus/actor/actor_ut.cpp index b76ab55bfa..20dee53e2d 100644 --- a/library/cpp/messagebus/actor/actor_ut.cpp +++ b/library/cpp/messagebus/actor/actor_ut.cpp @@ -11,14 +11,14 @@ using namespace NActor; template <typename TThis> -struct TTestActorBase: public TAtomicRefCount<TThis>, public TActor<TThis> { +struct TTestActorBase: public TAtomicRefCount<TThis>, public TActor<TThis> { TTestSync Started; TTestSync Acted; TTestActorBase(TExecutor* executor) : TActor<TThis>(executor) - { - } + { + } void Act(TDefaultTag) { Started.Inc(); @@ -27,23 +27,23 @@ struct TTestActorBase: public TAtomicRefCount<TThis>, public TActor<TThis> { } }; -struct TNopActor: public TTestActorBase<TNopActor> { +struct TNopActor: public TTestActorBase<TNopActor> { TObjectCounter<TNopActor> AllocCounter; TNopActor(TExecutor* executor) : TTestActorBase<TNopActor>(executor) - { - } + { + } void Act2() { } }; -struct TWaitForSignalActor: public TTestActorBase<TWaitForSignalActor> { +struct TWaitForSignalActor: public TTestActorBase<TWaitForSignalActor> { TWaitForSignalActor(TExecutor* executor) : TTestActorBase<TWaitForSignalActor>(executor) - { - } + { + } TSystemEvent WaitFor; @@ -52,7 +52,7 @@ struct TWaitForSignalActor: public TTestActorBase<TWaitForSignalActor> { } }; -struct TDecrementAndSendActor: public TTestActorBase<TDecrementAndSendActor>, public TQueueInActor<TDecrementAndSendActor, int> { +struct TDecrementAndSendActor: public TTestActorBase<TDecrementAndSendActor>, public TQueueInActor<TDecrementAndSendActor, int> { TSystemEvent Done; TDecrementAndSendActor* Next; @@ -60,8 +60,8 @@ struct TDecrementAndSendActor: public TTestActorBase<TDecrementAndSendActor>, pu TDecrementAndSendActor(TExecutor* executor) : TTestActorBase<TDecrementAndSendActor>(executor) , Next(nullptr) - { - } + { + } void ProcessItem(TDefaultTag, TDefaultTag, int n) { if (n == 0) { @@ -129,7 +129,7 @@ Y_UNIT_TEST_SUITE(TActor) { TExecutor executor(queueSize); - TVector<TIntrusivePtr<TDecrementAndSendActor>> actors; + TVector<TIntrusivePtr<TDecrementAndSendActor>> actors; for (int i = 0; i < actorCount; ++i) { actors.push_back(new TDecrementAndSendActor(&executor)); } diff --git a/library/cpp/messagebus/actor/executor.cpp b/library/cpp/messagebus/actor/executor.cpp index 7a2227a458..4c428754f3 100644 --- a/library/cpp/messagebus/actor/executor.cpp +++ b/library/cpp/messagebus/actor/executor.cpp @@ -20,10 +20,10 @@ namespace { struct TRecord { TAtomic MaxQueueSize; - TRecord() - : MaxQueueSize() - { - } + TRecord() + : MaxQueueSize() + { + } TExecutorHistory::THistoryRecord Capture() { TExecutorHistory::THistoryRecord r; @@ -70,8 +70,8 @@ namespace { } -Y_POD_STATIC_THREAD(TExecutor*) -ThreadCurrentExecutor; +Y_POD_STATIC_THREAD(TExecutor*) +ThreadCurrentExecutor; static const char* NoLocation = "nowhere"; @@ -80,80 +80,80 @@ struct TExecutorWorkerThreadLocalData { }; static TExecutorWorkerThreadLocalData WorkerNoThreadLocalData; -Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData) -WorkerThreadLocalData; +Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData) +WorkerThreadLocalData; namespace NActor { - struct TExecutorWorker { - TExecutor* const Executor; - TThread Thread; - const char** WhatThreadDoesLocation; - TExecutorWorkerThreadLocalData* ThreadLocalData; - - TExecutorWorker(TExecutor* executor) - : Executor(executor) - , Thread(RunThreadProc, this) - , WhatThreadDoesLocation(&NoLocation) - , ThreadLocalData(&::WorkerNoThreadLocalData) - { - Thread.Start(); - } - - void Run() { - WhatThreadDoesLocation = ::WhatThreadDoesLocation(); - AtomicSet(ThreadLocalData, &::WorkerThreadLocalData); - WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC(); - Executor->RunWorker(); - } - - static void* RunThreadProc(void* thiz0) { - TExecutorWorker* thiz = (TExecutorWorker*)thiz0; - thiz->Run(); - return nullptr; - } - }; - - struct TExecutor::TImpl { - TExecutor* const Executor; - THistoryInternal History; + struct TExecutorWorker { + TExecutor* const Executor; + TThread Thread; + const char** WhatThreadDoesLocation; + TExecutorWorkerThreadLocalData* ThreadLocalData; + + TExecutorWorker(TExecutor* executor) + : Executor(executor) + , Thread(RunThreadProc, this) + , WhatThreadDoesLocation(&NoLocation) + , ThreadLocalData(&::WorkerNoThreadLocalData) + { + Thread.Start(); + } + + void Run() { + WhatThreadDoesLocation = ::WhatThreadDoesLocation(); + AtomicSet(ThreadLocalData, &::WorkerThreadLocalData); + WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC(); + Executor->RunWorker(); + } + + static void* RunThreadProc(void* thiz0) { + TExecutorWorker* thiz = (TExecutorWorker*)thiz0; + thiz->Run(); + return nullptr; + } + }; + + struct TExecutor::TImpl { + TExecutor* const Executor; + THistoryInternal History; TSystemEvent HelperStopSignal; - TThread HelperThread; + TThread HelperThread; - TImpl(TExecutor* executor) - : Executor(executor) - , HelperThread(HelperThreadProc, this) - { - } + TImpl(TExecutor* executor) + : Executor(executor) + , HelperThread(HelperThreadProc, this) + { + } - void RunHelper() { - ui64 nowSeconds = TInstant::Now().Seconds(); - for (;;) { - TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000)); + void RunHelper() { + ui64 nowSeconds = TInstant::Now().Seconds(); + for (;;) { + TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000)); - if (HelperStopSignal.WaitD(nextStop)) { - return; - } + if (HelperStopSignal.WaitD(nextStop)) { + return; + } - nowSeconds = nextStop.Seconds(); + nowSeconds = nextStop.Seconds(); - THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds); + THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds); - ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear(); - if (maxQueueSize > record.MaxQueueSize) { - AtomicSet(record.MaxQueueSize, maxQueueSize); - } + ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear(); + if (maxQueueSize > record.MaxQueueSize) { + AtomicSet(record.MaxQueueSize, maxQueueSize); + } } } - static void* HelperThreadProc(void* impl0) { - TImpl* impl = (TImpl*)impl0; - impl->RunHelper(); - return nullptr; - } - }; + static void* HelperThreadProc(void* impl0) { + TImpl* impl = (TImpl*)impl0; + impl->RunHelper(); + return nullptr; + } + }; -} +} static TExecutor::TConfig MakeConfig(unsigned workerCount) { TExecutor::TConfig config; @@ -296,9 +296,9 @@ TAutoPtr<IWorkItem> TExecutor::DequeueWork() { WorkAvailable.Wait(WorkMutex); } } - - auto& wtls = TlsRef(WorkerThreadLocalData); - + + auto& wtls = TlsRef(WorkerThreadLocalData); + if (queueSize > RelaxedLoad(&wtls.MaxQueueSize)) { RelaxedStore<ui32>(&wtls.MaxQueueSize, queueSize); } @@ -334,5 +334,5 @@ void TExecutor::RunWorker() { RunWorkItem(wi); } - ThreadCurrentExecutor = (TExecutor*)nullptr; + ThreadCurrentExecutor = (TExecutor*)nullptr; } diff --git a/library/cpp/messagebus/actor/executor.h b/library/cpp/messagebus/actor/executor.h index 7292d8be53..4b9bcb1da0 100644 --- a/library/cpp/messagebus/actor/executor.h +++ b/library/cpp/messagebus/actor/executor.h @@ -11,95 +11,95 @@ #include <util/thread/lfqueue.h> namespace NActor { - namespace NPrivate { - struct TExecutorHistory { - struct THistoryRecord { - ui32 MaxQueueSize; - }; - TVector<THistoryRecord> HistoryRecords; - ui64 LastHistoryRecordSecond; - - ui64 FirstHistoryRecordSecond() const { - return LastHistoryRecordSecond - HistoryRecords.size() + 1; - } + namespace NPrivate { + struct TExecutorHistory { + struct THistoryRecord { + ui32 MaxQueueSize; + }; + TVector<THistoryRecord> HistoryRecords; + ui64 LastHistoryRecordSecond; + + ui64 FirstHistoryRecordSecond() const { + return LastHistoryRecordSecond - HistoryRecords.size() + 1; + } + }; + + struct TExecutorStatus { + size_t WorkQueueSize = 0; + TExecutorHistory History; + TString Status; }; + } - struct TExecutorStatus { - size_t WorkQueueSize = 0; - TExecutorHistory History; - TString Status; - }; - } - - class IWorkItem { - public: - virtual ~IWorkItem() { + class IWorkItem { + public: + virtual ~IWorkItem() { } - virtual void DoWork(/* must release this */) = 0; + virtual void DoWork(/* must release this */) = 0; }; - struct TExecutorWorker; + struct TExecutorWorker; - class TExecutor: public TAtomicRefCount<TExecutor> { - friend struct TExecutorWorker; + class TExecutor: public TAtomicRefCount<TExecutor> { + friend struct TExecutorWorker; - public: - struct TConfig { - size_t WorkerCount; - const char* Name; + public: + struct TConfig { + size_t WorkerCount; + const char* Name; - TConfig() - : WorkerCount(1) - , Name() - { - } - }; + TConfig() + : WorkerCount(1) + , Name() + { + } + }; - private: - struct TImpl; - THolder<TImpl> Impl; + private: + struct TImpl; + THolder<TImpl> Impl; - const TConfig Config; + const TConfig Config; - TAtomic ExitWorkers; + TAtomic ExitWorkers; - TVector<TAutoPtr<TExecutorWorker>> WorkerThreads; + TVector<TAutoPtr<TExecutorWorker>> WorkerThreads; - TRingBufferWithSpinLock<IWorkItem*> WorkItems; + TRingBufferWithSpinLock<IWorkItem*> WorkItems; - TMutex WorkMutex; - TCondVar WorkAvailable; + TMutex WorkMutex; + TCondVar WorkAvailable; - public: - explicit TExecutor(size_t workerCount); - TExecutor(const TConfig& config); - ~TExecutor(); + public: + explicit TExecutor(size_t workerCount); + TExecutor(const TConfig& config); + ~TExecutor(); - void Stop(); + void Stop(); - void EnqueueWork(TArrayRef<IWorkItem* const> w); + void EnqueueWork(TArrayRef<IWorkItem* const> w); - size_t GetWorkQueueSize() const; - TString GetStatus() const; - TString GetStatusSingleLine() const; - NPrivate::TExecutorStatus GetStatusRecordInternal() const; + size_t GetWorkQueueSize() const; + TString GetStatus() const; + TString GetStatusSingleLine() const; + NPrivate::TExecutorStatus GetStatusRecordInternal() const; - bool IsInExecutorThread() const; + bool IsInExecutorThread() const; - private: - void Init(); + private: + void Init(); - TAutoPtr<IWorkItem> DequeueWork(); + TAutoPtr<IWorkItem> DequeueWork(); - void ProcessWorkQueueHere(); + void ProcessWorkQueueHere(); - inline void RunWorkItem(TAutoPtr<IWorkItem>); + inline void RunWorkItem(TAutoPtr<IWorkItem>); - void RunWorker(); + void RunWorker(); - ui32 GetMaxQueueSizeAndClear() const; - }; + ui32 GetMaxQueueSizeAndClear() const; + }; - using TExecutorPtr = TIntrusivePtr<TExecutor>; + using TExecutorPtr = TIntrusivePtr<TExecutor>; -} +} diff --git a/library/cpp/messagebus/actor/queue_for_actor.h b/library/cpp/messagebus/actor/queue_for_actor.h index 40fa536b82..f59f229b79 100644 --- a/library/cpp/messagebus/actor/queue_for_actor.h +++ b/library/cpp/messagebus/actor/queue_for_actor.h @@ -9,66 +9,66 @@ #include "temp_tls_vector.h" namespace NActor { - namespace NPrivate { - struct TTagForTl {}; + namespace NPrivate { + struct TTagForTl {}; - } + } - template <typename T> - class TQueueForActor { - private: - TLockFreeStack<T> Queue; + template <typename T> + class TQueueForActor { + private: + TLockFreeStack<T> Queue; - public: - ~TQueueForActor() { - Y_VERIFY(Queue.IsEmpty()); - } + public: + ~TQueueForActor() { + Y_VERIFY(Queue.IsEmpty()); + } - bool IsEmpty() { - return Queue.IsEmpty(); - } + bool IsEmpty() { + return Queue.IsEmpty(); + } - void Enqueue(const T& value) { - Queue.Enqueue(value); - } + void Enqueue(const T& value) { + Queue.Enqueue(value); + } - template <typename TCollection> - void EnqueueAll(const TCollection& all) { - Queue.EnqueueAll(all); - } + template <typename TCollection> + void EnqueueAll(const TCollection& all) { + Queue.EnqueueAll(all); + } - void Clear() { - TVector<T> tmp; - Queue.DequeueAll(&tmp); - } + void Clear() { + TVector<T> tmp; + Queue.DequeueAll(&tmp); + } - template <typename TFunc> - void DequeueAll(const TFunc& func - // TODO: , std::enable_if_t<TFunctionParamCount<TFunc>::Value == 1>* = 0 - ) { - TTempTlsVector<T> temp; + template <typename TFunc> + void DequeueAll(const TFunc& func + // TODO: , std::enable_if_t<TFunctionParamCount<TFunc>::Value == 1>* = 0 + ) { + TTempTlsVector<T> temp; - Queue.DequeueAllSingleConsumer(temp.GetVector()); + Queue.DequeueAllSingleConsumer(temp.GetVector()); - for (typename TVector<T>::reverse_iterator i = temp.GetVector()->rbegin(); i != temp.GetVector()->rend(); ++i) { - func(*i); - } + for (typename TVector<T>::reverse_iterator i = temp.GetVector()->rbegin(); i != temp.GetVector()->rend(); ++i) { + func(*i); + } - temp.Clear(); + temp.Clear(); - if (temp.Capacity() * sizeof(T) > 64 * 1024) { - temp.Shrink(); - } + if (temp.Capacity() * sizeof(T) > 64 * 1024) { + temp.Shrink(); + } } - template <typename TFunc> - void DequeueAllLikelyEmpty(const TFunc& func) { - if (Y_LIKELY(IsEmpty())) { - return; - } + template <typename TFunc> + void DequeueAllLikelyEmpty(const TFunc& func) { + if (Y_LIKELY(IsEmpty())) { + return; + } - DequeueAll(func); + DequeueAll(func); } - }; + }; -} +} diff --git a/library/cpp/messagebus/actor/queue_in_actor.h b/library/cpp/messagebus/actor/queue_in_actor.h index 9865996532..65110967aa 100644 --- a/library/cpp/messagebus/actor/queue_in_actor.h +++ b/library/cpp/messagebus/actor/queue_in_actor.h @@ -6,75 +6,75 @@ #include <functional> namespace NActor { - template <typename TItem> - class IQueueInActor { - public: - virtual void EnqueueAndScheduleV(const TItem& item) = 0; - virtual void DequeueAllV() = 0; - virtual void DequeueAllLikelyEmptyV() = 0; - - virtual ~IQueueInActor() { - } - }; - - template <typename TThis, typename TItem, typename TActorTag = TDefaultTag, typename TQueueTag = TDefaultTag> - class TQueueInActor: public IQueueInActor<TItem> { - typedef TQueueInActor<TThis, TItem, TActorTag, TQueueTag> TSelf; - - public: - // TODO: make protected - TQueueForActor<TItem> QueueInActor; - - private: - TActor<TThis, TActorTag>* GetActor() { - return GetThis(); - } - - TThis* GetThis() { - return static_cast<TThis*>(this); - } - - void ProcessItem(const TItem& item) { - GetThis()->ProcessItem(TActorTag(), TQueueTag(), item); - } - - public: - void EnqueueAndNoSchedule(const TItem& item) { - QueueInActor.Enqueue(item); - } - - void EnqueueAndSchedule(const TItem& item) { - EnqueueAndNoSchedule(item); - GetActor()->Schedule(); - } - - void EnqueueAndScheduleV(const TItem& item) override { - EnqueueAndSchedule(item); - } - - void Clear() { - QueueInActor.Clear(); - } - - void DequeueAll() { - QueueInActor.DequeueAll(std::bind(&TSelf::ProcessItem, this, std::placeholders::_1)); - } - - void DequeueAllV() override { - return DequeueAll(); - } - - void DequeueAllLikelyEmpty() { - QueueInActor.DequeueAllLikelyEmpty(std::bind(&TSelf::ProcessItem, this, std::placeholders::_1)); - } - - void DequeueAllLikelyEmptyV() override { - return DequeueAllLikelyEmpty(); - } - - bool IsEmpty() { - return QueueInActor.IsEmpty(); - } - }; - -} + template <typename TItem> + class IQueueInActor { + public: + virtual void EnqueueAndScheduleV(const TItem& item) = 0; + virtual void DequeueAllV() = 0; + virtual void DequeueAllLikelyEmptyV() = 0; + + virtual ~IQueueInActor() { + } + }; + + template <typename TThis, typename TItem, typename TActorTag = TDefaultTag, typename TQueueTag = TDefaultTag> + class TQueueInActor: public IQueueInActor<TItem> { + typedef TQueueInActor<TThis, TItem, TActorTag, TQueueTag> TSelf; + + public: + // TODO: make protected + TQueueForActor<TItem> QueueInActor; + + private: + TActor<TThis, TActorTag>* GetActor() { + return GetThis(); + } + + TThis* GetThis() { + return static_cast<TThis*>(this); + } + + void ProcessItem(const TItem& item) { + GetThis()->ProcessItem(TActorTag(), TQueueTag(), item); + } + + public: + void EnqueueAndNoSchedule(const TItem& item) { + QueueInActor.Enqueue(item); + } + + void EnqueueAndSchedule(const TItem& item) { + EnqueueAndNoSchedule(item); + GetActor()->Schedule(); + } + + void EnqueueAndScheduleV(const TItem& item) override { + EnqueueAndSchedule(item); + } + + void Clear() { + QueueInActor.Clear(); + } + + void DequeueAll() { + QueueInActor.DequeueAll(std::bind(&TSelf::ProcessItem, this, std::placeholders::_1)); + } + + void DequeueAllV() override { + return DequeueAll(); + } + + void DequeueAllLikelyEmpty() { + QueueInActor.DequeueAllLikelyEmpty(std::bind(&TSelf::ProcessItem, this, std::placeholders::_1)); + } + + void DequeueAllLikelyEmptyV() override { + return DequeueAllLikelyEmpty(); + } + + bool IsEmpty() { + return QueueInActor.IsEmpty(); + } + }; + +} diff --git a/library/cpp/messagebus/actor/ring_buffer.h b/library/cpp/messagebus/actor/ring_buffer.h index ec5706f7c7..8ca76bdb91 100644 --- a/library/cpp/messagebus/actor/ring_buffer.h +++ b/library/cpp/messagebus/actor/ring_buffer.h @@ -95,11 +95,11 @@ public: } for (size_t i = 0; i < firstSize; ++i) { - Data[WritePos + i] = value[i]; + Data[WritePos + i] = value[i]; } for (size_t i = 0; i < secondSize; ++i) { - Data[i] = value[firstSize + i]; + Data[i] = value[firstSize + i]; } WritePos = (WritePos + value.size()) & CapacityMask; diff --git a/library/cpp/messagebus/actor/ring_buffer_ut.cpp b/library/cpp/messagebus/actor/ring_buffer_ut.cpp index bdb379b3a9..27926b5f2c 100644 --- a/library/cpp/messagebus/actor/ring_buffer_ut.cpp +++ b/library/cpp/messagebus/actor/ring_buffer_ut.cpp @@ -11,11 +11,11 @@ Y_UNIT_TEST_SUITE(RingBuffer) { unsigned NextPush; unsigned NextPop; - TRingBufferTester() - : NextPush() - , NextPop() - { - } + TRingBufferTester() + : NextPush() + , NextPop() + { + } void Push() { //Cerr << "push " << NextPush << "\n"; diff --git a/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h b/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h index f0b7cd90e4..39e1d05506 100644 --- a/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h +++ b/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h @@ -14,8 +14,8 @@ private: public: TRingBufferWithSpinLock() : CachedSize(0) - { - } + { + } void Push(const T& t) { PushAll(t); diff --git a/library/cpp/messagebus/actor/tasks.h b/library/cpp/messagebus/actor/tasks.h index 31d35931d2..e3b3f0f504 100644 --- a/library/cpp/messagebus/actor/tasks.h +++ b/library/cpp/messagebus/actor/tasks.h @@ -4,45 +4,45 @@ #include <util/system/yassert.h> namespace NActor { - class TTasks { - enum { - // order of values is important - E_WAITING, - E_RUNNING_NO_TASKS, - E_RUNNING_GOT_TASKS, - }; - - private: - TAtomic State; - - public: - TTasks() - : State(E_WAITING) - { + class TTasks { + enum { + // order of values is important + E_WAITING, + E_RUNNING_NO_TASKS, + E_RUNNING_GOT_TASKS, + }; + + private: + TAtomic State; + + public: + TTasks() + : State(E_WAITING) + { } - // @return true iff caller have to either schedule task or execute it - bool AddTask() { - // High contention case optimization: AtomicGet is cheaper than AtomicSwap. - if (E_RUNNING_GOT_TASKS == AtomicGet(State)) { - return false; - } + // @return true iff caller have to either schedule task or execute it + bool AddTask() { + // High contention case optimization: AtomicGet is cheaper than AtomicSwap. + if (E_RUNNING_GOT_TASKS == AtomicGet(State)) { + return false; + } - TAtomicBase oldState = AtomicSwap(&State, E_RUNNING_GOT_TASKS); - return oldState == E_WAITING; + TAtomicBase oldState = AtomicSwap(&State, E_RUNNING_GOT_TASKS); + return oldState == E_WAITING; } - // called by executor - // @return true iff we have to recheck queues - bool FetchTask() { - TAtomicBase newState = AtomicDecrement(State); - if (newState == E_RUNNING_NO_TASKS) { - return true; - } else if (newState == E_WAITING) { - return false; - } - Y_FAIL("unknown"); - } - }; - -} + // called by executor + // @return true iff we have to recheck queues + bool FetchTask() { + TAtomicBase newState = AtomicDecrement(State); + if (newState == E_RUNNING_NO_TASKS) { + return true; + } else if (newState == E_WAITING) { + return false; + } + Y_FAIL("unknown"); + } + }; + +} diff --git a/library/cpp/messagebus/actor/temp_tls_vector.h b/library/cpp/messagebus/actor/temp_tls_vector.h index 675d92f5b0..5b94b2126f 100644 --- a/library/cpp/messagebus/actor/temp_tls_vector.h +++ b/library/cpp/messagebus/actor/temp_tls_vector.h @@ -5,13 +5,13 @@ #include <util/generic/vector.h> #include <util/system/yassert.h> -template <typename T, typename TTag = void, template <typename, class> class TVectorType = TVector> +template <typename T, typename TTag = void, template <typename, class> class TVectorType = TVector> class TTempTlsVector { private: struct TTagForTls {}; TVectorType<T, std::allocator<T>>* Vector; - + public: TVectorType<T, std::allocator<T>>* GetVector() { return Vector; diff --git a/library/cpp/messagebus/actor/thread_extra.h b/library/cpp/messagebus/actor/thread_extra.h index b5aa151618..efd150a0b2 100644 --- a/library/cpp/messagebus/actor/thread_extra.h +++ b/library/cpp/messagebus/actor/thread_extra.h @@ -3,39 +3,39 @@ #include <util/thread/singleton.h> namespace NTSAN { - template <typename T> - inline void RelaxedStore(volatile T* a, T x) { - static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value"); + template <typename T> + inline void RelaxedStore(volatile T* a, T x) { + static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value"); #ifdef _win_ - *a = x; + *a = x; #else - __atomic_store_n(a, x, __ATOMIC_RELAXED); + __atomic_store_n(a, x, __ATOMIC_RELAXED); #endif - } + } - template <typename T> - inline T RelaxedLoad(volatile T* a) { + template <typename T> + inline T RelaxedLoad(volatile T* a) { #ifdef _win_ - return *a; + return *a; #else - return __atomic_load_n(a, __ATOMIC_RELAXED); + return __atomic_load_n(a, __ATOMIC_RELAXED); #endif - } + } } void SetCurrentThreadName(const char* name); -namespace NThreadExtra { - namespace NPrivate { - template <typename TValue, typename TTag> - struct TValueHolder { - TValue Value; - }; - } -} +namespace NThreadExtra { + namespace NPrivate { + template <typename TValue, typename TTag> + struct TValueHolder { + TValue Value; + }; + } +} template <typename TValue, typename TTag> static inline TValue* FastTlsSingletonWithTag() { - return &FastTlsSingleton< ::NThreadExtra::NPrivate::TValueHolder<TValue, TTag>>()->Value; + return &FastTlsSingleton< ::NThreadExtra::NPrivate::TValueHolder<TValue, TTag>>()->Value; } diff --git a/library/cpp/messagebus/actor/what_thread_does.cpp b/library/cpp/messagebus/actor/what_thread_does.cpp index bebb6a888c..5960093a6b 100644 --- a/library/cpp/messagebus/actor/what_thread_does.cpp +++ b/library/cpp/messagebus/actor/what_thread_does.cpp @@ -4,19 +4,19 @@ #include <util/system/tls.h> -Y_POD_STATIC_THREAD(const char*) -WhatThreadDoes; +Y_POD_STATIC_THREAD(const char*) +WhatThreadDoes; -const char* PushWhatThreadDoes(const char* what) { +const char* PushWhatThreadDoes(const char* what) { const char* r = NTSAN::RelaxedLoad(&WhatThreadDoes); NTSAN::RelaxedStore(&WhatThreadDoes, what); return r; } -void PopWhatThreadDoes(const char* prev) { +void PopWhatThreadDoes(const char* prev) { NTSAN::RelaxedStore(&WhatThreadDoes, prev); } -const char** WhatThreadDoesLocation() { +const char** WhatThreadDoesLocation() { return &WhatThreadDoes; } diff --git a/library/cpp/messagebus/actor/what_thread_does_guard.h b/library/cpp/messagebus/actor/what_thread_does_guard.h index f104e9e173..4fb0abef6d 100644 --- a/library/cpp/messagebus/actor/what_thread_does_guard.h +++ b/library/cpp/messagebus/actor/what_thread_does_guard.h @@ -4,37 +4,37 @@ template <class T> class TWhatThreadDoesAcquireGuard: public TNonCopyable { -public: - inline TWhatThreadDoesAcquireGuard(const T& t, const char* acquire) noexcept { - Init(&t, acquire); - } - - inline TWhatThreadDoesAcquireGuard(const T* t, const char* acquire) noexcept { - Init(t, acquire); - } - - inline ~TWhatThreadDoesAcquireGuard() { - Release(); - } - - inline void Release() noexcept { - if (WasAcquired()) { - const_cast<T*>(T_)->Release(); - T_ = nullptr; +public: + inline TWhatThreadDoesAcquireGuard(const T& t, const char* acquire) noexcept { + Init(&t, acquire); + } + + inline TWhatThreadDoesAcquireGuard(const T* t, const char* acquire) noexcept { + Init(t, acquire); + } + + inline ~TWhatThreadDoesAcquireGuard() { + Release(); + } + + inline void Release() noexcept { + if (WasAcquired()) { + const_cast<T*>(T_)->Release(); + T_ = nullptr; } - } + } - inline bool WasAcquired() const noexcept { - return T_ != nullptr; - } + inline bool WasAcquired() const noexcept { + return T_ != nullptr; + } -private: - inline void Init(const T* t, const char* acquire) noexcept { - T_ = const_cast<T*>(t); - TWhatThreadDoesPushPop pp(acquire); - T_->Acquire(); - } +private: + inline void Init(const T* t, const char* acquire) noexcept { + T_ = const_cast<T*>(t); + TWhatThreadDoesPushPop pp(acquire); + T_->Acquire(); + } -private: - T* T_; +private: + T* T_; }; |