aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/actor/executor.cpp
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/actor/executor.cpp
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/actor/executor.cpp')
-rw-r--r--library/cpp/messagebus/actor/executor.cpp338
1 files changed, 338 insertions, 0 deletions
diff --git a/library/cpp/messagebus/actor/executor.cpp b/library/cpp/messagebus/actor/executor.cpp
new file mode 100644
index 0000000000..7a2227a458
--- /dev/null
+++ b/library/cpp/messagebus/actor/executor.cpp
@@ -0,0 +1,338 @@
+#include "executor.h"
+
+#include "thread_extra.h"
+#include "what_thread_does.h"
+#include "what_thread_does_guard.h"
+
+#include <util/generic/utility.h>
+#include <util/random/random.h>
+#include <util/stream/str.h>
+#include <util/system/tls.h>
+#include <util/system/yassert.h>
+
+#include <array>
+
+using namespace NActor;
+using namespace NActor::NPrivate;
+
+namespace {
+ struct THistoryInternal {
+ struct TRecord {
+ TAtomic MaxQueueSize;
+
+ TRecord()
+ : MaxQueueSize()
+ {
+ }
+
+ TExecutorHistory::THistoryRecord Capture() {
+ TExecutorHistory::THistoryRecord r;
+ r.MaxQueueSize = AtomicGet(MaxQueueSize);
+ return r;
+ }
+ };
+
+ ui64 Start;
+ ui64 LastTime;
+
+ std::array<TRecord, 3600> Records;
+
+ THistoryInternal() {
+ Start = TInstant::Now().Seconds();
+ LastTime = Start - 1;
+ }
+
+ TRecord& GetRecordForTime(ui64 time) {
+ return Records[time % Records.size()];
+ }
+
+ TRecord& GetNowRecord(ui64 now) {
+ for (ui64 t = LastTime + 1; t <= now; ++t) {
+ GetRecordForTime(t) = TRecord();
+ }
+ LastTime = now;
+ return GetRecordForTime(now);
+ }
+
+ TExecutorHistory Capture() {
+ TExecutorHistory history;
+ ui64 now = TInstant::Now().Seconds();
+ ui64 lastHistoryRecord = now - 1;
+ ui32 historySize = Min<ui32>(lastHistoryRecord - Start, Records.size() - 1);
+ history.HistoryRecords.resize(historySize);
+ for (ui32 i = 0; i < historySize; ++i) {
+ history.HistoryRecords[i] = GetRecordForTime(lastHistoryRecord - historySize + i).Capture();
+ }
+ history.LastHistoryRecordSecond = lastHistoryRecord;
+ return history;
+ }
+ };
+
+}
+
+Y_POD_STATIC_THREAD(TExecutor*)
+ThreadCurrentExecutor;
+
+static const char* NoLocation = "nowhere";
+
+struct TExecutorWorkerThreadLocalData {
+ ui32 MaxQueueSize;
+};
+
+static TExecutorWorkerThreadLocalData WorkerNoThreadLocalData;
+Y_POD_STATIC_THREAD(TExecutorWorkerThreadLocalData)
+WorkerThreadLocalData;
+
+namespace NActor {
+ struct TExecutorWorker {
+ TExecutor* const Executor;
+ TThread Thread;
+ const char** WhatThreadDoesLocation;
+ TExecutorWorkerThreadLocalData* ThreadLocalData;
+
+ TExecutorWorker(TExecutor* executor)
+ : Executor(executor)
+ , Thread(RunThreadProc, this)
+ , WhatThreadDoesLocation(&NoLocation)
+ , ThreadLocalData(&::WorkerNoThreadLocalData)
+ {
+ Thread.Start();
+ }
+
+ void Run() {
+ WhatThreadDoesLocation = ::WhatThreadDoesLocation();
+ AtomicSet(ThreadLocalData, &::WorkerThreadLocalData);
+ WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
+ Executor->RunWorker();
+ }
+
+ static void* RunThreadProc(void* thiz0) {
+ TExecutorWorker* thiz = (TExecutorWorker*)thiz0;
+ thiz->Run();
+ return nullptr;
+ }
+ };
+
+ struct TExecutor::TImpl {
+ TExecutor* const Executor;
+ THistoryInternal History;
+
+ TSystemEvent HelperStopSignal;
+ TThread HelperThread;
+
+ TImpl(TExecutor* executor)
+ : Executor(executor)
+ , HelperThread(HelperThreadProc, this)
+ {
+ }
+
+ void RunHelper() {
+ ui64 nowSeconds = TInstant::Now().Seconds();
+ for (;;) {
+ TInstant nextStop = TInstant::Seconds(nowSeconds + 1) + TDuration::MilliSeconds(RandomNumber<ui32>(1000));
+
+ if (HelperStopSignal.WaitD(nextStop)) {
+ return;
+ }
+
+ nowSeconds = nextStop.Seconds();
+
+ THistoryInternal::TRecord& record = History.GetNowRecord(nowSeconds);
+
+ ui32 maxQueueSize = Executor->GetMaxQueueSizeAndClear();
+ if (maxQueueSize > record.MaxQueueSize) {
+ AtomicSet(record.MaxQueueSize, maxQueueSize);
+ }
+ }
+ }
+
+ static void* HelperThreadProc(void* impl0) {
+ TImpl* impl = (TImpl*)impl0;
+ impl->RunHelper();
+ return nullptr;
+ }
+ };
+
+}
+
+static TExecutor::TConfig MakeConfig(unsigned workerCount) {
+ TExecutor::TConfig config;
+ config.WorkerCount = workerCount;
+ return config;
+}
+
+TExecutor::TExecutor(size_t workerCount)
+ : Config(MakeConfig(workerCount))
+{
+ Init();
+}
+
+TExecutor::TExecutor(const TExecutor::TConfig& config)
+ : Config(config)
+{
+ Init();
+}
+
+void TExecutor::Init() {
+ Impl.Reset(new TImpl(this));
+
+ AtomicSet(ExitWorkers, 0);
+
+ Y_VERIFY(Config.WorkerCount > 0);
+
+ for (size_t i = 0; i < Config.WorkerCount; i++) {
+ WorkerThreads.push_back(new TExecutorWorker(this));
+ }
+
+ Impl->HelperThread.Start();
+}
+
+TExecutor::~TExecutor() {
+ Stop();
+}
+
+void TExecutor::Stop() {
+ AtomicSet(ExitWorkers, 1);
+
+ Impl->HelperStopSignal.Signal();
+ Impl->HelperThread.Join();
+
+ {
+ TWhatThreadDoesAcquireGuard<TMutex> guard(WorkMutex, "executor: acquiring lock for Stop");
+ WorkAvailable.BroadCast();
+ }
+
+ for (size_t i = 0; i < WorkerThreads.size(); i++) {
+ WorkerThreads[i]->Thread.Join();
+ }
+
+ // TODO: make queue empty at this point
+ ProcessWorkQueueHere();
+}
+
+void TExecutor::EnqueueWork(TArrayRef<IWorkItem* const> wis) {
+ if (wis.empty())
+ return;
+
+ if (Y_UNLIKELY(AtomicGet(ExitWorkers) != 0)) {
+ Y_VERIFY(WorkItems.Empty(), "executor %s: cannot add tasks after queue shutdown", Config.Name);
+ }
+
+ TWhatThreadDoesPushPop pp("executor: EnqueueWork");
+
+ WorkItems.PushAll(wis);
+
+ {
+ if (wis.size() == 1) {
+ TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork");
+ WorkAvailable.Signal();
+ } else {
+ TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for EnqueueWork");
+ WorkAvailable.BroadCast();
+ }
+ }
+}
+
+size_t TExecutor::GetWorkQueueSize() const {
+ return WorkItems.Size();
+}
+
+using namespace NTSAN;
+
+ui32 TExecutor::GetMaxQueueSizeAndClear() const {
+ ui32 max = 0;
+ for (unsigned i = 0; i < WorkerThreads.size(); ++i) {
+ TExecutorWorkerThreadLocalData* wtls = RelaxedLoad(&WorkerThreads[i]->ThreadLocalData);
+ max = Max<ui32>(max, RelaxedLoad(&wtls->MaxQueueSize));
+ RelaxedStore<ui32>(&wtls->MaxQueueSize, 0);
+ }
+ return max;
+}
+
+TString TExecutor::GetStatus() const {
+ return GetStatusRecordInternal().Status;
+}
+
+TString TExecutor::GetStatusSingleLine() const {
+ TStringStream ss;
+ ss << "work items: " << GetWorkQueueSize();
+ return ss.Str();
+}
+
+TExecutorStatus TExecutor::GetStatusRecordInternal() const {
+ TExecutorStatus r;
+
+ r.WorkQueueSize = GetWorkQueueSize();
+
+ {
+ TStringStream ss;
+ ss << "work items: " << GetWorkQueueSize() << "\n";
+ ss << "workers:\n";
+ for (unsigned i = 0; i < WorkerThreads.size(); ++i) {
+ ss << "-- " << AtomicGet(*AtomicGet(WorkerThreads[i]->WhatThreadDoesLocation)) << "\n";
+ }
+ r.Status = ss.Str();
+ }
+
+ r.History = Impl->History.Capture();
+
+ return r;
+}
+
+bool TExecutor::IsInExecutorThread() const {
+ return ThreadCurrentExecutor == this;
+}
+
+TAutoPtr<IWorkItem> TExecutor::DequeueWork() {
+ IWorkItem* wi = reinterpret_cast<IWorkItem*>(1);
+ size_t queueSize = Max<size_t>();
+ if (!WorkItems.TryPop(&wi, &queueSize)) {
+ TWhatThreadDoesAcquireGuard<TMutex> g(WorkMutex, "executor: acquiring lock for DequeueWork");
+ while (!WorkItems.TryPop(&wi, &queueSize)) {
+ if (AtomicGet(ExitWorkers) != 0)
+ return nullptr;
+
+ TWhatThreadDoesPushPop pp("waiting for work on condvar");
+ WorkAvailable.Wait(WorkMutex);
+ }
+ }
+
+ auto& wtls = TlsRef(WorkerThreadLocalData);
+
+ if (queueSize > RelaxedLoad(&wtls.MaxQueueSize)) {
+ RelaxedStore<ui32>(&wtls.MaxQueueSize, queueSize);
+ }
+
+ return wi;
+}
+
+void TExecutor::RunWorkItem(TAutoPtr<IWorkItem> wi) {
+ WHAT_THREAD_DOES_PUSH_POP_CURRENT_FUNC();
+ wi.Release()->DoWork();
+}
+
+void TExecutor::ProcessWorkQueueHere() {
+ IWorkItem* wi;
+ while (WorkItems.TryPop(&wi)) {
+ RunWorkItem(wi);
+ }
+}
+
+void TExecutor::RunWorker() {
+ Y_VERIFY(!ThreadCurrentExecutor, "state check");
+ ThreadCurrentExecutor = this;
+
+ SetCurrentThreadName("wrkr");
+
+ for (;;) {
+ TAutoPtr<IWorkItem> wi = DequeueWork();
+ if (!wi) {
+ break;
+ }
+ // Note for messagebus users: make sure program crashes
+ // on uncaught exception in thread, otherewise messagebus may just hang on error.
+ RunWorkItem(wi);
+ }
+
+ ThreadCurrentExecutor = (TExecutor*)nullptr;
+}