blob: 8d02fd9bbfe85f45f85597215aa70feb4fedfdda (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
#include "utils.h"
#include "lfqueue.h"
#include "jobqueue.h"
#include "pipequeue.h"
#include <util/thread/factory.h>
#include <util/generic/singleton.h>
#include <util/system/thread.h>
using namespace NNeh;
namespace {
class TExecThread: public IThreadFactory::IThreadAble, public IJob {
public:
TExecThread()
: T_(SystemThreadFactory()->Run(this))
{
}
~TExecThread() override {
Enqueue(this);
T_->Join();
}
inline void Enqueue(IJob* job) {
Q_.Enqueue(job);
}
private:
void DoRun(TCont* c) override {
c->Executor()->Abort();
}
void DoExecute() override {
SetHighestThreadPriority();
TContExecutor e(RealStackSize(20000));
e.Execute<TExecThread, &TExecThread::Dispatcher>(this);
}
inline void Dispatcher(TCont* c) {
IJob* job;
while ((job = Q_.Dequeue(c))) {
try {
c->Executor()->Create(*job, "job");
} catch (...) {
(*job)(c);
}
}
}
typedef TAutoPtr<IThreadFactory::IThread> IThreadRef;
TOneConsumerPipeQueue<IJob> Q_;
IThreadRef T_;
};
class TJobScatter: public IJobQueue {
public:
inline TJobScatter() {
for (size_t i = 0; i < 2; ++i) {
E_.push_back(new TExecThread());
}
}
void ScheduleImpl(IJob* job) override {
E_[TThread::CurrentThreadId() % E_.size()]->Enqueue(job);
}
private:
typedef TAutoPtr<TExecThread> TExecThreadRef;
TVector<TExecThreadRef> E_;
};
}
IJobQueue* NNeh::JobQueue() {
return Singleton<TJobScatter>();
}
|