#include "utils.h"
#include "lfqueue.h"
#include "jobqueue.h"
#include "pipequeue.h"

#include <util/thread/factory.h>
#include <util/generic/singleton.h>
#include <util/system/thread.h>

using namespace NNeh;

namespace {
    // One worker: owns a thread (T_) and the job queue (Q_) that feeds it.
    // The thread body builds a TContExecutor whose entry coroutine is
    // Dispatcher(); Dispatcher() turns every dequeued job into a coroutine.
    //
    // The object is itself an IJob: it is used as the shutdown job that the
    // destructor pushes into its own queue.
    class TExecThread: public IThreadFactory::IThreadAble, public IJob {
    public:
        // Hands this object to the system thread factory.
        // NOTE(review): Run() presumably starts the thread right away, so the
        // thread may touch Q_ before the constructor returns; Q_ is declared
        // before T_ and is therefore already constructed at that point.
        TExecThread()
            : T_(SystemThreadFactory()->Run(this))
        {
        }

        // Queues this object as a job and then waits for the thread to finish.
        // NOTE(review): relies on the dispatcher running the job via DoRun(),
        // which aborts the executor -- confirm against IJob in jobqueue.h.
        ~TExecThread() override {
            Enqueue(this);
            T_->Join();
        }

        // Pushes a job into this worker's queue.
        void Enqueue(IJob* job) {
            Q_.Enqueue(job);
        }

    private:
        // Job body of this object: aborts the executor that runs it.
        void DoRun(TCont* c) override {
            c->Executor()->Abort();
        }

        // IThreadAble entry point: raises the thread priority, then runs an
        // executor with Dispatcher() as its entry coroutine.
        void DoExecute() override {
            SetHighestThreadPriority();

            TContExecutor executor(RealStackSize(20000));

            executor.Execute<TExecThread, &TExecThread::Dispatcher>(this);
        }

        // Pulls jobs from Q_ until Dequeue() yields a null pointer. Each job is
        // registered with the executor as a coroutine named "job"; if that
        // registration throws, the job is invoked directly on the dispatcher's
        // own coroutine instead.
        void Dispatcher(TCont* c) {
            while (IJob* next = Q_.Dequeue(c)) {
                try {
                    c->Executor()->Create(*next, "job");
                } catch (...) {
                    (*next)(c);
                }
            }
        }

        using IThreadRef = TAutoPtr<IThreadFactory::IThread>;

        // Declaration order matters: Q_ must stay ahead of T_ (see constructor).
        TOneConsumerPipeQueue<IJob> Q_;
        IThreadRef T_;
    };

    // IJobQueue implementation that spreads jobs over a fixed set of workers.
    class TJobScatter: public IJobQueue {
    public:
        // Creates the workers; E_ owns them through TAutoPtr.
        TJobScatter() {
            constexpr size_t threadCount = 2;

            while (E_.size() < threadCount) {
                E_.push_back(new TExecThread());
            }
        }

        // Picks a worker by the calling thread's id modulo the number of
        // workers and enqueues the job there.
        void ScheduleImpl(IJob* job) override {
            const auto slot = TThread::CurrentThreadId() % E_.size();

            E_[slot]->Enqueue(job);
        }

    private:
        using TExecThreadRef = TAutoPtr<TExecThread>;

        TVector<TExecThreadRef> E_;
    };
}

// Returns the TJobScatter instance obtained from Singleton<TJobScatter>().
IJobQueue* NNeh::JobQueue() {
    return Singleton<TJobScatter>();
}