aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/neh/asio/executor.h
blob: 4f6549044d82f04e1e175ca4f5fc6a98bb951d6d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#pragma once

#include "asio.h"

#include <library/cpp/deprecated/atomic/atomic.h>

#include <util/thread/factory.h>
#include <util/system/thread.h>

namespace NAsio {
    /// Owns a TIOService and drives it on a thread obtained from SystemThreadFactory().
    ///
    /// The constructor attaches a TIOService::TWork object to the service and starts
    /// the thread; SyncShutdown() (also invoked by the destructor) destroys the work
    /// object, aborts the service and joins the thread.
    class TIOServiceExecutor: public IThreadFactory::IThreadAble {
    public:
        TIOServiceExecutor()
            : Work_(new TIOService::TWork(Srv_))
        {
            // Srv_ and Work_ are already initialized at this point, so the thread
            // may enter DoExecute() immediately.
            T_ = SystemThreadFactory()->Run(this);
        }

        /// Performs a synchronous shutdown if one has not already happened.
        ~TIOServiceExecutor() override {
            SyncShutdown();
        }

        /// Returns the service driven by this executor.
        TIOService& GetIOService() noexcept {
            return Srv_;
        }

        /// Thread body: names the current thread and runs the service loop.
        void DoExecute() override {
            TThread::SetCurrentThreadName("NehAsioExecutor");
            Srv_.Run();
        }

        /// Stops the service and waits for the executor thread to finish.
        ///
        /// Work_ doubles as the "still running" flag: once it has been destroyed,
        /// further calls return immediately.
        void SyncShutdown() {
            if (!Work_) {
                return; // already shut down
            }

            Work_.Destroy();
            Srv_.Abort(); // cancel all async operations, break Run() execution
            T_->Join();
        }

    private:
        using IThreadRef = TAutoPtr<IThreadFactory::IThread>;

        // Declaration order matters: Work_ is constructed from Srv_.
        TIOService Srv_;
        TAutoPtr<TIOService::TWork> Work_; // NOTE(review): presumably keeps Run() from returning while idle - confirm against TIOService
        IThreadRef T_;
    };

    /// Fixed-size set of TIOServiceExecutor instances handed out in rotation.
    ///
    /// Every executor is created (and therefore started) in the constructor.
    class TExecutorsPool {
    public:
        /// Creates `executors` executors.
        ///
        /// `executors` must be greater than zero if GetExecutor() is going to be
        /// called: an empty pool makes GetExecutor() compute a modulo by zero.
        TExecutorsPool(size_t executors)
            : C_(0)
        {
            // The final size is known up front; avoid repeated reallocation.
            E_.reserve(executors);
            for (size_t i = 0; i < executors; ++i) {
                E_.push_back(new TIOServiceExecutor());
            }
        }

        /// Returns the number of executors in the pool.
        inline size_t Size() const noexcept {
            return E_.size();
        }

        /// Returns the next executor in rotation.
        ///
        /// The shared counter is incremented atomically and the incremented value,
        /// taken modulo the pool size, selects the executor.
        /// Precondition: the pool is not empty.
        inline TIOServiceExecutor& GetExecutor() noexcept {
            const TAtomicBase next = AtomicIncrement(C_);
            // Explicit cast spells out the signed -> unsigned conversion that the
            // mixed-type modulo performs anyway; the index stays within range.
            return *E_[static_cast<size_t>(next) % E_.size()];
        }

        /// Synchronously shuts down every executor, one after another.
        void SyncShutdown() {
            // Iterate by reference: the elements are owning pointers and must not be copied.
            for (auto& executor : E_) {
                executor->SyncShutdown();
            }
        }

    private:
        TAtomic C_; // rotation counter used by GetExecutor()
        TVector<TAutoPtr<TIOServiceExecutor>> E_;
    };
}