diff options
author | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
---|---|---|
committer | monster <monster@ydb.tech> | 2022-07-07 14:41:37 +0300 |
commit | 06e5c21a835c0e923506c4ff27929f34e00761c2 (patch) | |
tree | 75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/jobqueue.cpp | |
parent | 03f024c4412e3aa613bb543cf1660176320ba8f4 (diff) | |
download | ydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz |
fix ya.make
Diffstat (limited to 'library/cpp/neh/jobqueue.cpp')
-rw-r--r-- | library/cpp/neh/jobqueue.cpp | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/library/cpp/neh/jobqueue.cpp b/library/cpp/neh/jobqueue.cpp new file mode 100644 index 0000000000..8d02fd9bbf --- /dev/null +++ b/library/cpp/neh/jobqueue.cpp @@ -0,0 +1,79 @@ +#include "utils.h" +#include "lfqueue.h" +#include "jobqueue.h" +#include "pipequeue.h" + +#include <util/thread/factory.h> +#include <util/generic/singleton.h> +#include <util/system/thread.h> + +using namespace NNeh; + +namespace { + class TExecThread: public IThreadFactory::IThreadAble, public IJob { + public: + TExecThread() + : T_(SystemThreadFactory()->Run(this)) + { + } + + ~TExecThread() override { + Enqueue(this); + T_->Join(); + } + + inline void Enqueue(IJob* job) { + Q_.Enqueue(job); + } + + private: + void DoRun(TCont* c) override { + c->Executor()->Abort(); + } + + void DoExecute() override { + SetHighestThreadPriority(); + + TContExecutor e(RealStackSize(20000)); + + e.Execute<TExecThread, &TExecThread::Dispatcher>(this); + } + + inline void Dispatcher(TCont* c) { + IJob* job; + + while ((job = Q_.Dequeue(c))) { + try { + c->Executor()->Create(*job, "job"); + } catch (...) { + (*job)(c); + } + } + } + + typedef TAutoPtr<IThreadFactory::IThread> IThreadRef; + TOneConsumerPipeQueue<IJob> Q_; + IThreadRef T_; + }; + + class TJobScatter: public IJobQueue { + public: + inline TJobScatter() { + for (size_t i = 0; i < 2; ++i) { + E_.push_back(new TExecThread()); + } + } + + void ScheduleImpl(IJob* job) override { + E_[TThread::CurrentThreadId() % E_.size()]->Enqueue(job); + } + + private: + typedef TAutoPtr<TExecThread> TExecThreadRef; + TVector<TExecThreadRef> E_; + }; +} + +IJobQueue* NNeh::JobQueue() { + return Singleton<TJobScatter>(); +} |