diff options
author | nga <nga@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:48:09 +0300 |
commit | 1f553f46fb4f3c5eec631352cdd900a0709016af (patch) | |
tree | a231fba2c03b440becaea6c86a2702d0bfb0336e /library/cpp/messagebus/actor/executor.cpp | |
parent | c4de7efdedc25b49cbea74bd589eecb61b55b60a (diff) | |
download | ydb-1f553f46fb4f3c5eec631352cdd900a0709016af.tar.gz |
Restoring authorship annotation for <nga@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/messagebus/actor/executor.cpp')
-rw-r--r-- | library/cpp/messagebus/actor/executor.cpp | 474 |
1 files changed, 237 insertions, 237 deletions
diff --git a/library/cpp/messagebus/actor/executor.cpp b/library/cpp/messagebus/actor/executor.cpp index 7a2227a458..fbd76a86ba 100644 --- a/library/cpp/messagebus/actor/executor.cpp +++ b/library/cpp/messagebus/actor/executor.cpp @@ -1,95 +1,95 @@ #include "executor.h" - -#include "thread_extra.h" -#include "what_thread_does.h" -#include "what_thread_does_guard.h" - + +#include "thread_extra.h" +#include "what_thread_does.h" +#include "what_thread_does_guard.h" + #include <util/generic/utility.h> #include <util/random/random.h> #include <util/stream/str.h> #include <util/system/tls.h> #include <util/system/yassert.h> - + #include <array> -using namespace NActor; -using namespace NActor::NPrivate; - -namespace { - struct THistoryInternal { - struct TRecord { +using namespace NActor; +using namespace NActor::NPrivate; + +namespace { + struct THistoryInternal { + struct TRecord { TAtomic MaxQueueSize; - + TRecord() : MaxQueueSize() { } - - TExecutorHistory::THistoryRecord Capture() { - TExecutorHistory::THistoryRecord r; + + TExecutorHistory::THistoryRecord Capture() { + TExecutorHistory::THistoryRecord r; r.MaxQueueSize = AtomicGet(MaxQueueSize); - return r; - } - }; - - ui64 Start; - ui64 LastTime; - + return r; + } + }; + + ui64 Start; + ui64 LastTime; + std::array<TRecord, 3600> Records; - - THistoryInternal() { - Start = TInstant::Now().Seconds(); - LastTime = Start - 1; - } - - TRecord& GetRecordForTime(ui64 time) { - return Records[time % Records.size()]; - } - - TRecord& GetNowRecord(ui64 now) { - for (ui64 t = LastTime + 1; t <= now; ++t) { - GetRecordForTime(t) = TRecord(); - } - LastTime = now; - return GetRecordForTime(now); - } - - TExecutorHistory Capture() { - TExecutorHistory history; - ui64 now = TInstant::Now().Seconds(); - ui64 lastHistoryRecord = now - 1; - ui32 historySize = Min<ui32>(lastHistoryRecord - Start, Records.size() - 1); - history.HistoryRecords.resize(historySize); - for (ui32 i = 0; i < historySize; ++i) { - history.HistoryRecords[i] = GetRecordForTime(lastHistoryRecord - historySize + i).Capture(); - } - history.LastHistoryRecordSecond = lastHistoryRecord; - return history; - } - }; - -} - + + THistoryInternal() { + Start = TInstant::Now().Seconds(); + LastTime = Start - 1; + } + + TRecord& GetRecordForTime(ui64 time) { + return Records[time % Records.size()]; + } + + TRecord& GetNowRecord(ui64 now) { + for (ui64 t = LastTime + 1; t <= now; ++t) { + GetRecordForTime(t) = TRecord(); + } + LastTime = now; + return GetRecordForTime(now); + } + + TExecutorHistory Capture() { + TExecutorHistory history; + ui64 now = TInstant::Now().Seconds(); + ui64 lastHistoryRecord = now - 1; + ui32 historySize = Min<ui32>(lastHistoryRecord - Start, Records.size() - 1); + history.HistoryRecords.resize(historySize); + for (ui32 i = 0; i < historySize; ++i) { + history.HistoryRecords[i] = GetRecordForTime(lastHistoryRecord - historySize + i).Capture(); + } + history.LastHistoryRecordSecond = lastHistoryRecord; + return history; + } + }; + +} + Y_POD_STATIC_THREAD(TExecutor*) ThreadCurrentExecutor; - -static const char* NoLocation = "nowhere"; - -struct TExecutorWorkerThreadLocalData { - ui32 MaxQueueSize; -}; - -static TExecutorWorkerThreadLocalData WorkerNoThreadLocalData; + +static const char* NoLocation = "nowhere"; + +struct TExecutorWorkerThreadLocalData { + ui32 MaxQueueSize; +}; + +static TExecutorWorkerThreadLocalData WorkerNoThreadLocalData; Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData) WorkerThreadLocalData; - -namespace NActor { + +namespace NActor { struct TExecutorWorker { TExecutor* const Executor; TThread Thread; const char** WhatThreadDoesLocation; TExecutorWorkerThreadLocalData* ThreadLocalData; - + TExecutorWorker(TExecutor* executor) : Executor(executor) , Thread(RunThreadProc, this) @@ -98,241 +98,241 @@ namespace NActor { { Thread.Start(); } - + void Run() { WhatThreadDoesLocation = ::WhatThreadDoesLocation(); AtomicSet(ThreadLocalData, &::WorkerThreadLocalData); WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC(); Executor->RunWorker(); } - + static void* RunThreadProc(void* thiz0) { TExecutorWorker* thiz = (TExecutorWorker*)thiz0; thiz->Run(); return nullptr; } }; - + struct TExecutor::TImpl { TExecutor* const Executor; THistoryInternal History; - + TSystemEvent HelperStopSignal; TThread HelperThread; - + TImpl(TExecutor* executor) : Executor(executor) , HelperThread(HelperThreadProc, this) { } - + void RunHelper() { ui64 nowSeconds = TInstant::Now().Seconds(); for (;;) { TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000)); - + if (HelperStopSignal.WaitD(nextStop)) { return; } - + nowSeconds = nextStop.Seconds(); - + THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds); - + ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear(); if (maxQueueSize > record.MaxQueueSize) { AtomicSet(record.MaxQueueSize, maxQueueSize); } - } - } - + } + } + static void* HelperThreadProc(void* impl0) { TImpl* impl = (TImpl*)impl0; impl->RunHelper(); return nullptr; } }; - -} - -static TExecutor::TConfig MakeConfig(unsigned workerCount) { - TExecutor::TConfig config; - config.WorkerCount = workerCount; - return config; -} - -TExecutor::TExecutor(size_t workerCount) - : Config(MakeConfig(workerCount)) -{ - Init(); -} - -TExecutor::TExecutor(const TExecutor::TConfig& config) - : Config(config) -{ - Init(); + } - -void TExecutor::Init() { - Impl.Reset(new TImpl(this)); - + +static TExecutor::TConfig MakeConfig(unsigned workerCount) { + TExecutor::TConfig config; + config.WorkerCount = workerCount; + return config; +} + +TExecutor::TExecutor(size_t workerCount) + : Config(MakeConfig(workerCount)) +{ + Init(); +} + +TExecutor::TExecutor(const TExecutor::TConfig& config) + : Config(config) +{ + Init(); +} + +void TExecutor::Init() { + Impl.Reset(new TImpl(this)); + AtomicSet(ExitWorkers, 0); - + Y_VERIFY(Config.WorkerCount > 0); - - for (size_t i = 0; i < Config.WorkerCount; i++) { - WorkerThreads.push_back(new TExecutorWorker(this)); - } - - Impl->HelperThread.Start(); -} - -TExecutor::~TExecutor() { - Stop(); -} - -void TExecutor::Stop() { + + for (size_t i = 0; i < Config.WorkerCount; i++) { + WorkerThreads.push_back(new TExecutorWorker(this)); + } + + Impl->HelperThread.Start(); +} + +TExecutor::~TExecutor() { + Stop(); +} + +void TExecutor::Stop() { AtomicSet(ExitWorkers, 1); - - Impl->HelperStopSignal.Signal(); - Impl->HelperThread.Join(); - - { - TWhatThreadDoesAcquireGuard<TMutex> guard(WorkMutex, "executor: acquiring lock for Stop"); - WorkAvailable.BroadCast(); - } - - for (size_t i = 0; i < WorkerThreads.size(); i++) { - WorkerThreads[i]->Thread.Join(); - } - - // TODO: make queue empty at this point - ProcessWorkQueueHere(); -} - + + Impl->HelperStopSignal.Signal(); + Impl->HelperThread.Join(); + + { + TWhatThreadDoesAcquireGuard<TMutex> guard(WorkMutex, "executor: acquiring lock for Stop"); + WorkAvailable.BroadCast(); + } + + for (size_t i = 0; i < WorkerThreads.size(); i++) { + WorkerThreads[i]->Thread.Join(); + } + + // TODO: make queue empty at this point + ProcessWorkQueueHere(); +} + void TExecutor::EnqueueWork(TArrayRef<IWorkItem* const> wis) { - if (wis.empty()) - return; - + if (wis.empty()) + return; + if (Y_UNLIKELY(AtomicGet(ExitWorkers) != 0)) { Y_VERIFY(WorkItems.Empty(), "executor %s: cannot add tasks after queue shutdown", Config.Name); - } - - TWhatThreadDoesPushPop pp("executor: EnqueueWork"); - - WorkItems.PushAll(wis); - - { - if (wis.size() == 1) { - TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork"); - WorkAvailable.Signal(); - } else { - TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork"); - WorkAvailable.BroadCast(); - } - } -} - + } + + TWhatThreadDoesPushPop pp("executor: EnqueueWork"); + + WorkItems.PushAll(wis); + + { + if (wis.size() == 1) { + TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork"); + WorkAvailable.Signal(); + } else { + TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork"); + WorkAvailable.BroadCast(); + } + } +} + size_t TExecutor::GetWorkQueueSize() const { - return WorkItems.Size(); -} - + return WorkItems.Size(); +} + using namespace NTSAN; -ui32 TExecutor::GetMaxQueueSizeAndClear() const { - ui32 max = 0; - for (unsigned i = 0; i < WorkerThreads.size(); ++i) { +ui32 TExecutor::GetMaxQueueSizeAndClear() const { + ui32 max = 0; + for (unsigned i = 0; i < WorkerThreads.size(); ++i) { TExecutorWorkerThreadLocalData* wtls = RelaxedLoad(&WorkerThreads[i]->ThreadLocalData); max = Max<ui32>(max, RelaxedLoad(&wtls->MaxQueueSize)); RelaxedStore<ui32>(&wtls->MaxQueueSize, 0); - } - return max; -} - + } + return max; +} + TString TExecutor::GetStatus() const { - return GetStatusRecordInternal().Status; -} - + return GetStatusRecordInternal().Status; +} + TString TExecutor::GetStatusSingleLine() const { - TStringStream ss; - ss << "work items: " << GetWorkQueueSize(); - return ss.Str(); -} - + TStringStream ss; + ss << "work items: " << GetWorkQueueSize(); + return ss.Str(); +} + TExecutorStatus TExecutor::GetStatusRecordInternal() const { - TExecutorStatus r; - - r.WorkQueueSize = GetWorkQueueSize(); - - { - TStringStream ss; - ss << "work items: " << GetWorkQueueSize() << "\n"; - ss << "workers:\n"; - for (unsigned i = 0; i < WorkerThreads.size(); ++i) { + TExecutorStatus r; + + r.WorkQueueSize = GetWorkQueueSize(); + + { + TStringStream ss; + ss << "work items: " << GetWorkQueueSize() << "\n"; + ss << "workers:\n"; + for (unsigned i = 0; i < WorkerThreads.size(); ++i) { ss << "-- " << AtomicGet(*AtomicGet(WorkerThreads[i]->WhatThreadDoesLocation)) << "\n"; - } - r.Status = ss.Str(); - } - - r.History = Impl->History.Capture(); - - return r; -} - + } + r.Status = ss.Str(); + } + + r.History = Impl->History.Capture(); + + return r; +} + bool TExecutor::IsInExecutorThread() const { - return ThreadCurrentExecutor == this; -} - -TAutoPtr<IWorkItem> TExecutor::DequeueWork() { - IWorkItem* wi = reinterpret_cast<IWorkItem*>(1); - size_t queueSize = Max<size_t>(); - if (!WorkItems.TryPop(&wi, &queueSize)) { - TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for DequeueWork"); - while (!WorkItems.TryPop(&wi, &queueSize)) { + return ThreadCurrentExecutor == this; +} + +TAutoPtr<IWorkItem> TExecutor::DequeueWork() { + IWorkItem* wi = reinterpret_cast<IWorkItem*>(1); + size_t queueSize = Max<size_t>(); + if (!WorkItems.TryPop(&wi, &queueSize)) { + TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for DequeueWork"); + while (!WorkItems.TryPop(&wi, &queueSize)) { if (AtomicGet(ExitWorkers) != 0) return nullptr; - - TWhatThreadDoesPushPop pp("waiting for work on condvar"); - WorkAvailable.Wait(WorkMutex); - } - } + + TWhatThreadDoesPushPop pp("waiting for work on condvar"); + WorkAvailable.Wait(WorkMutex); + } + } auto& wtls = TlsRef(WorkerThreadLocalData); if (queueSize > RelaxedLoad(&wtls.MaxQueueSize)) { RelaxedStore<ui32>(&wtls.MaxQueueSize, queueSize); - } - - return wi; -} - -void TExecutor::RunWorkItem(TAutoPtr<IWorkItem> wi) { - WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC(); - wi.Release()->DoWork(); -} - -void TExecutor::ProcessWorkQueueHere() { - IWorkItem* wi; - while (WorkItems.TryPop(&wi)) { - RunWorkItem(wi); - } -} - -void TExecutor::RunWorker() { + } + + return wi; +} + +void TExecutor::RunWorkItem(TAutoPtr<IWorkItem> wi) { + WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC(); + wi.Release()->DoWork(); +} + +void TExecutor::ProcessWorkQueueHere() { + IWorkItem* wi; + while (WorkItems.TryPop(&wi)) { + RunWorkItem(wi); + } +} + +void TExecutor::RunWorker() { Y_VERIFY(!ThreadCurrentExecutor, "state check"); - ThreadCurrentExecutor = this; - + ThreadCurrentExecutor = this; + SetCurrentThreadName("wrkr"); - - for (;;) { - TAutoPtr<IWorkItem> wi = DequeueWork(); - if (!wi) { - break; - } - // Note for messagebus users: make sure program crashes - // on uncaught exception in thread, otherewise messagebus may just hang on error. - RunWorkItem(wi); - } - + + for (;;) { + TAutoPtr<IWorkItem> wi = DequeueWork(); + if (!wi) { + break; + } + // Note for messagebus users: make sure program crashes + // on uncaught exception in thread, otherewise messagebus may just hang on error. + RunWorkItem(wi); + } + ThreadCurrentExecutor = (TExecutor*)nullptr; -} +} |