diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/actor | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/actor')
19 files changed, 904 insertions, 904 deletions
diff --git a/library/cpp/messagebus/actor/actor.h b/library/cpp/messagebus/actor/actor.h index 9b8f20298a..6c05d474ae 100644 --- a/library/cpp/messagebus/actor/actor.h +++ b/library/cpp/messagebus/actor/actor.h @@ -1,100 +1,100 @@ -#pragma once - +#pragma once + #include "executor.h" -#include "tasks.h" -#include "what_thread_does.h" - +#include "tasks.h" +#include "what_thread_does.h" + #include <util/system/yassert.h> -namespace NActor { +namespace NActor { class IActor: protected IWorkItem { public: // TODO: make private TTasks Tasks; - + public: virtual void ScheduleHereV() = 0; virtual void ScheduleV() = 0; virtual void ScheduleHereAtMostOnceV() = 0; - + // TODO: make private virtual void RefV() = 0; virtual void UnRefV() = 0; - + // mute warnings ~IActor() override { } }; - + struct TDefaultTag {}; - + template <typename TThis, typename TTag = TDefaultTag> class TActor: public IActor { private: TExecutor* const Executor; - + public: TActor(TExecutor* executor) : Executor(executor) { } - + void AddTaskFromActorLoop() { bool schedule = Tasks.AddTask(); // TODO: check thread id Y_ASSERT(!schedule); } - + /** - * Schedule actor. - * - * If actor is sleeping, then actor will be executed right now. - * If actor is executing right now, it will be executed one more time. - * If this method is called multiple time, actor will be re-executed no more than one more time. - */ + * Schedule actor. + * + * If actor is sleeping, then actor will be executed right now. + * If actor is executing right now, it will be executed one more time. + * If this method is called multiple time, actor will be re-executed no more than one more time. + */ void Schedule() { if (Tasks.AddTask()) { EnqueueWork(); } - } - + } + /** - * Schedule actor, execute it in current thread. - * - * If actor is running, continue executing where it is executing. - * If actor is sleeping, execute it in current thread. - * - * Operation is useful for tasks that are likely to complete quickly. - */ + * Schedule actor, execute it in current thread. + * + * If actor is running, continue executing where it is executing. + * If actor is sleeping, execute it in current thread. + * + * Operation is useful for tasks that are likely to complete quickly. + */ void ScheduleHere() { if (Tasks.AddTask()) { Loop(); } - } - + } + /** - * Schedule actor, execute in current thread no more than once. - * - * If actor is running, continue executing where it is executing. - * If actor is sleeping, execute one iteration here, and if actor got new tasks, - * reschedule it in worker pool. - */ + * Schedule actor, execute in current thread no more than once. + * + * If actor is running, continue executing where it is executing. + * If actor is sleeping, execute one iteration here, and if actor got new tasks, + * reschedule it in worker pool. + */ void ScheduleHereAtMostOnce() { if (Tasks.AddTask()) { bool fetched = Tasks.FetchTask(); Y_VERIFY(fetched, "happens"); - + DoAct(); - + // if someone added more tasks, schedule them if (Tasks.FetchTask()) { bool added = Tasks.AddTask(); Y_VERIFY(!added, "happens"); EnqueueWork(); } - } - } - + } + } + void ScheduleHereV() override { ScheduleHere(); } @@ -110,35 +110,35 @@ namespace NActor { void UnRefV() override { GetThis()->UnRef(); } - + private: TThis* GetThis() { return static_cast<TThis*>(this); } - + void EnqueueWork() { GetThis()->Ref(); Executor->EnqueueWork({this}); } - + void DoAct() { WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC(); - + GetThis()->Act(TTag()); } - + void Loop() { // TODO: limit number of iterations while (Tasks.FetchTask()) { DoAct(); } - } - + } + void DoWork() override { Y_ASSERT(GetThis()->RefCount() >= 1); Loop(); GetThis()->UnRef(); } }; - + } diff --git a/library/cpp/messagebus/actor/actor_ut.cpp b/library/cpp/messagebus/actor/actor_ut.cpp index b76ab55bfa..3f6d72ccdc 100644 --- a/library/cpp/messagebus/actor/actor_ut.cpp +++ b/library/cpp/messagebus/actor/actor_ut.cpp @@ -1,157 +1,157 @@ #include <library/cpp/testing/unittest/registar.h> - -#include "actor.h" -#include "queue_in_actor.h" - + +#include "actor.h" +#include "queue_in_actor.h" + #include <library/cpp/messagebus/misc/test_sync.h> #include <util/generic/object_counter.h> #include <util/system/event.h> -using namespace NActor; - -template <typename TThis> +using namespace NActor; + +template <typename TThis> struct TTestActorBase: public TAtomicRefCount<TThis>, public TActor<TThis> { - TTestSync Started; - TTestSync Acted; - - TTestActorBase(TExecutor* executor) - : TActor<TThis>(executor) + TTestSync Started; + TTestSync Acted; + + TTestActorBase(TExecutor* executor) + : TActor<TThis>(executor) { } - - void Act(TDefaultTag) { - Started.Inc(); - static_cast<TThis*>(this)->Act2(); - Acted.Inc(); - } -}; - + + void Act(TDefaultTag) { + Started.Inc(); + static_cast<TThis*>(this)->Act2(); + Acted.Inc(); + } +}; + struct TNopActor: public TTestActorBase<TNopActor> { - TObjectCounter<TNopActor> AllocCounter; - - TNopActor(TExecutor* executor) - : TTestActorBase<TNopActor>(executor) + TObjectCounter<TNopActor> AllocCounter; + + TNopActor(TExecutor* executor) + : TTestActorBase<TNopActor>(executor) { } - - void Act2() { - } -}; - + + void Act2() { + } +}; + struct TWaitForSignalActor: public TTestActorBase<TWaitForSignalActor> { - TWaitForSignalActor(TExecutor* executor) - : TTestActorBase<TWaitForSignalActor>(executor) + TWaitForSignalActor(TExecutor* executor) + : TTestActorBase<TWaitForSignalActor>(executor) { } - + TSystemEvent WaitFor; - - void Act2() { - WaitFor.Wait(); - } -}; - + + void Act2() { + WaitFor.Wait(); + } +}; + struct TDecrementAndSendActor: public TTestActorBase<TDecrementAndSendActor>, public TQueueInActor<TDecrementAndSendActor, int> { TSystemEvent Done; - - TDecrementAndSendActor* Next; - - TDecrementAndSendActor(TExecutor* executor) - : TTestActorBase<TDecrementAndSendActor>(executor) + + TDecrementAndSendActor* Next; + + TDecrementAndSendActor(TExecutor* executor) + : TTestActorBase<TDecrementAndSendActor>(executor) , Next(nullptr) { } - - void ProcessItem(TDefaultTag, TDefaultTag, int n) { - if (n == 0) { - Done.Signal(); - } else { - Next->EnqueueAndSchedule(n - 1); - } - } - - void Act(TDefaultTag) { - DequeueAll(); - } -}; - -struct TObjectCountChecker { - TObjectCountChecker() { - CheckCounts(); - } - - ~TObjectCountChecker() { - CheckCounts(); - } - - void CheckCounts() { - UNIT_ASSERT_VALUES_EQUAL(TAtomicBase(0), TObjectCounter<TNopActor>::ObjectCount()); - UNIT_ASSERT_VALUES_EQUAL(TAtomicBase(0), TObjectCounter<TWaitForSignalActor>::ObjectCount()); - UNIT_ASSERT_VALUES_EQUAL(TAtomicBase(0), TObjectCounter<TDecrementAndSendActor>::ObjectCount()); - } -}; - + + void ProcessItem(TDefaultTag, TDefaultTag, int n) { + if (n == 0) { + Done.Signal(); + } else { + Next->EnqueueAndSchedule(n - 1); + } + } + + void Act(TDefaultTag) { + DequeueAll(); + } +}; + +struct TObjectCountChecker { + TObjectCountChecker() { + CheckCounts(); + } + + ~TObjectCountChecker() { + CheckCounts(); + } + + void CheckCounts() { + UNIT_ASSERT_VALUES_EQUAL(TAtomicBase(0), TObjectCounter<TNopActor>::ObjectCount()); + UNIT_ASSERT_VALUES_EQUAL(TAtomicBase(0), TObjectCounter<TWaitForSignalActor>::ObjectCount()); + UNIT_ASSERT_VALUES_EQUAL(TAtomicBase(0), TObjectCounter<TDecrementAndSendActor>::ObjectCount()); + } +}; + Y_UNIT_TEST_SUITE(TActor) { Y_UNIT_TEST(Simple) { - TObjectCountChecker objectCountChecker; - - TExecutor executor(4); - - TIntrusivePtr<TNopActor> actor(new TNopActor(&executor)); - - actor->Schedule(); - - actor->Acted.WaitFor(1u); - } - + TObjectCountChecker objectCountChecker; + + TExecutor executor(4); + + TIntrusivePtr<TNopActor> actor(new TNopActor(&executor)); + + actor->Schedule(); + + actor->Acted.WaitFor(1u); + } + Y_UNIT_TEST(ScheduleAfterStart) { - TObjectCountChecker objectCountChecker; - - TExecutor executor(4); - - TIntrusivePtr<TWaitForSignalActor> actor(new TWaitForSignalActor(&executor)); - - actor->Schedule(); - - actor->Started.WaitFor(1); - - actor->Schedule(); - - actor->WaitFor.Signal(); - - // make sure Act is called second time - actor->Acted.WaitFor(2u); - } - - void ComplexImpl(int queueSize, int actorCount) { - TObjectCountChecker objectCountChecker; - - TExecutor executor(queueSize); - + TObjectCountChecker objectCountChecker; + + TExecutor executor(4); + + TIntrusivePtr<TWaitForSignalActor> actor(new TWaitForSignalActor(&executor)); + + actor->Schedule(); + + actor->Started.WaitFor(1); + + actor->Schedule(); + + actor->WaitFor.Signal(); + + // make sure Act is called second time + actor->Acted.WaitFor(2u); + } + + void ComplexImpl(int queueSize, int actorCount) { + TObjectCountChecker objectCountChecker; + + TExecutor executor(queueSize); + TVector<TIntrusivePtr<TDecrementAndSendActor>> actors; - for (int i = 0; i < actorCount; ++i) { - actors.push_back(new TDecrementAndSendActor(&executor)); - } - - for (int i = 0; i < actorCount; ++i) { - actors.at(i)->Next = &*actors.at((i + 1) % actorCount); - } - - for (int i = 0; i < actorCount; ++i) { - actors.at(i)->EnqueueAndSchedule(10000); - } - - for (int i = 0; i < actorCount; ++i) { - actors.at(i)->Done.WaitI(); - } - } - + for (int i = 0; i < actorCount; ++i) { + actors.push_back(new TDecrementAndSendActor(&executor)); + } + + for (int i = 0; i < actorCount; ++i) { + actors.at(i)->Next = &*actors.at((i + 1) % actorCount); + } + + for (int i = 0; i < actorCount; ++i) { + actors.at(i)->EnqueueAndSchedule(10000); + } + + for (int i = 0; i < actorCount; ++i) { + actors.at(i)->Done.WaitI(); + } + } + Y_UNIT_TEST(ComplexContention) { - ComplexImpl(4, 6); - } - + ComplexImpl(4, 6); + } + Y_UNIT_TEST(ComplexNoContention) { - ComplexImpl(6, 4); - } -} + ComplexImpl(6, 4); + } +} diff --git a/library/cpp/messagebus/actor/executor.cpp b/library/cpp/messagebus/actor/executor.cpp index 7a2227a458..fbd76a86ba 100644 --- a/library/cpp/messagebus/actor/executor.cpp +++ b/library/cpp/messagebus/actor/executor.cpp @@ -1,95 +1,95 @@ #include "executor.h" - -#include "thread_extra.h" -#include "what_thread_does.h" -#include "what_thread_does_guard.h" - + +#include "thread_extra.h" +#include "what_thread_does.h" +#include "what_thread_does_guard.h" + #include <util/generic/utility.h> #include <util/random/random.h> #include <util/stream/str.h> #include <util/system/tls.h> #include <util/system/yassert.h> - + #include <array> -using namespace NActor; -using namespace NActor::NPrivate; - -namespace { - struct THistoryInternal { - struct TRecord { +using namespace NActor; +using namespace NActor::NPrivate; + +namespace { + struct THistoryInternal { + struct TRecord { TAtomic MaxQueueSize; - + TRecord() : MaxQueueSize() { } - - TExecutorHistory::THistoryRecord Capture() { - TExecutorHistory::THistoryRecord r; + + TExecutorHistory::THistoryRecord Capture() { + TExecutorHistory::THistoryRecord r; r.MaxQueueSize = AtomicGet(MaxQueueSize); - return r; - } - }; - - ui64 Start; - ui64 LastTime; - + return r; + } + }; + + ui64 Start; + ui64 LastTime; + std::array<TRecord, 3600> Records; - - THistoryInternal() { - Start = TInstant::Now().Seconds(); - LastTime = Start - 1; - } - - TRecord& GetRecordForTime(ui64 time) { - return Records[time % Records.size()]; - } - - TRecord& GetNowRecord(ui64 now) { - for (ui64 t = LastTime + 1; t <= now; ++t) { - GetRecordForTime(t) = TRecord(); - } - LastTime = now; - return GetRecordForTime(now); - } - - TExecutorHistory Capture() { - TExecutorHistory history; - ui64 now = TInstant::Now().Seconds(); - ui64 lastHistoryRecord = now - 1; - ui32 historySize = Min<ui32>(lastHistoryRecord - Start, Records.size() - 1); - history.HistoryRecords.resize(historySize); - for (ui32 i = 0; i < historySize; ++i) { - history.HistoryRecords[i] = GetRecordForTime(lastHistoryRecord - historySize + i).Capture(); - } - history.LastHistoryRecordSecond = lastHistoryRecord; - return history; - } - }; - -} - + + THistoryInternal() { + Start = TInstant::Now().Seconds(); + LastTime = Start - 1; + } + + TRecord& GetRecordForTime(ui64 time) { + return Records[time % Records.size()]; + } + + TRecord& GetNowRecord(ui64 now) { + for (ui64 t = LastTime + 1; t <= now; ++t) { + GetRecordForTime(t) = TRecord(); + } + LastTime = now; + return GetRecordForTime(now); + } + + TExecutorHistory Capture() { + TExecutorHistory history; + ui64 now = TInstant::Now().Seconds(); + ui64 lastHistoryRecord = now - 1; + ui32 historySize = Min<ui32>(lastHistoryRecord - Start, Records.size() - 1); + history.HistoryRecords.resize(historySize); + for (ui32 i = 0; i < historySize; ++i) { + history.HistoryRecords[i] = GetRecordForTime(lastHistoryRecord - historySize + i).Capture(); + } + history.LastHistoryRecordSecond = lastHistoryRecord; + return history; + } + }; + +} + Y_POD_STATIC_THREAD(TExecutor*) ThreadCurrentExecutor; - -static const char* NoLocation = "nowhere"; - -struct TExecutorWorkerThreadLocalData { - ui32 MaxQueueSize; -}; - -static TExecutorWorkerThreadLocalData WorkerNoThreadLocalData; + +static const char* NoLocation = "nowhere"; + +struct TExecutorWorkerThreadLocalData { + ui32 MaxQueueSize; +}; + +static TExecutorWorkerThreadLocalData WorkerNoThreadLocalData; Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData) WorkerThreadLocalData; - -namespace NActor { + +namespace NActor { struct TExecutorWorker { TExecutor* const Executor; TThread Thread; const char** WhatThreadDoesLocation; TExecutorWorkerThreadLocalData* ThreadLocalData; - + TExecutorWorker(TExecutor* executor) : Executor(executor) , Thread(RunThreadProc, this) @@ -98,241 +98,241 @@ namespace NActor { { Thread.Start(); } - + void Run() { WhatThreadDoesLocation = ::WhatThreadDoesLocation(); AtomicSet(ThreadLocalData, &::WorkerThreadLocalData); WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC(); Executor->RunWorker(); } - + static void* RunThreadProc(void* thiz0) { TExecutorWorker* thiz = (TExecutorWorker*)thiz0; thiz->Run(); return nullptr; } }; - + struct TExecutor::TImpl { TExecutor* const Executor; THistoryInternal History; - + TSystemEvent HelperStopSignal; TThread HelperThread; - + TImpl(TExecutor* executor) : Executor(executor) , HelperThread(HelperThreadProc, this) { } - + void RunHelper() { ui64 nowSeconds = TInstant::Now().Seconds(); for (;;) { TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000)); - + if (HelperStopSignal.WaitD(nextStop)) { return; } - + nowSeconds = nextStop.Seconds(); - + THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds); - + ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear(); if (maxQueueSize > record.MaxQueueSize) { AtomicSet(record.MaxQueueSize, maxQueueSize); } - } - } - + } + } + static void* HelperThreadProc(void* impl0) { TImpl* impl = (TImpl*)impl0; impl->RunHelper(); return nullptr; } }; - -} - -static TExecutor::TConfig MakeConfig(unsigned workerCount) { - TExecutor::TConfig config; - config.WorkerCount = workerCount; - return config; -} - -TExecutor::TExecutor(size_t workerCount) - : Config(MakeConfig(workerCount)) -{ - Init(); -} - -TExecutor::TExecutor(const TExecutor::TConfig& config) - : Config(config) -{ - Init(); + } - -void TExecutor::Init() { - Impl.Reset(new TImpl(this)); - + +static TExecutor::TConfig MakeConfig(unsigned workerCount) { + TExecutor::TConfig config; + config.WorkerCount = workerCount; + return config; +} + +TExecutor::TExecutor(size_t workerCount) + : Config(MakeConfig(workerCount)) +{ + Init(); +} + +TExecutor::TExecutor(const TExecutor::TConfig& config) + : Config(config) +{ + Init(); +} + +void TExecutor::Init() { + Impl.Reset(new TImpl(this)); + AtomicSet(ExitWorkers, 0); - + Y_VERIFY(Config.WorkerCount > 0); - - for (size_t i = 0; i < Config.WorkerCount; i++) { - WorkerThreads.push_back(new TExecutorWorker(this)); - } - - Impl->HelperThread.Start(); -} - -TExecutor::~TExecutor() { - Stop(); -} - -void TExecutor::Stop() { + + for (size_t i = 0; i < Config.WorkerCount; i++) { + WorkerThreads.push_back(new TExecutorWorker(this)); + } + + Impl->HelperThread.Start(); +} + +TExecutor::~TExecutor() { + Stop(); +} + +void TExecutor::Stop() { AtomicSet(ExitWorkers, 1); - - Impl->HelperStopSignal.Signal(); - Impl->HelperThread.Join(); - - { - TWhatThreadDoesAcquireGuard<TMutex> guard(WorkMutex, "executor: acquiring lock for Stop"); - WorkAvailable.BroadCast(); - } - - for (size_t i = 0; i < WorkerThreads.size(); i++) { - WorkerThreads[i]->Thread.Join(); - } - - // TODO: make queue empty at this point - ProcessWorkQueueHere(); -} - + + Impl->HelperStopSignal.Signal(); + Impl->HelperThread.Join(); + + { + TWhatThreadDoesAcquireGuard<TMutex> guard(WorkMutex, "executor: acquiring lock for Stop"); + WorkAvailable.BroadCast(); + } + + for (size_t i = 0; i < WorkerThreads.size(); i++) { + WorkerThreads[i]->Thread.Join(); + } + + // TODO: make queue empty at this point + ProcessWorkQueueHere(); +} + void TExecutor::EnqueueWork(TArrayRef<IWorkItem* const> wis) { - if (wis.empty()) - return; - + if (wis.empty()) + return; + if (Y_UNLIKELY(AtomicGet(ExitWorkers) != 0)) { Y_VERIFY(WorkItems.Empty(), "executor %s: cannot add tasks after queue shutdown", Config.Name); - } - - TWhatThreadDoesPushPop pp("executor: EnqueueWork"); - - WorkItems.PushAll(wis); - - { - if (wis.size() == 1) { - TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork"); - WorkAvailable.Signal(); - } else { - TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork"); - WorkAvailable.BroadCast(); - } - } -} - + } + + TWhatThreadDoesPushPop pp("executor: EnqueueWork"); + + WorkItems.PushAll(wis); + + { + if (wis.size() == 1) { + TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork"); + WorkAvailable.Signal(); + } else { + TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork"); + WorkAvailable.BroadCast(); + } + } +} + size_t TExecutor::GetWorkQueueSize() const { - return WorkItems.Size(); -} - + return WorkItems.Size(); +} + using namespace NTSAN; -ui32 TExecutor::GetMaxQueueSizeAndClear() const { - ui32 max = 0; - for (unsigned i = 0; i < WorkerThreads.size(); ++i) { +ui32 TExecutor::GetMaxQueueSizeAndClear() const { + ui32 max = 0; + for (unsigned i = 0; i < WorkerThreads.size(); ++i) { TExecutorWorkerThreadLocalData* wtls = RelaxedLoad(&WorkerThreads[i]->ThreadLocalData); max = Max<ui32>(max, RelaxedLoad(&wtls->MaxQueueSize)); RelaxedStore<ui32>(&wtls->MaxQueueSize, 0); - } - return max; -} - + } + return max; +} + TString TExecutor::GetStatus() const { - return GetStatusRecordInternal().Status; -} - + return GetStatusRecordInternal().Status; +} + TString TExecutor::GetStatusSingleLine() const { - TStringStream ss; - ss << "work items: " << GetWorkQueueSize(); - return ss.Str(); -} - + TStringStream ss; + ss << "work items: " << GetWorkQueueSize(); + return ss.Str(); +} + TExecutorStatus TExecutor::GetStatusRecordInternal() const { - TExecutorStatus r; - - r.WorkQueueSize = GetWorkQueueSize(); - - { - TStringStream ss; - ss << "work items: " << GetWorkQueueSize() << "\n"; - ss << "workers:\n"; - for (unsigned i = 0; i < WorkerThreads.size(); ++i) { + TExecutorStatus r; + + r.WorkQueueSize = GetWorkQueueSize(); + + { + TStringStream ss; + ss << "work items: " << GetWorkQueueSize() << "\n"; + ss << "workers:\n"; + for (unsigned i = 0; i < WorkerThreads.size(); ++i) { ss << "-- " << AtomicGet(*AtomicGet(WorkerThreads[i]->WhatThreadDoesLocation)) << "\n"; - } - r.Status = ss.Str(); - } - - r.History = Impl->History.Capture(); - - return r; -} - + } + r.Status = ss.Str(); + } + + r.History = Impl->History.Capture(); + + return r; +} + bool TExecutor::IsInExecutorThread() const { - return ThreadCurrentExecutor == this; -} - -TAutoPtr<IWorkItem> TExecutor::DequeueWork() { - IWorkItem* wi = reinterpret_cast<IWorkItem*>(1); - size_t queueSize = Max<size_t>(); - if (!WorkItems.TryPop(&wi, &queueSize)) { - TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for DequeueWork"); - while (!WorkItems.TryPop(&wi, &queueSize)) { + return ThreadCurrentExecutor == this; +} + +TAutoPtr<IWorkItem> TExecutor::DequeueWork() { + IWorkItem* wi = reinterpret_cast<IWorkItem*>(1); + size_t queueSize = Max<size_t>(); + if (!WorkItems.TryPop(&wi, &queueSize)) { + TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for DequeueWork"); + while (!WorkItems.TryPop(&wi, &queueSize)) { if (AtomicGet(ExitWorkers) != 0) return nullptr; - - TWhatThreadDoesPushPop pp("waiting for work on condvar"); - WorkAvailable.Wait(WorkMutex); - } - } + + TWhatThreadDoesPushPop pp("waiting for work on condvar"); + WorkAvailable.Wait(WorkMutex); + } + } auto& wtls = TlsRef(WorkerThreadLocalData); if (queueSize > RelaxedLoad(&wtls.MaxQueueSize)) { RelaxedStore<ui32>(&wtls.MaxQueueSize, queueSize); - } - - return wi; -} - -void TExecutor::RunWorkItem(TAutoPtr<IWorkItem> wi) { - WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC(); - wi.Release()->DoWork(); -} - -void TExecutor::ProcessWorkQueueHere() { - IWorkItem* wi; - while (WorkItems.TryPop(&wi)) { - RunWorkItem(wi); - } -} - -void TExecutor::RunWorker() { + } + + return wi; +} + +void TExecutor::RunWorkItem(TAutoPtr<IWorkItem> wi) { + WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC(); + wi.Release()->DoWork(); +} + +void TExecutor::ProcessWorkQueueHere() { + IWorkItem* wi; + while (WorkItems.TryPop(&wi)) { + RunWorkItem(wi); + } +} + +void TExecutor::RunWorker() { Y_VERIFY(!ThreadCurrentExecutor, "state check"); - ThreadCurrentExecutor = this; - + ThreadCurrentExecutor = this; + SetCurrentThreadName("wrkr"); - - for (;;) { - TAutoPtr<IWorkItem> wi = DequeueWork(); - if (!wi) { - break; - } - // Note for messagebus users: make sure program crashes - // on uncaught exception in thread, otherewise messagebus may just hang on error. - RunWorkItem(wi); - } - + + for (;;) { + TAutoPtr<IWorkItem> wi = DequeueWork(); + if (!wi) { + break; + } + // Note for messagebus users: make sure program crashes + // on uncaught exception in thread, otherewise messagebus may just hang on error. + RunWorkItem(wi); + } + ThreadCurrentExecutor = (TExecutor*)nullptr; -} +} diff --git a/library/cpp/messagebus/actor/executor.h b/library/cpp/messagebus/actor/executor.h index 7292d8be53..2a30580ff1 100644 --- a/library/cpp/messagebus/actor/executor.h +++ b/library/cpp/messagebus/actor/executor.h @@ -1,16 +1,16 @@ -#pragma once - +#pragma once + #include "ring_buffer_with_spin_lock.h" #include <util/generic/array_ref.h> -#include <util/generic/vector.h> -#include <util/system/condvar.h> -#include <util/system/event.h> +#include <util/generic/vector.h> +#include <util/system/condvar.h> +#include <util/system/event.h> #include <util/system/mutex.h> #include <util/system/thread.h> -#include <util/thread/lfqueue.h> - -namespace NActor { +#include <util/thread/lfqueue.h> + +namespace NActor { namespace NPrivate { struct TExecutorHistory { struct THistoryRecord { @@ -18,88 +18,88 @@ namespace NActor { }; TVector<THistoryRecord> HistoryRecords; ui64 LastHistoryRecordSecond; - + ui64 FirstHistoryRecordSecond() const { return LastHistoryRecordSecond - HistoryRecords.size() + 1; } }; - + struct TExecutorStatus { size_t WorkQueueSize = 0; TExecutorHistory History; TString Status; - }; + }; } - + class IWorkItem { public: virtual ~IWorkItem() { - } + } virtual void DoWork(/* must release this */) = 0; - }; - + }; + struct TExecutorWorker; - + class TExecutor: public TAtomicRefCount<TExecutor> { friend struct TExecutorWorker; - + public: struct TConfig { size_t WorkerCount; const char* Name; - + TConfig() : WorkerCount(1) , Name() { } }; - + private: struct TImpl; THolder<TImpl> Impl; - + const TConfig Config; - + TAtomic ExitWorkers; - + TVector<TAutoPtr<TExecutorWorker>> WorkerThreads; - + TRingBufferWithSpinLock<IWorkItem*> WorkItems; - + TMutex WorkMutex; TCondVar WorkAvailable; - + public: explicit TExecutor(size_t workerCount); TExecutor(const TConfig& config); ~TExecutor(); - + void Stop(); - + void EnqueueWork(TArrayRef<IWorkItem* const> w); - + size_t GetWorkQueueSize() const; TString GetStatus() const; TString GetStatusSingleLine() const; NPrivate::TExecutorStatus GetStatusRecordInternal() const; - + bool IsInExecutorThread() const; - + private: void Init(); - + TAutoPtr<IWorkItem> DequeueWork(); - + void ProcessWorkQueueHere(); - + inline void RunWorkItem(TAutoPtr<IWorkItem>); - + void RunWorker(); - + ui32 GetMaxQueueSizeAndClear() const; }; - + using TExecutorPtr = TIntrusivePtr<TExecutor>; - + } diff --git a/library/cpp/messagebus/actor/queue_for_actor.h b/library/cpp/messagebus/actor/queue_for_actor.h index 40fa536b82..b64a2a27a2 100644 --- a/library/cpp/messagebus/actor/queue_for_actor.h +++ b/library/cpp/messagebus/actor/queue_for_actor.h @@ -1,65 +1,65 @@ -#pragma once - -#include <util/generic/vector.h> -#include <util/system/yassert.h> -#include <util/thread/lfstack.h> -#include <util/thread/singleton.h> - -// TODO: include from correct directory -#include "temp_tls_vector.h" - -namespace NActor { +#pragma once + +#include <util/generic/vector.h> +#include <util/system/yassert.h> +#include <util/thread/lfstack.h> +#include <util/thread/singleton.h> + +// TODO: include from correct directory +#include "temp_tls_vector.h" + +namespace NActor { namespace NPrivate { struct TTagForTl {}; - + } - + template <typename T> class TQueueForActor { private: TLockFreeStack<T> Queue; - + public: ~TQueueForActor() { Y_VERIFY(Queue.IsEmpty()); } - + bool IsEmpty() { return Queue.IsEmpty(); } - + void Enqueue(const T& value) { Queue.Enqueue(value); } - + template <typename TCollection> void EnqueueAll(const TCollection& all) { Queue.EnqueueAll(all); } - + void Clear() { TVector<T> tmp; Queue.DequeueAll(&tmp); } - + template <typename TFunc> void DequeueAll(const TFunc& func // TODO: , std::enable_if_t<TFunctionParamCount<TFunc>::Value == 1>* = 0 ) { TTempTlsVector<T> temp; - + Queue.DequeueAllSingleConsumer(temp.GetVector()); - + for (typename TVector<T>::reverse_iterator i = temp.GetVector()->rbegin(); i != temp.GetVector()->rend(); ++i) { func(*i); } - + temp.Clear(); - + if (temp.Capacity() * sizeof(T) > 64 * 1024) { temp.Shrink(); } - } + } template <typename TFunc> void DequeueAllLikelyEmpty(const TFunc& func) { @@ -70,5 +70,5 @@ namespace NActor { DequeueAll(func); } }; - + } diff --git a/library/cpp/messagebus/actor/queue_in_actor.h b/library/cpp/messagebus/actor/queue_in_actor.h index 9865996532..f93eb03070 100644 --- a/library/cpp/messagebus/actor/queue_in_actor.h +++ b/library/cpp/messagebus/actor/queue_in_actor.h @@ -1,80 +1,80 @@ -#pragma once - -#include "actor.h" -#include "queue_for_actor.h" - +#pragma once + +#include "actor.h" +#include "queue_for_actor.h" + #include <functional> -namespace NActor { +namespace NActor { template <typename TItem> class IQueueInActor { public: virtual void EnqueueAndScheduleV(const TItem& item) = 0; virtual void DequeueAllV() = 0; virtual void DequeueAllLikelyEmptyV() = 0; - + virtual ~IQueueInActor() { } }; - + template <typename TThis, typename TItem, typename TActorTag = TDefaultTag, typename TQueueTag = TDefaultTag> class TQueueInActor: public IQueueInActor<TItem> { typedef TQueueInActor<TThis, TItem, TActorTag, TQueueTag> TSelf; - + public: // TODO: make protected TQueueForActor<TItem> QueueInActor; - + private: TActor<TThis, TActorTag>* GetActor() { return GetThis(); } - + TThis* GetThis() { return static_cast<TThis*>(this); } - + void ProcessItem(const TItem& item) { GetThis()->ProcessItem(TActorTag(), TQueueTag(), item); } - + public: void EnqueueAndNoSchedule(const TItem& item) { QueueInActor.Enqueue(item); } - + void EnqueueAndSchedule(const TItem& item) { EnqueueAndNoSchedule(item); GetActor()->Schedule(); } - + void EnqueueAndScheduleV(const TItem& item) override { EnqueueAndSchedule(item); } - + void Clear() { QueueInActor.Clear(); } - + void DequeueAll() { QueueInActor.DequeueAll(std::bind(&TSelf::ProcessItem, this, std::placeholders::_1)); } - + void DequeueAllV() override { return DequeueAll(); } - + void DequeueAllLikelyEmpty() { QueueInActor.DequeueAllLikelyEmpty(std::bind(&TSelf::ProcessItem, this, std::placeholders::_1)); } - + void DequeueAllLikelyEmptyV() override { return DequeueAllLikelyEmpty(); } - + bool IsEmpty() { return QueueInActor.IsEmpty(); } }; - + } diff --git a/library/cpp/messagebus/actor/ring_buffer.h b/library/cpp/messagebus/actor/ring_buffer.h index ec5706f7c7..a0e2f481bf 100644 --- a/library/cpp/messagebus/actor/ring_buffer.h +++ b/library/cpp/messagebus/actor/ring_buffer.h @@ -1,135 +1,135 @@ -#pragma once - +#pragma once + #include <util/generic/array_ref.h> #include <util/generic/maybe.h> #include <util/generic/utility.h> #include <util/generic/vector.h> -#include <util/system/yassert.h> - -template <typename T> -struct TRingBuffer { -private: - ui32 CapacityPow; - ui32 CapacityMask; - ui32 Capacity; - ui32 WritePos; - ui32 ReadPos; +#include <util/system/yassert.h> + +template <typename T> +struct TRingBuffer { +private: + ui32 CapacityPow; + ui32 CapacityMask; + ui32 Capacity; + ui32 WritePos; + ui32 ReadPos; TVector<T> Data; - - void StateCheck() const { + + void StateCheck() const { Y_ASSERT(Capacity == Data.size()); Y_ASSERT(Capacity == (1u << CapacityPow)); Y_ASSERT((Capacity & CapacityMask) == 0u); Y_ASSERT(Capacity - CapacityMask == 1u); Y_ASSERT(WritePos < Capacity); Y_ASSERT(ReadPos < Capacity); - } - - size_t Writable() const { - return (Capacity + ReadPos - WritePos - 1) & CapacityMask; - } - - void ReserveWritable(ui32 sz) { - if (sz <= Writable()) - return; - - ui32 newCapacityPow = CapacityPow; + } + + size_t Writable() const { + return (Capacity + ReadPos - WritePos - 1) & CapacityMask; + } + + void ReserveWritable(ui32 sz) { + if (sz <= Writable()) + return; + + ui32 newCapacityPow = CapacityPow; while ((1u << newCapacityPow) < sz + ui32(Size()) + 1u) { - ++newCapacityPow; - } - ui32 newCapacity = 1u << newCapacityPow; - ui32 newCapacityMask = newCapacity - 1u; + ++newCapacityPow; + } + ui32 newCapacity = 1u << newCapacityPow; + ui32 newCapacityMask = newCapacity - 1u; TVector<T> newData(newCapacity); - ui32 oldSize = Size(); - // Copy old elements - for (size_t i = 0; i < oldSize; ++i) { - newData[i] = Get(i); - } - - CapacityPow = newCapacityPow; - Capacity = newCapacity; - CapacityMask = newCapacityMask; - Data.swap(newData); - ReadPos = 0; - WritePos = oldSize; - - StateCheck(); - } - - const T& Get(ui32 i) const { - return Data[(ReadPos + i) & CapacityMask]; - } - -public: - TRingBuffer() - : CapacityPow(0) - , CapacityMask(0) - , Capacity(1 << CapacityPow) - , WritePos(0) - , ReadPos(0) - , Data(Capacity) - { - StateCheck(); - } - - size_t Size() const { - return (Capacity + WritePos - ReadPos) & CapacityMask; - } - - bool Empty() const { - return WritePos == ReadPos; - } - + ui32 oldSize = Size(); + // Copy old elements + for (size_t i = 0; i < oldSize; ++i) { + newData[i] = Get(i); + } + + CapacityPow = newCapacityPow; + Capacity = newCapacity; + CapacityMask = newCapacityMask; + Data.swap(newData); + ReadPos = 0; + WritePos = oldSize; + + StateCheck(); + } + + const T& Get(ui32 i) const { + return Data[(ReadPos + i) & CapacityMask]; + } + +public: + TRingBuffer() + : CapacityPow(0) + , CapacityMask(0) + , Capacity(1 << CapacityPow) + , WritePos(0) + , ReadPos(0) + , Data(Capacity) + { + StateCheck(); + } + + size_t Size() const { + return (Capacity + WritePos - ReadPos) & CapacityMask; + } + + bool Empty() const { + return WritePos == ReadPos; + } + void PushAll(TArrayRef<const T> value) { - ReserveWritable(value.size()); - - ui32 secondSize; - ui32 firstSize; - - if (WritePos + value.size() <= Capacity) { - firstSize = value.size(); - secondSize = 0; - } else { - firstSize = Capacity - WritePos; - secondSize = value.size() - firstSize; - } - - for (size_t i = 0; i < firstSize; ++i) { + ReserveWritable(value.size()); + + ui32 secondSize; + ui32 firstSize; + + if (WritePos + value.size() <= Capacity) { + firstSize = value.size(); + secondSize = 0; + } else { + firstSize = Capacity - WritePos; + secondSize = value.size() - firstSize; + } + + for (size_t i = 0; i < firstSize; ++i) { Data[WritePos + i] = value[i]; - } - - for (size_t i = 0; i < secondSize; ++i) { + } + + for (size_t i = 0; i < secondSize; ++i) { Data[i] = value[firstSize + i]; - } - - WritePos = (WritePos + value.size()) & CapacityMask; - StateCheck(); - } - - void Push(const T& t) { + } + + WritePos = (WritePos + value.size()) & CapacityMask; + StateCheck(); + } + + void Push(const T& t) { PushAll(MakeArrayRef(&t, 1)); - } - - bool TryPop(T* r) { - StateCheck(); - if (Empty()) { - return false; - } - *r = Data[ReadPos]; - ReadPos = (ReadPos + 1) & CapacityMask; - return true; - } - - TMaybe<T> TryPop() { - T tmp; - if (TryPop(&tmp)) { - return tmp; - } else { - return TMaybe<T>(); - } - } - - T Pop() { - return *TryPop(); - } -}; + } + + bool TryPop(T* r) { + StateCheck(); + if (Empty()) { + return false; + } + *r = Data[ReadPos]; + ReadPos = (ReadPos + 1) & CapacityMask; + return true; + } + + TMaybe<T> TryPop() { + T tmp; + if (TryPop(&tmp)) { + return tmp; + } else { + return TMaybe<T>(); + } + } + + T Pop() { + return *TryPop(); + } +}; diff --git a/library/cpp/messagebus/actor/ring_buffer_ut.cpp b/library/cpp/messagebus/actor/ring_buffer_ut.cpp index bdb379b3a9..9b2f46957b 100644 --- a/library/cpp/messagebus/actor/ring_buffer_ut.cpp +++ b/library/cpp/messagebus/actor/ring_buffer_ut.cpp @@ -1,60 +1,60 @@ #include <library/cpp/testing/unittest/registar.h> - -#include "ring_buffer.h" - + +#include "ring_buffer.h" + #include <util/random/random.h> Y_UNIT_TEST_SUITE(RingBuffer) { - struct TRingBufferTester { - TRingBuffer<unsigned> RingBuffer; - - unsigned NextPush; - unsigned NextPop; - + struct TRingBufferTester { + TRingBuffer<unsigned> RingBuffer; + + unsigned NextPush; + unsigned NextPop; + TRingBufferTester() : NextPush() , NextPop() { } - - void Push() { - //Cerr << "push " << NextPush << "\n"; - RingBuffer.Push(NextPush); - NextPush += 1; - } - - void Pop() { - //Cerr << "pop " << NextPop << "\n"; - unsigned popped = RingBuffer.Pop(); - UNIT_ASSERT_VALUES_EQUAL(NextPop, popped); - NextPop += 1; - } - - bool Empty() const { - UNIT_ASSERT_VALUES_EQUAL(RingBuffer.Size(), NextPush - NextPop); - UNIT_ASSERT_VALUES_EQUAL(RingBuffer.Empty(), RingBuffer.Size() == 0); - return RingBuffer.Empty(); - } - }; - - void Iter() { - TRingBufferTester rb; - - while (rb.NextPush < 1000) { - rb.Push(); - while (!rb.Empty() && RandomNumber<bool>()) { - rb.Pop(); - } - } - - while (!rb.Empty()) { - rb.Pop(); - } - } - + + void Push() { + //Cerr << "push " << NextPush << "\n"; + RingBuffer.Push(NextPush); + NextPush += 1; + } + + void Pop() { + //Cerr << "pop " << NextPop << "\n"; + unsigned popped = RingBuffer.Pop(); + UNIT_ASSERT_VALUES_EQUAL(NextPop, popped); + NextPop += 1; + } + + bool Empty() const { + UNIT_ASSERT_VALUES_EQUAL(RingBuffer.Size(), NextPush - NextPop); + UNIT_ASSERT_VALUES_EQUAL(RingBuffer.Empty(), RingBuffer.Size() == 0); + return RingBuffer.Empty(); + } + }; + + void Iter() { + TRingBufferTester rb; + + while (rb.NextPush < 1000) { + rb.Push(); + while (!rb.Empty() && RandomNumber<bool>()) { + rb.Pop(); + } + } + + while (!rb.Empty()) { + rb.Pop(); + } + } + Y_UNIT_TEST(Random) { - for (unsigned i = 0; i < 100; ++i) { - Iter(); - } - } -} + for (unsigned i = 0; i < 100; ++i) { + Iter(); + } + } +} diff --git a/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h b/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h index f0b7cd90e4..d05dec8577 100644 --- a/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h +++ b/library/cpp/messagebus/actor/ring_buffer_with_spin_lock.h @@ -1,91 +1,91 @@ -#pragma once - +#pragma once + #include "ring_buffer.h" -#include <util/system/spinlock.h> - -template <typename T> -class TRingBufferWithSpinLock { -private: - TRingBuffer<T> RingBuffer; - TSpinLock SpinLock; +#include <util/system/spinlock.h> + +template <typename T> +class TRingBufferWithSpinLock { +private: + TRingBuffer<T> RingBuffer; + TSpinLock SpinLock; TAtomic CachedSize; -public: - TRingBufferWithSpinLock() +public: + TRingBufferWithSpinLock() : CachedSize(0) { } - - void Push(const T& t) { - PushAll(t); - } - + + void Push(const T& t) { + PushAll(t); + } + void PushAll(TArrayRef<const T> collection) { - if (collection.empty()) { - return; - } - - TGuard<TSpinLock> Guard(SpinLock); - RingBuffer.PushAll(collection); + if (collection.empty()) { + return; + } + + TGuard<TSpinLock> Guard(SpinLock); + RingBuffer.PushAll(collection); AtomicSet(CachedSize, RingBuffer.Size()); - } - + } + bool TryPop(T* r, size_t* sizePtr = nullptr) { if (AtomicGet(CachedSize) == 0) { - return false; - } - - bool ok; - size_t size; - { - TGuard<TSpinLock> Guard(SpinLock); - ok = RingBuffer.TryPop(r); - size = RingBuffer.Size(); + return false; + } + + bool ok; + size_t size; + { + TGuard<TSpinLock> Guard(SpinLock); + ok = RingBuffer.TryPop(r); + size = RingBuffer.Size(); AtomicSet(CachedSize, size); - } - if (!!sizePtr) { - *sizePtr = size; - } - return ok; - } - - TMaybe<T> TryPop() { - T tmp; - if (TryPop(&tmp)) { - return tmp; - } else { - return TMaybe<T>(); - } - } - + } + if (!!sizePtr) { + *sizePtr = size; + } + return ok; + } + + TMaybe<T> TryPop() { + T tmp; + if (TryPop(&tmp)) { + return tmp; + } else { + return TMaybe<T>(); + } + } + bool PushAllAndTryPop(TArrayRef<const T> collection, T* r) { - if (collection.size() == 0) { - return TryPop(r); - } else { + if (collection.size() == 0) { + return TryPop(r); + } else { if (AtomicGet(CachedSize) == 0) { - *r = collection[0]; - if (collection.size() > 1) { - TGuard<TSpinLock> guard(SpinLock); + *r = collection[0]; + if (collection.size() > 1) { + TGuard<TSpinLock> guard(SpinLock); RingBuffer.PushAll(MakeArrayRef(collection.data() + 1, collection.size() - 1)); AtomicSet(CachedSize, RingBuffer.Size()); - } - } else { - TGuard<TSpinLock> guard(SpinLock); - RingBuffer.PushAll(collection); - *r = RingBuffer.Pop(); + } + } else { + TGuard<TSpinLock> guard(SpinLock); + RingBuffer.PushAll(collection); + *r = RingBuffer.Pop(); AtomicSet(CachedSize, RingBuffer.Size()); - } - return true; - } - } - - bool Empty() const { + } + return true; + } + } + + bool Empty() const { return AtomicGet(CachedSize) == 0; - } - - size_t Size() const { - TGuard<TSpinLock> Guard(SpinLock); - return RingBuffer.Size(); - } -}; + } + + size_t Size() const { + TGuard<TSpinLock> Guard(SpinLock); + return RingBuffer.Size(); + } +}; diff --git a/library/cpp/messagebus/actor/tasks.h b/library/cpp/messagebus/actor/tasks.h index 31d35931d2..3eab200b38 100644 --- a/library/cpp/messagebus/actor/tasks.h +++ b/library/cpp/messagebus/actor/tasks.h @@ -1,9 +1,9 @@ -#pragma once - +#pragma once + #include <util/system/atomic.h> -#include <util/system/yassert.h> - -namespace NActor { +#include <util/system/yassert.h> + +namespace NActor { class TTasks { enum { // order of values is important @@ -11,27 +11,27 @@ namespace NActor { E_RUNNING_NO_TASKS, E_RUNNING_GOT_TASKS, }; - + private: TAtomic State; - + public: TTasks() : State(E_WAITING) { - } - + } + // @return true iff caller have to either schedule task or execute it bool AddTask() { // High contention case optimization: AtomicGet is cheaper than AtomicSwap. if (E_RUNNING_GOT_TASKS == AtomicGet(State)) { return false; } - + TAtomicBase oldState = AtomicSwap(&State, E_RUNNING_GOT_TASKS); return oldState == E_WAITING; - } - + } + // called by executor // @return true iff we have to recheck queues bool FetchTask() { @@ -44,5 +44,5 @@ namespace NActor { Y_FAIL("unknown"); } }; - + } diff --git a/library/cpp/messagebus/actor/tasks_ut.cpp b/library/cpp/messagebus/actor/tasks_ut.cpp index d80e8451a5..8468109a7d 100644 --- a/library/cpp/messagebus/actor/tasks_ut.cpp +++ b/library/cpp/messagebus/actor/tasks_ut.cpp @@ -1,37 +1,37 @@ #include <library/cpp/testing/unittest/registar.h> - -#include "tasks.h" - -using namespace NActor; - + +#include "tasks.h" + +using namespace NActor; + Y_UNIT_TEST_SUITE(TTasks) { Y_UNIT_TEST(AddTask_FetchTask_Simple) { - TTasks tasks; - - UNIT_ASSERT(tasks.AddTask()); - UNIT_ASSERT(!tasks.AddTask()); - UNIT_ASSERT(!tasks.AddTask()); - - UNIT_ASSERT(tasks.FetchTask()); - UNIT_ASSERT(!tasks.FetchTask()); - - UNIT_ASSERT(tasks.AddTask()); - } - + TTasks tasks; + + UNIT_ASSERT(tasks.AddTask()); + UNIT_ASSERT(!tasks.AddTask()); + UNIT_ASSERT(!tasks.AddTask()); + + UNIT_ASSERT(tasks.FetchTask()); + UNIT_ASSERT(!tasks.FetchTask()); + + UNIT_ASSERT(tasks.AddTask()); + } + Y_UNIT_TEST(AddTask_FetchTask_AddTask) { - TTasks tasks; - - UNIT_ASSERT(tasks.AddTask()); - UNIT_ASSERT(!tasks.AddTask()); - - UNIT_ASSERT(tasks.FetchTask()); - UNIT_ASSERT(!tasks.AddTask()); - UNIT_ASSERT(tasks.FetchTask()); - UNIT_ASSERT(!tasks.AddTask()); - UNIT_ASSERT(!tasks.AddTask()); - UNIT_ASSERT(tasks.FetchTask()); - UNIT_ASSERT(!tasks.FetchTask()); - - UNIT_ASSERT(tasks.AddTask()); - } -} + TTasks tasks; + + UNIT_ASSERT(tasks.AddTask()); + UNIT_ASSERT(!tasks.AddTask()); + + UNIT_ASSERT(tasks.FetchTask()); + UNIT_ASSERT(!tasks.AddTask()); + UNIT_ASSERT(tasks.FetchTask()); + UNIT_ASSERT(!tasks.AddTask()); + UNIT_ASSERT(!tasks.AddTask()); + UNIT_ASSERT(tasks.FetchTask()); + UNIT_ASSERT(!tasks.FetchTask()); + + UNIT_ASSERT(tasks.AddTask()); + } +} diff --git a/library/cpp/messagebus/actor/temp_tls_vector.h b/library/cpp/messagebus/actor/temp_tls_vector.h index 675d92f5b0..5c535dd07c 100644 --- a/library/cpp/messagebus/actor/temp_tls_vector.h +++ b/library/cpp/messagebus/actor/temp_tls_vector.h @@ -1,34 +1,34 @@ -#pragma once - +#pragma once + #include "thread_extra.h" #include <util/generic/vector.h> -#include <util/system/yassert.h> - +#include <util/system/yassert.h> + template <typename T, typename TTag = void, template <typename, class> class TVectorType = TVector> -class TTempTlsVector { -private: - struct TTagForTls {}; - +class TTempTlsVector { +private: + struct TTagForTls {}; + TVectorType<T, std::allocator<T>>* Vector; -public: +public: TVectorType<T, std::allocator<T>>* GetVector() { - return Vector; - } - - TTempTlsVector() { + return Vector; + } + + TTempTlsVector() { Vector = FastTlsSingletonWithTag<TVectorType<T, std::allocator<T>>, TTagForTls>(); Y_ASSERT(Vector->empty()); - } - - ~TTempTlsVector() { + } + + ~TTempTlsVector() { Clear(); } void Clear() { - Vector->clear(); - } + Vector->clear(); + } size_t Capacity() const noexcept { return Vector->capacity(); @@ -37,4 +37,4 @@ public: void Shrink() { Vector->shrink_to_fit(); } -}; +}; diff --git a/library/cpp/messagebus/actor/thread_extra.cpp b/library/cpp/messagebus/actor/thread_extra.cpp index 048480f255..6472dd92f4 100644 --- a/library/cpp/messagebus/actor/thread_extra.cpp +++ b/library/cpp/messagebus/actor/thread_extra.cpp @@ -2,29 +2,29 @@ #include <util/stream/str.h> #include <util/system/execpath.h> -#include <util/system/platform.h> -#include <util/system/thread.h> - -namespace { +#include <util/system/platform.h> +#include <util/system/thread.h> + +namespace { #ifdef _linux_ TString GetExecName() { TString execPath = GetExecPath(); - size_t lastSlash = execPath.find_last_of('/'); + size_t lastSlash = execPath.find_last_of('/'); if (lastSlash == TString::npos) { - return execPath; - } else { - return execPath.substr(lastSlash + 1); - } - } + return execPath; + } else { + return execPath.substr(lastSlash + 1); + } + } #endif -} - +} + void SetCurrentThreadName(const char* name) { -#ifdef _linux_ - TStringStream linuxName; - linuxName << GetExecName() << "." << name; +#ifdef _linux_ + TStringStream linuxName; + linuxName << GetExecName() << "." << name; TThread::SetCurrentThreadName(linuxName.Str().data()); -#else +#else TThread::SetCurrentThreadName(name); -#endif -} +#endif +} diff --git a/library/cpp/messagebus/actor/thread_extra.h b/library/cpp/messagebus/actor/thread_extra.h index b5aa151618..002b2d8d5f 100644 --- a/library/cpp/messagebus/actor/thread_extra.h +++ b/library/cpp/messagebus/actor/thread_extra.h @@ -1,7 +1,7 @@ -#pragma once - -#include <util/thread/singleton.h> - +#pragma once + +#include <util/thread/singleton.h> + namespace NTSAN { template <typename T> inline void RelaxedStore(volatile T* a, T x) { @@ -25,7 +25,7 @@ namespace NTSAN { } void SetCurrentThreadName(const char* name); - + namespace NThreadExtra { namespace NPrivate { template <typename TValue, typename TTag> @@ -34,8 +34,8 @@ namespace NThreadExtra { }; } } - -template <typename TValue, typename TTag> -static inline TValue* FastTlsSingletonWithTag() { + +template <typename TValue, typename TTag> +static inline TValue* FastTlsSingletonWithTag() { return &FastTlsSingleton< ::NThreadExtra::NPrivate::TValueHolder<TValue, TTag>>()->Value; -} +} diff --git a/library/cpp/messagebus/actor/what_thread_does.cpp b/library/cpp/messagebus/actor/what_thread_does.cpp index bebb6a888c..94e0c0f64f 100644 --- a/library/cpp/messagebus/actor/what_thread_does.cpp +++ b/library/cpp/messagebus/actor/what_thread_does.cpp @@ -1,22 +1,22 @@ #include "what_thread_does.h" - + #include "thread_extra.h" - + #include <util/system/tls.h> Y_POD_STATIC_THREAD(const char*) WhatThreadDoes; - + const char* PushWhatThreadDoes(const char* what) { const char* r = NTSAN::RelaxedLoad(&WhatThreadDoes); NTSAN::RelaxedStore(&WhatThreadDoes, what); - return r; -} - + return r; +} + void PopWhatThreadDoes(const char* prev) { NTSAN::RelaxedStore(&WhatThreadDoes, prev); -} - +} + const char** WhatThreadDoesLocation() { - return &WhatThreadDoes; -} + return &WhatThreadDoes; +} diff --git a/library/cpp/messagebus/actor/what_thread_does.h b/library/cpp/messagebus/actor/what_thread_does.h index 235d2c3700..325528fc55 100644 --- a/library/cpp/messagebus/actor/what_thread_does.h +++ b/library/cpp/messagebus/actor/what_thread_does.h @@ -1,28 +1,28 @@ -#pragma once - -const char* PushWhatThreadDoes(const char* what); -void PopWhatThreadDoes(const char* prev); -const char** WhatThreadDoesLocation(); - -struct TWhatThreadDoesPushPop { -private: - const char* Prev; - -public: - TWhatThreadDoesPushPop(const char* what) { - Prev = PushWhatThreadDoes(what); - } - - ~TWhatThreadDoesPushPop() { - PopWhatThreadDoes(Prev); - } -}; - -#ifdef __GNUC__ -#define WHAT_THREAD_DOES_FUNCTION __PRETTY_FUNCTION__ -#else -#define WHAT_THREAD_DOES_FUNCTION __FUNCTION__ -#endif - -#define WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC() \ - TWhatThreadDoesPushPop whatThreadDoesPushPopCurrentFunc(WHAT_THREAD_DOES_FUNCTION) +#pragma once + +const char* PushWhatThreadDoes(const char* what); +void PopWhatThreadDoes(const char* prev); +const char** WhatThreadDoesLocation(); + +struct TWhatThreadDoesPushPop { +private: + const char* Prev; + +public: + TWhatThreadDoesPushPop(const char* what) { + Prev = PushWhatThreadDoes(what); + } + + ~TWhatThreadDoesPushPop() { + PopWhatThreadDoes(Prev); + } +}; + +#ifdef __GNUC__ +#define WHAT_THREAD_DOES_FUNCTION __PRETTY_FUNCTION__ +#else +#define WHAT_THREAD_DOES_FUNCTION __FUNCTION__ +#endif + +#define WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC() \ + TWhatThreadDoesPushPop whatThreadDoesPushPopCurrentFunc(WHAT_THREAD_DOES_FUNCTION) diff --git a/library/cpp/messagebus/actor/what_thread_does_guard.h b/library/cpp/messagebus/actor/what_thread_does_guard.h index f104e9e173..f0888f0a8d 100644 --- a/library/cpp/messagebus/actor/what_thread_does_guard.h +++ b/library/cpp/messagebus/actor/what_thread_does_guard.h @@ -1,40 +1,40 @@ -#pragma once - -#include "what_thread_does.h" - -template <class T> -class TWhatThreadDoesAcquireGuard: public TNonCopyable { +#pragma once + +#include "what_thread_does.h" + +template <class T> +class TWhatThreadDoesAcquireGuard: public TNonCopyable { public: inline TWhatThreadDoesAcquireGuard(const T& t, const char* acquire) noexcept { Init(&t, acquire); } - + inline TWhatThreadDoesAcquireGuard(const T* t, const char* acquire) noexcept { Init(t, acquire); } - + inline ~TWhatThreadDoesAcquireGuard() { Release(); } - + inline void Release() noexcept { if (WasAcquired()) { const_cast<T*>(T_)->Release(); T_ = nullptr; - } + } } - + inline bool WasAcquired() const noexcept { return T_ != nullptr; } - + private: inline void Init(const T* t, const char* acquire) noexcept { T_ = const_cast<T*>(t); TWhatThreadDoesPushPop pp(acquire); T_->Acquire(); } - + private: T* T_; -}; +}; diff --git a/library/cpp/messagebus/actor/what_thread_does_guard_ut.cpp b/library/cpp/messagebus/actor/what_thread_does_guard_ut.cpp index e4b218a7ca..74137f8f90 100644 --- a/library/cpp/messagebus/actor/what_thread_does_guard_ut.cpp +++ b/library/cpp/messagebus/actor/what_thread_does_guard_ut.cpp @@ -1,13 +1,13 @@ #include <library/cpp/testing/unittest/registar.h> - -#include "what_thread_does_guard.h" - + +#include "what_thread_does_guard.h" + #include <util/system/mutex.h> Y_UNIT_TEST_SUITE(WhatThreadDoesGuard) { Y_UNIT_TEST(Simple) { - TMutex mutex; - - TWhatThreadDoesAcquireGuard<TMutex> guard(mutex, "acquiring my mutex"); - } -} + TMutex mutex; + + TWhatThreadDoesAcquireGuard<TMutex> guard(mutex, "acquiring my mutex"); + } +} diff --git a/library/cpp/messagebus/actor/ya.make b/library/cpp/messagebus/actor/ya.make index 59bd1b0b99..1ea37f5b48 100644 --- a/library/cpp/messagebus/actor/ya.make +++ b/library/cpp/messagebus/actor/ya.make @@ -1,11 +1,11 @@ -LIBRARY(messagebus_actor) - +LIBRARY(messagebus_actor) + OWNER(g:messagebus) - -SRCS( - executor.cpp - thread_extra.cpp - what_thread_does.cpp -) - -END() + +SRCS( + executor.cpp + thread_extra.cpp + what_thread_does.cpp +) + +END() |