aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/jobqueue.cpp
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
committermonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
commit06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/neh/jobqueue.cpp
parent03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
downloadydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/neh/jobqueue.cpp')
-rw-r--r--library/cpp/neh/jobqueue.cpp79
1 files changed, 79 insertions, 0 deletions
diff --git a/library/cpp/neh/jobqueue.cpp b/library/cpp/neh/jobqueue.cpp
new file mode 100644
index 0000000000..8d02fd9bbf
--- /dev/null
+++ b/library/cpp/neh/jobqueue.cpp
@@ -0,0 +1,79 @@
+#include "utils.h"
+#include "lfqueue.h"
+#include "jobqueue.h"
+#include "pipequeue.h"
+
+#include <util/thread/factory.h>
+#include <util/generic/singleton.h>
+#include <util/system/thread.h>
+
+using namespace NNeh;
+
+namespace {
+ class TExecThread: public IThreadFactory::IThreadAble, public IJob {
+ public:
+ TExecThread()
+ : T_(SystemThreadFactory()->Run(this))
+ {
+ }
+
+ ~TExecThread() override {
+ Enqueue(this);
+ T_->Join();
+ }
+
+ inline void Enqueue(IJob* job) {
+ Q_.Enqueue(job);
+ }
+
+ private:
+ void DoRun(TCont* c) override {
+ c->Executor()->Abort();
+ }
+
+ void DoExecute() override {
+ SetHighestThreadPriority();
+
+ TContExecutor e(RealStackSize(20000));
+
+ e.Execute<TExecThread, &TExecThread::Dispatcher>(this);
+ }
+
+ inline void Dispatcher(TCont* c) {
+ IJob* job;
+
+ while ((job = Q_.Dequeue(c))) {
+ try {
+ c->Executor()->Create(*job, "job");
+ } catch (...) {
+ (*job)(c);
+ }
+ }
+ }
+
+ typedef TAutoPtr<IThreadFactory::IThread> IThreadRef;
+ TOneConsumerPipeQueue<IJob> Q_;
+ IThreadRef T_;
+ };
+
+ class TJobScatter: public IJobQueue {
+ public:
+ inline TJobScatter() {
+ for (size_t i = 0; i < 2; ++i) {
+ E_.push_back(new TExecThread());
+ }
+ }
+
+ void ScheduleImpl(IJob* job) override {
+ E_[TThread::CurrentThreadId() % E_.size()]->Enqueue(job);
+ }
+
+ private:
+ typedef TAutoPtr<TExecThread> TExecThreadRef;
+ TVector<TExecThreadRef> E_;
+ };
+}
+
+IJobQueue* NNeh::JobQueue() {
+ return Singleton<TJobScatter>();
+}