aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/contourpy/src/threaded.cpp
blob: e27cd55d519bef80ca75d5c794fd0f7dc7b50711 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
#include "base_impl.h"
#include "converter.h"
#include "threaded.h"
#include "util.h"
#include <thread>

namespace contourpy {

ThreadedContourGenerator::ThreadedContourGenerator(
    const CoordinateArray& x, const CoordinateArray& y, const CoordinateArray& z,
    const MaskArray& mask, bool corner_mask, LineType line_type, FillType fill_type,
    bool quad_as_tri, ZInterp z_interp, index_t x_chunk_size, index_t y_chunk_size,
    index_t n_threads)
    : BaseContourGenerator(x, y, z, mask, corner_mask, line_type, fill_type, quad_as_tri, z_interp,
                           x_chunk_size, y_chunk_size),
      _n_threads(limit_n_threads(n_threads, get_n_chunks())),
      _next_chunk(0)
{}

void ThreadedContourGenerator::export_filled(
    const ChunkLocal& local, std::vector<py::list>& return_lists)
{
    // Reimplementation of SerialContourGenerator::export_filled() to separate out the creation of
    // numpy arrays (which requires a thread lock) from the population of those arrays (which does
    // not). This minimises the time that the lock is used for.

    assert(local.total_point_count > 0);

    switch (get_fill_type())
    {
        case FillType::OuterCode:
        case FillType::OuterOffset: {
            assert(!has_direct_points() && !has_direct_line_offsets());

            auto outer_count = local.line_count - local.hole_count;
            bool outer_code = (get_fill_type() == FillType::OuterCode);
            std::vector<PointArray::value_type*> points_ptrs(outer_count);
            std::vector<CodeArray::value_type*> codes_ptrs(outer_code ? outer_count: 0);
            std::vector<OffsetArray::value_type*> offsets_ptrs(outer_code ? 0 : outer_count);

            {
                Lock lock(*this);  // cppcheck-suppress unreadVariable
                for (decltype(outer_count) i = 0; i < outer_count; ++i) {
                    auto outer_start = local.outer_offsets.start[i];
                    auto outer_end = local.outer_offsets.start[i+1];
                    auto point_start = local.line_offsets.start[outer_start];
                    auto point_end = local.line_offsets.start[outer_end];
                    auto point_count = point_end - point_start;
                    assert(point_count > 2);

                    index_t points_shape[2] = {static_cast<index_t>(point_count), 2};
                    PointArray point_array(points_shape);
                    return_lists[0].append(point_array);
                    points_ptrs[i] = point_array.mutable_data();

                    if (outer_code) {
                        index_t codes_shape = static_cast<index_t>(point_count);
                        CodeArray code_array(codes_shape);
                        return_lists[1].append(code_array);
                        codes_ptrs[i] = code_array.mutable_data();
                    }
                    else {
                        index_t offsets_shape = static_cast<index_t>(outer_end - outer_start + 1);
                        OffsetArray offset_array(offsets_shape);
                        return_lists[1].append(offset_array);
                        offsets_ptrs[i] = offset_array.mutable_data();
                    }
                }
            }

            for (decltype(outer_count) i = 0; i < outer_count; ++i) {
                auto outer_start = local.outer_offsets.start[i];
                auto outer_end = local.outer_offsets.start[i+1];
                auto point_start = local.line_offsets.start[outer_start];
                auto point_end = local.line_offsets.start[outer_end];
                auto point_count = point_end - point_start;
                assert(point_count > 2);

                Converter::convert_points(
                    point_count, local.points.start + 2*point_start, points_ptrs[i]);

                if (outer_code)
                    Converter::convert_codes(
                        point_count, outer_end - outer_start + 1,
                        local.line_offsets.start + outer_start, point_start, codes_ptrs[i]);
                else
                    Converter::convert_offsets(
                        outer_end - outer_start + 1, local.line_offsets.start + outer_start,
                        point_start, offsets_ptrs[i]);
            }
            break;
        }
        case FillType::ChunkCombinedCode:
        case FillType::ChunkCombinedCodeOffset: {
            assert(has_direct_points() && !has_direct_line_offsets());
            // return_lists[0][local_chunk] already contains combined points.
            // If ChunkCombinedCodeOffset. return_lists[2][local.chunk] already contains outer
            //    offsets.

            index_t codes_shape = static_cast<index_t>(local.total_point_count);
            CodeArray::value_type* codes_ptr = nullptr;

            {
                Lock lock(*this);  // cppcheck-suppress unreadVariable
                CodeArray code_array(codes_shape);
                return_lists[1][local.chunk] = code_array;
                codes_ptr = code_array.mutable_data();
            }

            Converter::convert_codes(
                local.total_point_count, local.line_count + 1, local.line_offsets.start, 0,
                codes_ptr);
            break;
        }
        case FillType::ChunkCombinedOffset:
        case FillType::ChunkCombinedOffsetOffset:
            assert(has_direct_points() && has_direct_line_offsets());
            if (get_fill_type() == FillType::ChunkCombinedOffsetOffset) {
                assert(has_direct_outer_offsets());
            }
            // return_lists[0][local_chunk] already contains combined points.
            // return_lists[1][local.chunk] already contains line offsets.
            // If ChunkCombinedOffsetOffset, return_lists[2][local.chunk] already contains
            //      outer offsets.
            break;
    }
}

void ThreadedContourGenerator::export_lines(
    const ChunkLocal& local, std::vector<py::list>& return_lists)
{
    // Reimplementation of SerialContourGenerator::export_lines() to separate out the creation of
    // numpy arrays (which requires a thread lock) from the population of those arrays (which does
    // not). This minimises the time that the lock is used for.

    assert(local.total_point_count > 0);

    switch (get_line_type())
    {
        case LineType::Separate:
        case LineType::SeparateCode: {
            assert(!has_direct_points() && !has_direct_line_offsets());

            bool separate_code = (get_line_type() == LineType::SeparateCode);
            std::vector<PointArray::value_type*> points_ptrs(local.line_count);
            std::vector<CodeArray::value_type*> codes_ptrs(separate_code ? local.line_count: 0);

            {
                Lock lock(*this);  // cppcheck-suppress unreadVariable
                for (decltype(local.line_count) i = 0; i < local.line_count; ++i) {
                    auto point_start = local.line_offsets.start[i];
                    auto point_end = local.line_offsets.start[i+1];
                    auto point_count = point_end - point_start;
                    assert(point_count > 1);

                    index_t points_shape[2] = {static_cast<index_t>(point_count), 2};
                    PointArray point_array(points_shape);

                    return_lists[0].append(point_array);
                    points_ptrs[i] = point_array.mutable_data();

                    if (separate_code) {
                        index_t codes_shape = static_cast<index_t>(point_count);
                        CodeArray code_array(codes_shape);
                        return_lists[1].append(code_array);
                        codes_ptrs[i] = code_array.mutable_data();
                    }
                }
            }

            for (decltype(local.line_count) i = 0; i < local.line_count; ++i) {
                auto point_start = local.line_offsets.start[i];
                auto point_end = local.line_offsets.start[i+1];
                auto point_count = point_end - point_start;
                assert(point_count > 1);

                Converter::convert_points(
                    point_count, local.points.start + 2*point_start, points_ptrs[i]);

                if (separate_code) {
                    Converter::convert_codes_check_closed_single(
                        point_count, local.points.start + 2*point_start, codes_ptrs[i]);
                }
            }
            break;
        }
        case LineType::ChunkCombinedCode: {
            assert(has_direct_points() && !has_direct_line_offsets());
            // return_lists[0][local.chunk] already contains points.

            index_t codes_shape = static_cast<index_t>(local.total_point_count);
            CodeArray::value_type* codes_ptr = nullptr;

            {
                Lock lock(*this);  // cppcheck-suppress unreadVariable
                CodeArray code_array(codes_shape);
                return_lists[1][local.chunk] = code_array;
                codes_ptr = code_array.mutable_data();
            }

            Converter::convert_codes_check_closed(
                local.total_point_count, local.line_count + 1, local.line_offsets.start,
                local.points.start, codes_ptr);
            break;
        }
        case LineType::ChunkCombinedOffset:
            assert(has_direct_points() && has_direct_line_offsets());
            // return_lists[0][local.chunk] already contains points.
            // return_lists[1][local.chunk] already contains line offsets.
            break;
    }
}

index_t ThreadedContourGenerator::get_thread_count() const
{
    return _n_threads;
}

index_t ThreadedContourGenerator::limit_n_threads(index_t n_threads, index_t n_chunks)
{
    index_t max_threads = std::max<index_t>(Util::get_max_threads(), 1);
    if (n_threads == 0)
        return std::min(max_threads, n_chunks);
    else
        return std::min({max_threads, n_chunks, n_threads});
}

void ThreadedContourGenerator::march(std::vector<py::list>& return_lists)
{
    // Each thread executes thread_function() which has two stages:
    //   1) Initialise cache z-levels and starting locations
    //   2) Trace contours
    // Each stage is performed on a chunk by chunk basis.  There is a barrier between the two stages
    // to synchronise the threads so the cache setup is complete before being used by the trace.
    _next_chunk = 0;      // Next available chunk index.
    _finished_count = 0;  // Count of threads that have finished the cache init.

    // Main thread releases GIL for remainder of this function.
    // It is temporarily reacquired as necessary within the scope of threaded Lock objects.
    py::gil_scoped_release release;

    // Create (_n_threads-1) new worker threads.
    std::vector<std::thread> threads;
    threads.reserve(_n_threads-1);
    for (index_t i = 0; i < _n_threads-1; ++i)
        threads.emplace_back(
            &ThreadedContourGenerator::thread_function, this, std::ref(return_lists));

    thread_function(std::ref(return_lists));  // Main thread work.

    for (auto& thread : threads)
        thread.join();
    assert(_next_chunk == 2*get_n_chunks());
    threads.clear();
}

void ThreadedContourGenerator::thread_function(std::vector<py::list>& return_lists)
{
    // Function that is executed by each of the threads.
    // _next_chunk starts at zero and increases up to 2*_n_chunks.  A thread in need of work reads
    // _next_chunk and incremements it, then processes that chunk.  For _next_chunk < _n_chunks this
    // is stage 1 (init cache levels and starting locations) and for _next_chunk >= _n_chunks this
    // is stage 2 (trace contours).  There is a synchronisation barrier between the two stages so
    // that the cache initialisation is complete before being used by the contour trace.

    auto n_chunks = get_n_chunks();
    index_t chunk;
    ChunkLocal local;

    // Stage 1: Initialise cache z-levels and starting locations.
    while (true) {
        {
            std::lock_guard<std::mutex> guard(_chunk_mutex);
            if (_next_chunk < n_chunks)
                chunk = _next_chunk++;
            else
                break;  // No more work to do.
        }

        get_chunk_limits(chunk, local);
        init_cache_levels_and_starts(&local);
        local.clear();
    }

    {
        // Implementation of multithreaded barrier.  Each thread increments the shared counter.
        // Last thread to finish notifies the other threads that they can all continue.
        std::unique_lock<std::mutex> lock(_chunk_mutex);
        _finished_count++;
        if (_finished_count == _n_threads)
            _condition_variable.notify_all();
        else
            _condition_variable.wait(lock);
    }

    // Stage 2: Trace contours.
    while (true) {
        {
            std::lock_guard<std::mutex> guard(_chunk_mutex);
            if (_next_chunk < 2*n_chunks)
                chunk = _next_chunk++ - n_chunks;
            else
                break;  // No more work to do.
        }

        get_chunk_limits(chunk, local);
        march_chunk(local, return_lists);
        local.clear();
    }
}

} // namespace contourpy