aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/messagebus/actor/executor.h
diff options
context:
space:
mode:
authorAnton Samokhvalov <pg83@yandex.ru>2022-02-10 16:45:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:17 +0300
commitd3a398281c6fd1d3672036cb2d63f842d2cb28c5 (patch)
treedd4bd3ca0f36b817e96812825ffaf10d645803f2 /library/cpp/messagebus/actor/executor.h
parent72cb13b4aff9bc9cf22e49251bc8fd143f82538f (diff)
downloadydb-d3a398281c6fd1d3672036cb2d63f842d2cb28c5.tar.gz
Restoring authorship annotation for Anton Samokhvalov <pg83@yandex.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/messagebus/actor/executor.h')
-rw-r--r--library/cpp/messagebus/actor/executor.h130
1 files changed, 65 insertions, 65 deletions
diff --git a/library/cpp/messagebus/actor/executor.h b/library/cpp/messagebus/actor/executor.h
index 4b9bcb1da0..7292d8be53 100644
--- a/library/cpp/messagebus/actor/executor.h
+++ b/library/cpp/messagebus/actor/executor.h
@@ -11,95 +11,95 @@
#include <util/thread/lfqueue.h>
namespace NActor {
- namespace NPrivate {
- struct TExecutorHistory {
- struct THistoryRecord {
- ui32 MaxQueueSize;
- };
- TVector<THistoryRecord> HistoryRecords;
- ui64 LastHistoryRecordSecond;
-
- ui64 FirstHistoryRecordSecond() const {
- return LastHistoryRecordSecond - HistoryRecords.size() + 1;
- }
- };
-
- struct TExecutorStatus {
- size_t WorkQueueSize = 0;
- TExecutorHistory History;
- TString Status;
+ namespace NPrivate {
+ struct TExecutorHistory {
+ struct THistoryRecord {
+ ui32 MaxQueueSize;
+ };
+ TVector<THistoryRecord> HistoryRecords;
+ ui64 LastHistoryRecordSecond;
+
+ ui64 FirstHistoryRecordSecond() const {
+ return LastHistoryRecordSecond - HistoryRecords.size() + 1;
+ }
};
- }
- class IWorkItem {
- public:
- virtual ~IWorkItem() {
+ struct TExecutorStatus {
+ size_t WorkQueueSize = 0;
+ TExecutorHistory History;
+ TString Status;
+ };
+ }
+
+ class IWorkItem {
+ public:
+ virtual ~IWorkItem() {
}
- virtual void DoWork(/* must release this */) = 0;
+ virtual void DoWork(/* must release this */) = 0;
};
- struct TExecutorWorker;
+ struct TExecutorWorker;
- class TExecutor: public TAtomicRefCount<TExecutor> {
- friend struct TExecutorWorker;
+ class TExecutor: public TAtomicRefCount<TExecutor> {
+ friend struct TExecutorWorker;
- public:
- struct TConfig {
- size_t WorkerCount;
- const char* Name;
+ public:
+ struct TConfig {
+ size_t WorkerCount;
+ const char* Name;
- TConfig()
- : WorkerCount(1)
- , Name()
- {
- }
- };
+ TConfig()
+ : WorkerCount(1)
+ , Name()
+ {
+ }
+ };
- private:
- struct TImpl;
- THolder<TImpl> Impl;
+ private:
+ struct TImpl;
+ THolder<TImpl> Impl;
- const TConfig Config;
+ const TConfig Config;
- TAtomic ExitWorkers;
+ TAtomic ExitWorkers;
- TVector<TAutoPtr<TExecutorWorker>> WorkerThreads;
+ TVector<TAutoPtr<TExecutorWorker>> WorkerThreads;
- TRingBufferWithSpinLock<IWorkItem*> WorkItems;
+ TRingBufferWithSpinLock<IWorkItem*> WorkItems;
- TMutex WorkMutex;
- TCondVar WorkAvailable;
+ TMutex WorkMutex;
+ TCondVar WorkAvailable;
- public:
- explicit TExecutor(size_t workerCount);
- TExecutor(const TConfig& config);
- ~TExecutor();
+ public:
+ explicit TExecutor(size_t workerCount);
+ TExecutor(const TConfig& config);
+ ~TExecutor();
- void Stop();
+ void Stop();
- void EnqueueWork(TArrayRef<IWorkItem* const> w);
+ void EnqueueWork(TArrayRef<IWorkItem* const> w);
- size_t GetWorkQueueSize() const;
- TString GetStatus() const;
- TString GetStatusSingleLine() const;
- NPrivate::TExecutorStatus GetStatusRecordInternal() const;
+ size_t GetWorkQueueSize() const;
+ TString GetStatus() const;
+ TString GetStatusSingleLine() const;
+ NPrivate::TExecutorStatus GetStatusRecordInternal() const;
- bool IsInExecutorThread() const;
+ bool IsInExecutorThread() const;
- private:
- void Init();
+ private:
+ void Init();
- TAutoPtr<IWorkItem> DequeueWork();
+ TAutoPtr<IWorkItem> DequeueWork();
- void ProcessWorkQueueHere();
+ void ProcessWorkQueueHere();
- inline void RunWorkItem(TAutoPtr<IWorkItem>);
+ inline void RunWorkItem(TAutoPtr<IWorkItem>);
- void RunWorker();
+ void RunWorker();
- ui32 GetMaxQueueSizeAndClear() const;
- };
+ ui32 GetMaxQueueSizeAndClear() const;
+ };
- using TExecutorPtr = TIntrusivePtr<TExecutor>;
+ using TExecutorPtr = TIntrusivePtr<TExecutor>;
-}
+}