diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/actor/executor.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/actor/executor.cpp')
-rw-r--r-- | library/cpp/messagebus/actor/executor.cpp | 338 |
1 files changed, 338 insertions, 0 deletions
diff --git a/library/cpp/messagebus/actor/executor.cpp b/library/cpp/messagebus/actor/executor.cpp new file mode 100644 index 0000000000..7a2227a458 --- /dev/null +++ b/library/cpp/messagebus/actor/executor.cpp @@ -0,0 +1,338 @@ +#include "executor.h" + +#include "thread_extra.h" +#include "what_thread_does.h" +#include "what_thread_does_guard.h" + +#include <util/generic/utility.h> +#include <util/random/random.h> +#include <util/stream/str.h> +#include <util/system/tls.h> +#include <util/system/yassert.h> + +#include <array> + +using namespace NActor; +using namespace NActor::NPrivate; + +namespace { + struct THistoryInternal { + struct TRecord { + TAtomic MaxQueueSize; + + TRecord() + : MaxQueueSize() + { + } + + TExecutorHistory::THistoryRecord Capture() { + TExecutorHistory::THistoryRecord r; + r.MaxQueueSize = AtomicGet(MaxQueueSize); + return r; + } + }; + + ui64 Start; + ui64 LastTime; + + std::array<TRecord, 3600> Records; + + THistoryInternal() { + Start = TInstant::Now().Seconds(); + LastTime = Start - 1; + } + + TRecord& GetRecordForTime(ui64 time) { + return Records[time % Records.size()]; + } + + TRecord& GetNowRecord(ui64 now) { + for (ui64 t = LastTime + 1; t <= now; ++t) { + GetRecordForTime(t) = TRecord(); + } + LastTime = now; + return GetRecordForTime(now); + } + + TExecutorHistory Capture() { + TExecutorHistory history; + ui64 now = TInstant::Now().Seconds(); + ui64 lastHistoryRecord = now - 1; + ui32 historySize = Min<ui32>(lastHistoryRecord - Start, Records.size() - 1); + history.HistoryRecords.resize(historySize); + for (ui32 i = 0; i < historySize; ++i) { + history.HistoryRecords[i] = GetRecordForTime(lastHistoryRecord - historySize + i).Capture(); + } + history.LastHistoryRecordSecond = lastHistoryRecord; + return history; + } + }; + +} + +Y_POD_STATIC_THREAD(TExecutor*) +ThreadCurrentExecutor; + +static const char* NoLocation = "nowhere"; + +struct TExecutorWorkerThreadLocalData { + ui32 MaxQueueSize; +}; + +static TExecutorWorkerThreadLocalData WorkerNoThreadLocalData; +Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData) +WorkerThreadLocalData; + +namespace NActor { + struct TExecutorWorker { + TExecutor* const Executor; + TThread Thread; + const char** WhatThreadDoesLocation; + TExecutorWorkerThreadLocalData* ThreadLocalData; + + TExecutorWorker(TExecutor* executor) + : Executor(executor) + , Thread(RunThreadProc, this) + , WhatThreadDoesLocation(&NoLocation) + , ThreadLocalData(&::WorkerNoThreadLocalData) + { + Thread.Start(); + } + + void Run() { + WhatThreadDoesLocation = ::WhatThreadDoesLocation(); + AtomicSet(ThreadLocalData, &::WorkerThreadLocalData); + WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC(); + Executor->RunWorker(); + } + + static void* RunThreadProc(void* thiz0) { + TExecutorWorker* thiz = (TExecutorWorker*)thiz0; + thiz->Run(); + return nullptr; + } + }; + + struct TExecutor::TImpl { + TExecutor* const Executor; + THistoryInternal History; + + TSystemEvent HelperStopSignal; + TThread HelperThread; + + TImpl(TExecutor* executor) + : Executor(executor) + , HelperThread(HelperThreadProc, this) + { + } + + void RunHelper() { + ui64 nowSeconds = TInstant::Now().Seconds(); + for (;;) { + TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000)); + + if (HelperStopSignal.WaitD(nextStop)) { + return; + } + + nowSeconds = nextStop.Seconds(); + + THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds); + + ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear(); + if (maxQueueSize > record.MaxQueueSize) { + AtomicSet(record.MaxQueueSize, maxQueueSize); + } + } + } + + static void* HelperThreadProc(void* impl0) { + TImpl* impl = (TImpl*)impl0; + impl->RunHelper(); + return nullptr; + } + }; + +} + +static TExecutor::TConfig MakeConfig(unsigned workerCount) { + TExecutor::TConfig config; + config.WorkerCount = workerCount; + return config; +} + +TExecutor::TExecutor(size_t workerCount) + : Config(MakeConfig(workerCount)) +{ + Init(); +} + +TExecutor::TExecutor(const TExecutor::TConfig& config) + : Config(config) +{ + Init(); +} + +void TExecutor::Init() { + Impl.Reset(new TImpl(this)); + + AtomicSet(ExitWorkers, 0); + + Y_VERIFY(Config.WorkerCount > 0); + + for (size_t i = 0; i < Config.WorkerCount; i++) { + WorkerThreads.push_back(new TExecutorWorker(this)); + } + + Impl->HelperThread.Start(); +} + +TExecutor::~TExecutor() { + Stop(); +} + +void TExecutor::Stop() { + AtomicSet(ExitWorkers, 1); + + Impl->HelperStopSignal.Signal(); + Impl->HelperThread.Join(); + + { + TWhatThreadDoesAcquireGuard<TMutex> guard(WorkMutex, "executor: acquiring lock for Stop"); + WorkAvailable.BroadCast(); + } + + for (size_t i = 0; i < WorkerThreads.size(); i++) { + WorkerThreads[i]->Thread.Join(); + } + + // TODO: make queue empty at this point + ProcessWorkQueueHere(); +} + +void TExecutor::EnqueueWork(TArrayRef<IWorkItem* const> wis) { + if (wis.empty()) + return; + + if (Y_UNLIKELY(AtomicGet(ExitWorkers) != 0)) { + Y_VERIFY(WorkItems.Empty(), "executor %s: cannot add tasks after queue shutdown", Config.Name); + } + + TWhatThreadDoesPushPop pp("executor: EnqueueWork"); + + WorkItems.PushAll(wis); + + { + if (wis.size() == 1) { + TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork"); + WorkAvailable.Signal(); + } else { + TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork"); + WorkAvailable.BroadCast(); + } + } +} + +size_t TExecutor::GetWorkQueueSize() const { + return WorkItems.Size(); +} + +using namespace NTSAN; + +ui32 TExecutor::GetMaxQueueSizeAndClear() const { + ui32 max = 0; + for (unsigned i = 0; i < WorkerThreads.size(); ++i) { + TExecutorWorkerThreadLocalData* wtls = RelaxedLoad(&WorkerThreads[i]->ThreadLocalData); + max = Max<ui32>(max, RelaxedLoad(&wtls->MaxQueueSize)); + RelaxedStore<ui32>(&wtls->MaxQueueSize, 0); + } + return max; +} + +TString TExecutor::GetStatus() const { + return GetStatusRecordInternal().Status; +} + +TString TExecutor::GetStatusSingleLine() const { + TStringStream ss; + ss << "work items: " << GetWorkQueueSize(); + return ss.Str(); +} + +TExecutorStatus TExecutor::GetStatusRecordInternal() const { + TExecutorStatus r; + + r.WorkQueueSize = GetWorkQueueSize(); + + { + TStringStream ss; + ss << "work items: " << GetWorkQueueSize() << "\n"; + ss << "workers:\n"; + for (unsigned i = 0; i < WorkerThreads.size(); ++i) { + ss << "-- " << AtomicGet(*AtomicGet(WorkerThreads[i]->WhatThreadDoesLocation)) << "\n"; + } + r.Status = ss.Str(); + } + + r.History = Impl->History.Capture(); + + return r; +} + +bool TExecutor::IsInExecutorThread() const { + return ThreadCurrentExecutor == this; +} + +TAutoPtr<IWorkItem> TExecutor::DequeueWork() { + IWorkItem* wi = reinterpret_cast<IWorkItem*>(1); + size_t queueSize = Max<size_t>(); + if (!WorkItems.TryPop(&wi, &queueSize)) { + TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for DequeueWork"); + while (!WorkItems.TryPop(&wi, &queueSize)) { + if (AtomicGet(ExitWorkers) != 0) + return nullptr; + + TWhatThreadDoesPushPop pp("waiting for work on condvar"); + WorkAvailable.Wait(WorkMutex); + } + } + + auto& wtls = TlsRef(WorkerThreadLocalData); + + if (queueSize > RelaxedLoad(&wtls.MaxQueueSize)) { + RelaxedStore<ui32>(&wtls.MaxQueueSize, queueSize); + } + + return wi; +} + +void TExecutor::RunWorkItem(TAutoPtr<IWorkItem> wi) { + WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC(); + wi.Release()->DoWork(); +} + +void TExecutor::ProcessWorkQueueHere() { + IWorkItem* wi; + while (WorkItems.TryPop(&wi)) { + RunWorkItem(wi); + } +} + +void TExecutor::RunWorker() { + Y_VERIFY(!ThreadCurrentExecutor, "state check"); + ThreadCurrentExecutor = this; + + SetCurrentThreadName("wrkr"); + + for (;;) { + TAutoPtr<IWorkItem> wi = DequeueWork(); + if (!wi) { + break; + } + // Note for messagebus users: make sure program crashes + // on uncaught exception in thread, otherewise messagebus may just hang on error. + RunWorkItem(wi); + } + + ThreadCurrentExecutor = (TExecutor*)nullptr; +} |