aboutsummaryrefslogblamecommitdiffstats
path: root/contrib/python/contourpy/src/threaded.cpp
blob: 9f6eec1bedad9b811dcc5f53aa9b6efa0094f2db (plain) (tree)
















































































































































































































                                                                                                   


                                                                    




































































































                                                                                                    
#include "base_impl.h"
#include "converter.h"
#include "threaded.h"
#include "util.h"
#include <thread>

namespace contourpy {

ThreadedContourGenerator::ThreadedContourGenerator(
    const CoordinateArray& x, const CoordinateArray& y, const CoordinateArray& z,
    const MaskArray& mask, bool corner_mask, LineType line_type, FillType fill_type,
    bool quad_as_tri, ZInterp z_interp, index_t x_chunk_size, index_t y_chunk_size,
    index_t n_threads)
    : BaseContourGenerator(x, y, z, mask, corner_mask, line_type, fill_type, quad_as_tri, z_interp,
                           x_chunk_size, y_chunk_size),
      _n_threads(limit_n_threads(n_threads, get_n_chunks())),
      _next_chunk(0)
{}

void ThreadedContourGenerator::export_filled(
    const ChunkLocal& local, std::vector<py::list>& return_lists)
{
    // Reimplementation of SerialContourGenerator::export_filled() to separate out the creation of
    // numpy arrays (which requires a thread lock) from the population of those arrays (which does
    // not). This minimises the time that the lock is used for.

    assert(local.total_point_count > 0);

    switch (get_fill_type())
    {
        case FillType::OuterCode:
        case FillType::OuterOffset: {
            assert(!has_direct_points() && !has_direct_line_offsets());

            auto outer_count = local.line_count - local.hole_count;
            bool outer_code = (get_fill_type() == FillType::OuterCode);
            std::vector<PointArray::value_type*> points_ptrs(outer_count);
            std::vector<CodeArray::value_type*> codes_ptrs(outer_code ? outer_count: 0);
            std::vector<OffsetArray::value_type*> offsets_ptrs(outer_code ? 0 : outer_count);

            {
                Lock lock(*this);  // cppcheck-suppress unreadVariable
                for (decltype(outer_count) i = 0; i < outer_count; ++i) {
                    auto outer_start = local.outer_offsets.start[i];
                    auto outer_end = local.outer_offsets.start[i+1];
                    auto point_start = local.line_offsets.start[outer_start];
                    auto point_end = local.line_offsets.start[outer_end];
                    auto point_count = point_end - point_start;
                    assert(point_count > 2);

                    index_t points_shape[2] = {static_cast<index_t>(point_count), 2};
                    PointArray point_array(points_shape);
                    return_lists[0].append(point_array);
                    points_ptrs[i] = point_array.mutable_data();

                    if (outer_code) {
                        index_t codes_shape = static_cast<index_t>(point_count);
                        CodeArray code_array(codes_shape);
                        return_lists[1].append(code_array);
                        codes_ptrs[i] = code_array.mutable_data();
                    }
                    else {
                        index_t offsets_shape = static_cast<index_t>(outer_end - outer_start + 1);
                        OffsetArray offset_array(offsets_shape);
                        return_lists[1].append(offset_array);
                        offsets_ptrs[i] = offset_array.mutable_data();
                    }
                }
            }

            for (decltype(outer_count) i = 0; i < outer_count; ++i) {
                auto outer_start = local.outer_offsets.start[i];
                auto outer_end = local.outer_offsets.start[i+1];
                auto point_start = local.line_offsets.start[outer_start];
                auto point_end = local.line_offsets.start[outer_end];
                auto point_count = point_end - point_start;
                assert(point_count > 2);

                Converter::convert_points(
                    point_count, local.points.start + 2*point_start, points_ptrs[i]);

                if (outer_code)
                    Converter::convert_codes(
                        point_count, outer_end - outer_start + 1,
                        local.line_offsets.start + outer_start, point_start, codes_ptrs[i]);
                else
                    Converter::convert_offsets(
                        outer_end - outer_start + 1, local.line_offsets.start + outer_start,
                        point_start, offsets_ptrs[i]);
            }
            break;
        }
        case FillType::ChunkCombinedCode:
        case FillType::ChunkCombinedCodeOffset: {
            assert(has_direct_points() && !has_direct_line_offsets());
            // return_lists[0][local_chunk] already contains combined points.
            // If ChunkCombinedCodeOffset. return_lists[2][local.chunk] already contains outer
            //    offsets.

            index_t codes_shape = static_cast<index_t>(local.total_point_count);
            CodeArray::value_type* codes_ptr = nullptr;

            {
                Lock lock(*this);  // cppcheck-suppress unreadVariable
                CodeArray code_array(codes_shape);
                return_lists[1][local.chunk] = code_array;
                codes_ptr = code_array.mutable_data();
            }

            Converter::convert_codes(
                local.total_point_count, local.line_count + 1, local.line_offsets.start, 0,
                codes_ptr);
            break;
        }
        case FillType::ChunkCombinedOffset:
        case FillType::ChunkCombinedOffsetOffset:
            assert(has_direct_points() && has_direct_line_offsets());
            if (get_fill_type() == FillType::ChunkCombinedOffsetOffset) {
                assert(has_direct_outer_offsets());
            }
            // return_lists[0][local_chunk] already contains combined points.
            // return_lists[1][local.chunk] already contains line offsets.
            // If ChunkCombinedOffsetOffset, return_lists[2][local.chunk] already contains
            //      outer offsets.
            break;
    }
}

void ThreadedContourGenerator::export_lines(
    const ChunkLocal& local, std::vector<py::list>& return_lists)
{
    // Reimplementation of SerialContourGenerator::export_lines() to separate out the creation of
    // numpy arrays (which requires a thread lock) from the population of those arrays (which does
    // not). This minimises the time that the lock is used for.

    assert(local.total_point_count > 0);

    switch (get_line_type())
    {
        case LineType::Separate:
        case LineType::SeparateCode: {
            assert(!has_direct_points() && !has_direct_line_offsets());

            bool separate_code = (get_line_type() == LineType::SeparateCode);
            std::vector<PointArray::value_type*> points_ptrs(local.line_count);
            std::vector<CodeArray::value_type*> codes_ptrs(separate_code ? local.line_count: 0);

            {
                Lock lock(*this);  // cppcheck-suppress unreadVariable
                for (decltype(local.line_count) i = 0; i < local.line_count; ++i) {
                    auto point_start = local.line_offsets.start[i];
                    auto point_end = local.line_offsets.start[i+1];
                    auto point_count = point_end - point_start;
                    assert(point_count > 1);

                    index_t points_shape[2] = {static_cast<index_t>(point_count), 2};
                    PointArray point_array(points_shape);

                    return_lists[0].append(point_array);
                    points_ptrs[i] = point_array.mutable_data();

                    if (separate_code) {
                        index_t codes_shape = static_cast<index_t>(point_count);
                        CodeArray code_array(codes_shape);
                        return_lists[1].append(code_array);
                        codes_ptrs[i] = code_array.mutable_data();
                    }
                }
            }

            for (decltype(local.line_count) i = 0; i < local.line_count; ++i) {
                auto point_start = local.line_offsets.start[i];
                auto point_end = local.line_offsets.start[i+1];
                auto point_count = point_end - point_start;
                assert(point_count > 1);

                Converter::convert_points(
                    point_count, local.points.start + 2*point_start, points_ptrs[i]);

                if (separate_code) {
                    Converter::convert_codes_check_closed_single(
                        point_count, local.points.start + 2*point_start, codes_ptrs[i]);
                }
            }
            break;
        }
        case LineType::ChunkCombinedCode: {
            assert(has_direct_points() && !has_direct_line_offsets());
            // return_lists[0][local.chunk] already contains points.

            index_t codes_shape = static_cast<index_t>(local.total_point_count);
            CodeArray::value_type* codes_ptr = nullptr;

            {
                Lock lock(*this);  // cppcheck-suppress unreadVariable
                CodeArray code_array(codes_shape);
                return_lists[1][local.chunk] = code_array;
                codes_ptr = code_array.mutable_data();
            }

            Converter::convert_codes_check_closed(
                local.total_point_count, local.line_count + 1, local.line_offsets.start,
                local.points.start, codes_ptr);
            break;
        }
        case LineType::ChunkCombinedOffset:
            assert(has_direct_points() && has_direct_line_offsets());
            // return_lists[0][local.chunk] already contains points.
            // return_lists[1][local.chunk] already contains line offsets.
            break;
        case LineType::ChunkCombinedNan:
            assert(has_direct_points());
            // return_lists[0][local.chunk] already contains points.
            break;
    }
}

index_t ThreadedContourGenerator::get_thread_count() const
{
    return _n_threads;
}

index_t ThreadedContourGenerator::limit_n_threads(index_t n_threads, index_t n_chunks)
{
    index_t max_threads = std::max<index_t>(Util::get_max_threads(), 1);
    if (n_threads == 0)
        return std::min(max_threads, n_chunks);
    else
        return std::min({max_threads, n_chunks, n_threads});
}

void ThreadedContourGenerator::march(std::vector<py::list>& return_lists)
{
    // Each thread executes thread_function() which has two stages:
    //   1) Initialise cache z-levels and starting locations
    //   2) Trace contours
    // Each stage is performed on a chunk by chunk basis.  There is a barrier between the two stages
    // to synchronise the threads so the cache setup is complete before being used by the trace.
    _next_chunk = 0;      // Next available chunk index.
    _finished_count = 0;  // Count of threads that have finished the cache init.

    // Main thread releases GIL for remainder of this function.
    // It is temporarily reacquired as necessary within the scope of threaded Lock objects.
    py::gil_scoped_release release;

    // Create (_n_threads-1) new worker threads.
    std::vector<std::thread> threads;
    threads.reserve(_n_threads-1);
    for (index_t i = 0; i < _n_threads-1; ++i)
        threads.emplace_back(
            &ThreadedContourGenerator::thread_function, this, std::ref(return_lists));

    thread_function(std::ref(return_lists));  // Main thread work.

    for (auto& thread : threads)
        thread.join();
    assert(_next_chunk == 2*get_n_chunks());
    threads.clear();
}

void ThreadedContourGenerator::thread_function(std::vector<py::list>& return_lists)
{
    // Function that is executed by each of the threads.
    // _next_chunk starts at zero and increases up to 2*_n_chunks.  A thread in need of work reads
    // _next_chunk and incremements it, then processes that chunk.  For _next_chunk < _n_chunks this
    // is stage 1 (init cache levels and starting locations) and for _next_chunk >= _n_chunks this
    // is stage 2 (trace contours).  There is a synchronisation barrier between the two stages so
    // that the cache initialisation is complete before being used by the contour trace.

    auto n_chunks = get_n_chunks();
    index_t chunk;
    ChunkLocal local;

    // Stage 1: Initialise cache z-levels and starting locations.
    while (true) {
        {
            std::lock_guard<std::mutex> guard(_chunk_mutex);
            if (_next_chunk < n_chunks)
                chunk = _next_chunk++;
            else
                break;  // No more work to do.
        }

        get_chunk_limits(chunk, local);
        init_cache_levels_and_starts(&local);
        local.clear();
    }

    {
        // Implementation of multithreaded barrier.  Each thread increments the shared counter.
        // Last thread to finish notifies the other threads that they can all continue.
        std::unique_lock<std::mutex> lock(_chunk_mutex);
        _finished_count++;
        if (_finished_count == _n_threads)
            _condition_variable.notify_all();
        else
            _condition_variable.wait(lock);
    }

    // Stage 2: Trace contours.
    while (true) {
        {
            std::lock_guard<std::mutex> guard(_chunk_mutex);
            if (_next_chunk < 2*n_chunks)
                chunk = _next_chunk++ - n_chunks;
            else
                break;  // No more work to do.
        }

        get_chunk_limits(chunk, local);
        march_chunk(local, return_lists);
        local.clear();
    }
}

} // namespace contourpy