diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/messagebus/actor/executor.h | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/messagebus/actor/executor.h')
-rw-r--r-- | library/cpp/messagebus/actor/executor.h | 105 |
1 files changed, 105 insertions, 0 deletions
diff --git a/library/cpp/messagebus/actor/executor.h b/library/cpp/messagebus/actor/executor.h new file mode 100644 index 0000000000..7292d8be53 --- /dev/null +++ b/library/cpp/messagebus/actor/executor.h @@ -0,0 +1,105 @@ +#pragma once + +#include "ring_buffer_with_spin_lock.h" + +#include <util/generic/array_ref.h> +#include <util/generic/vector.h> +#include <util/system/condvar.h> +#include <util/system/event.h> +#include <util/system/mutex.h> +#include <util/system/thread.h> +#include <util/thread/lfqueue.h> + +namespace NActor { + namespace NPrivate { + struct TExecutorHistory { + struct THistoryRecord { + ui32 MaxQueueSize; + }; + TVector<THistoryRecord> HistoryRecords; + ui64 LastHistoryRecordSecond; + + ui64 FirstHistoryRecordSecond() const { + return LastHistoryRecordSecond - HistoryRecords.size() + 1; + } + }; + + struct TExecutorStatus { + size_t WorkQueueSize = 0; + TExecutorHistory History; + TString Status; + }; + } + + class IWorkItem { + public: + virtual ~IWorkItem() { + } + virtual void DoWork(/* must release this */) = 0; + }; + + struct TExecutorWorker; + + class TExecutor: public TAtomicRefCount<TExecutor> { + friend struct TExecutorWorker; + + public: + struct TConfig { + size_t WorkerCount; + const char* Name; + + TConfig() + : WorkerCount(1) + , Name() + { + } + }; + + private: + struct TImpl; + THolder<TImpl> Impl; + + const TConfig Config; + + TAtomic ExitWorkers; + + TVector<TAutoPtr<TExecutorWorker>> WorkerThreads; + + TRingBufferWithSpinLock<IWorkItem*> WorkItems; + + TMutex WorkMutex; + TCondVar WorkAvailable; + + public: + explicit TExecutor(size_t workerCount); + TExecutor(const TConfig& config); + ~TExecutor(); + + void Stop(); + + void EnqueueWork(TArrayRef<IWorkItem* const> w); + + size_t GetWorkQueueSize() const; + TString GetStatus() const; + TString GetStatusSingleLine() const; + NPrivate::TExecutorStatus GetStatusRecordInternal() const; + + bool IsInExecutorThread() const; + + private: + void Init(); + + TAutoPtr<IWorkItem> DequeueWork(); + + void ProcessWorkQueueHere(); + + inline void RunWorkItem(TAutoPtr<IWorkItem>); + + void RunWorker(); + + ui32 GetMaxQueueSizeAndClear() const; + }; + + using TExecutorPtr = TIntrusivePtr<TExecutor>; + +} |