aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/jobqueue.cpp
blob: 8d02fd9bbfe85f45f85597215aa70feb4fedfdda (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include "utils.h"
#include "lfqueue.h"
#include "jobqueue.h"
#include "pipequeue.h"

#include <util/thread/factory.h>
#include <util/generic/singleton.h>
#include <util/system/thread.h>

using namespace NNeh;

namespace {
    /// Worker that pairs one thread obtained from SystemThreadFactory() with a
    /// coroutine executor and feeds that executor with jobs taken from Q_.
    ///
    /// The object is itself an IJob: running it aborts the executor, which is
    /// how the destructor asks the worker to stop.
    class TExecThread: public IThreadFactory::IThreadAble, public IJob {
    public:
        /// Starts the worker thread with this object as its runnable.
        TExecThread()
            : T_(SystemThreadFactory()->Run(this))
        {
        }

        /// Queues this object as a job (its DoRun aborts the executor) and
        /// then joins the worker thread.
        ~TExecThread() override {
            Enqueue(this);
            T_->Join();
        }

        /// Hands `job` over to the worker's queue.
        inline void Enqueue(IJob* job) {
            Q_.Enqueue(job);
        }

    private:
        using IThreadRef = TAutoPtr<IThreadFactory::IThread>;

        /// Thread body: raises the thread priority, then runs a coroutine
        /// executor whose entry coroutine is Dispatcher.
        void DoExecute() override {
            SetHighestThreadPriority();

            TContExecutor executor(RealStackSize(20000));
            executor.Execute<TExecThread, &TExecThread::Dispatcher>(this);
        }

        /// IJob body of the worker itself: aborts the executor owning `c`.
        void DoRun(TCont* c) override {
            c->Executor()->Abort();
        }

        /// Pulls jobs from Q_ until Dequeue yields a null pointer; each job is
        /// started as a coroutine named "job".
        inline void Dispatcher(TCont* c) {
            for (IJob* next = Q_.Dequeue(c); next; next = Q_.Dequeue(c)) {
                try {
                    c->Executor()->Create(*next, "job");
                } catch (...) {
                    // Creating the coroutine failed: run the job right here
                    // inside the dispatcher instead of dropping it.
                    (*next)(c);
                }
            }
        }

        // Keep Q_ declared before T_: members are initialized in declaration
        // order, and T_'s initializer already starts the thread that reads Q_.
        TOneConsumerPipeQueue<IJob> Q_;
        IThreadRef T_;
    };

    class TJobScatter: public IJobQueue {
    public:
        inline TJobScatter() {
            for (size_t i = 0; i < 2; ++i) {
                E_.push_back(new TExecThread());
            }
        }

        void ScheduleImpl(IJob* job) override {
            E_[TThread::CurrentThreadId() % E_.size()]->Enqueue(job);
        }

    private:
        typedef TAutoPtr<TExecThread> TExecThreadRef;
        TVector<TExecThreadRef> E_;
    };
}

/// Returns the TJobScatter instance provided by Singleton<TJobScatter>(),
/// exposed through the IJobQueue interface.
IJobQueue* NNeh::JobQueue() {
    IJobQueue* const queue = Singleton<TJobScatter>();

    return queue;
}