aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/threading/local_executor/local_executor.h
blob: d0c622f4ef5afbd916a2a7df44d129231aad4ebd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
#pragma once

#include <library/cpp/threading/future/future.h> 

#include <util/generic/cast.h>
#include <util/generic/fwd.h>
#include <util/generic/noncopyable.h>
#include <util/generic/ptr.h>
#include <util/generic/singleton.h>
#include <util/generic/ymath.h>

#include <functional>

namespace NPar {
    // Job object understood by `ILocalExecutor`. It derives (virtually) from `TThrRefBase`, which is
    // what allows executors to accept it as `TIntrusivePtr<ILocallyExecutable>` (see `Exec`).
    struct ILocallyExecutable : public virtual TThrRefBase {
        // User-supplied job body, invoked by one of the executor's threads.
        //
        // @param id        Argument of this particular invocation: usually an index into some
        //                  array, or a placeholder value (e.g. `0`) when the job needs none.
        virtual void LocalExec(int id) = 0;
    };

    // Lightweight alternative to implementing `ILocallyExecutable`: any callable taking an `int`.
    // The argument plays exactly the role of `id` in `ILocallyExecutable::LocalExec`.
    using TLocallyExecutableFunction = std::function<void(int /* id */)>;

    class ILocalExecutor: public TNonCopyable {
    public:
        ILocalExecutor() = default;
        virtual ~ILocalExecutor() = default;

        enum EFlags : int {
            HIGH_PRIORITY = 0,
            MED_PRIORITY = 1,
            LOW_PRIORITY = 2,
            PRIORITY_MASK = 3,
            WAIT_COMPLETE = 4
        };

        // Add task for further execution.
        //
        // @param exec          Task description.
        // @param id            Task argument.
        // @param flags         Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY`
        //                      and `WAIT_COMPLETE`.
        virtual void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) = 0;

        // Add tasks range for further execution.
        //
        // @param exec                      Task description.
        // @param firstId, lastId           Task arguments [firstId, lastId)
        // @param flags                     Same as for `Exec`.
        virtual void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) = 0;

        // 0-based ILocalExecutor worker thread identification
        virtual int GetWorkerThreadId() const noexcept = 0;
        virtual int GetThreadCount() const noexcept = 0;

        // Describes a range of tasks with parameters from integer range [FirstId, LastId).
        //
        class TExecRangeParams {
        public:
            template <typename TFirst, typename TLast>
            TExecRangeParams(TFirst firstId, TLast lastId)
                : FirstId(SafeIntegerCast<int>(firstId))
                , LastId(SafeIntegerCast<int>(lastId))
            {
                Y_ASSERT(LastId >= FirstId);
                SetBlockSize(1);
            }
            // Partition tasks into `blockCount` blocks of approximately equal size, each of which
            // will be executed as a separate bigger task.
            //
            template <typename TBlockCount>
            TExecRangeParams& SetBlockCount(TBlockCount blockCount) {
                Y_ASSERT(SafeIntegerCast<int>(blockCount) > 0 || FirstId == LastId);
                BlockSize = FirstId == LastId ? 0 : CeilDiv(LastId - FirstId, SafeIntegerCast<int>(blockCount));
                BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize);
                BlockEqualToThreads = false;
                return *this;
            }
            // Partition tasks into blocks of approximately `blockSize` size, each of which will
            // be executed as a separate bigger task.
            //
            template <typename TBlockSize>
            TExecRangeParams& SetBlockSize(TBlockSize blockSize) {
                Y_ASSERT(SafeIntegerCast<int>(blockSize) > 0 || FirstId == LastId);
                BlockSize = SafeIntegerCast<int>(blockSize);
                BlockCount = BlockSize == 0 ? 0 : CeilDiv(LastId - FirstId, BlockSize);
                BlockEqualToThreads = false;
                return *this;
            }
            // Partition tasks into thread count blocks of approximately equal size, each of which
            // will be executed as a separate bigger task.
            //
            TExecRangeParams& SetBlockCountToThreadCount() {
                BlockEqualToThreads = true;
                return *this;
            }
            int GetBlockCount() const {
                Y_ASSERT(!BlockEqualToThreads);
                return BlockCount;
            }
            int GetBlockSize() const {
                Y_ASSERT(!BlockEqualToThreads);
                return BlockSize;
            }
            bool GetBlockEqualToThreads() {
                return BlockEqualToThreads;
            }

            const int FirstId = 0;
            const int LastId = 0;

        private:
            int BlockSize;
            int BlockCount;
            bool BlockEqualToThreads;
        };

        // `Exec` and `ExecRange` versions that accept functions.
        //
        void Exec(TLocallyExecutableFunction exec, int id, int flags);
        void ExecRange(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);

        // Version of `ExecRange` that throws exception from task with minimal id if at least one of
        // task threw an exception.
        //
        void ExecRangeWithThrow(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);

        // Version of `ExecRange` that returns vector of futures, thus allowing to retry any task if
        // it fails.
        //
        TVector<NThreading::TFuture<void>> ExecRangeWithFutures(TLocallyExecutableFunction exec, int firstId, int lastId, int flags);

        template <typename TBody>
        static inline auto BlockedLoopBody(const TExecRangeParams& params, const TBody& body) {
            return [=](int blockId) {
                const int blockFirstId = params.FirstId + blockId * params.GetBlockSize();
                const int blockLastId = Min(params.LastId, blockFirstId + params.GetBlockSize());
                for (int i = blockFirstId; i < blockLastId; ++i) {
                    body(i);
                }
            };
        }

        template <typename TBody>
        inline void ExecRange(TBody&& body, TExecRangeParams params, int flags) {
            if (TryExecRangeSequentially(body, params.FirstId, params.LastId, flags)) {
                return;
            }
            if (params.GetBlockEqualToThreads()) {
                params.SetBlockCount(GetThreadCount() + ((flags & WAIT_COMPLETE) != 0)); // ThreadCount or ThreadCount+1 depending on WaitFlag
            }
            ExecRange(BlockedLoopBody(params, body), 0, params.GetBlockCount(), flags);
        }

        template <typename TBody>
        inline void ExecRangeBlockedWithThrow(TBody&& body, int firstId, int lastId, int batchSizeOrZeroForAutoBatchSize, int flags) {
            if (firstId >= lastId) {
                return;
            }
            const int threadCount = Max(GetThreadCount(), 1);
            const int batchSize = batchSizeOrZeroForAutoBatchSize
                ? batchSizeOrZeroForAutoBatchSize
                : (lastId - firstId + threadCount - 1) / threadCount;
            const int batchCount = (lastId - firstId + batchSize - 1) / batchSize;
            const int batchCountPerThread = (batchCount + threadCount - 1) / threadCount;
            auto states = ExecRangeWithFutures(
                [=](int threadId) {
                    for (int batchIdPerThread = 0; batchIdPerThread < batchCountPerThread; ++batchIdPerThread) {
                        int batchId = batchIdPerThread * threadCount + threadId;
                        int begin = firstId + batchId * batchSize;
                        int end = Min(begin + batchSize, lastId);
                        for (int i = begin; i < end; ++i) {
                            body(i);
                        }
                    }
                },
                0, threadCount, flags);
            for (auto& state: states) {
                state.GetValueSync(); // Re-throw exception if any.
            }
        }

        template <typename TBody>
        static inline bool TryExecRangeSequentially(TBody&& body, int firstId, int lastId, int flags) {
            if (lastId == firstId) {
                return true;
            }
            if ((flags & WAIT_COMPLETE) && lastId - firstId == 1) {
                body(firstId);
                return true;
            }
            return false;
        }
    };

    // `TLocalExecutor` provides facilities for easy parallelization of existing code and loops.
    //
    // Examples:
    // Execute one task with medium priority and wait for its completion.
    // ```
    // LocalExecutor().RunAdditionalThreads(4);
    // TEvent event;
    // LocalExecutor().Exec([&event](int) {
    //     SomeFunc();
    //     event.Signal();
    // }, 0, TLocalExecutor::MED_PRIORITY);
    //
    // SomeOtherCode();
    // event.WaitI();
    // ```
    //
    // Execute range of tasks with medium priority and wait until all of them complete.
    // ```
    // LocalExecutor().RunAdditionalThreads(4);
    // LocalExecutor().ExecRange([](int id) {
    //     SomeFunc(id);
    // }, TLocalExecutor::TExecRangeParams(0, 10), TLocalExecutor::WAIT_COMPLETE | TLocalExecutor::MED_PRIORITY);
    // ```
    //
    class TLocalExecutor final: public ILocalExecutor {
    public:
        using EFlags = ILocalExecutor::EFlags;

        // Creates executor without threads. You'll need to explicitly call `RunAdditionalThreads`
        // to add threads to underlying thread pool.
        //
        TLocalExecutor();
        ~TLocalExecutor();

        int GetQueueSize() const noexcept;
        int GetMPQueueSize() const noexcept;
        int GetLPQueueSize() const noexcept;
        void ClearLPQueue();

        // 0-based TLocalExecutor worker thread identification
        int GetWorkerThreadId() const noexcept override;
        int GetThreadCount() const noexcept override;

        // **Add** threads to underlying thread pool.
        //
        // @param threadCount       Number of threads to add.
        void RunAdditionalThreads(int threadCount);

        // Add task for further execution.
        //
        // @param exec          Task description.
        // @param id            Task argument.
        // @param flags         Bitmask composed by `HIGH_PRIORITY`, `MED_PRIORITY`, `LOW_PRIORITY`
        //                      and `WAIT_COMPLETE`.
        void Exec(TIntrusivePtr<ILocallyExecutable> exec, int id, int flags) override;

        // Add tasks range for further execution.
        //
        // @param exec                      Task description.
        // @param firstId, lastId           Task arguments [firstId, lastId)
        // @param flags                     Same as for `Exec`.
        void ExecRange(TIntrusivePtr<ILocallyExecutable> exec, int firstId, int lastId, int flags) override;

        using ILocalExecutor::Exec;
        using ILocalExecutor::ExecRange;

    private:
        class TImpl;
        THolder<TImpl> Impl_;
    };

    // Shared `TLocalExecutor` instance, obtained through `Singleton<TLocalExecutor>()`.
    static inline TLocalExecutor& LocalExecutor() {
        TLocalExecutor* const instance = Singleton<TLocalExecutor>();
        return *instance;
    }

    // Runs `body(i)` for every `i` in [from, to) on `executor`, split into roughly one block per
    // executor thread. `WAIT_COMPLETE` is passed, so the call is meant to return once all
    // iterations are done. Bounds must fit into `int` (they go through `TExecRangeParams`).
    template <typename TBody>
    inline void ParallelFor(ILocalExecutor& executor, ui32 from, ui32 to, TBody&& body) {
        auto params = ILocalExecutor::TExecRangeParams(from, to).SetBlockCountToThreadCount();
        executor.ExecRange(std::forward<TBody>(body), params, ILocalExecutor::WAIT_COMPLETE);
    }

    // `ParallelFor(executor, from, to, body)` with the shared `LocalExecutor()` as the executor.
    template <typename TBody>
    inline void ParallelFor(ui32 from, ui32 to, TBody&& body) {
        ILocalExecutor& executor = LocalExecutor();
        ParallelFor(executor, from, to, std::forward<TBody>(body));
    }

    template <typename TBody>
    inline void AsyncParallelFor(ui32 from, ui32 to, TBody&& body) {
        ILocalExecutor::TExecRangeParams params(from, to);
        params.SetBlockCountToThreadCount();
        LocalExecutor().ExecRange(std::forward<TBody>(body), params, 0);
    }
}